// blob: 66ac7b042af660547af74b66cabd6c3cd617ae99 [file] [log] [blame]
/*
* Copyright (c) 2010-2013, 2016, 2018 Eike Stepper (Loehne, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.emf.cdo.tests;
import org.eclipse.emf.cdo.common.CDOCommonRepository;
import org.eclipse.emf.cdo.common.branch.CDOBranch;
import org.eclipse.emf.cdo.common.commit.CDOCommitInfo;
import org.eclipse.emf.cdo.common.revision.CDORevision;
import org.eclipse.emf.cdo.common.revision.CDORevisionUtil;
import org.eclipse.emf.cdo.common.util.CDOCommonUtil;
import org.eclipse.emf.cdo.internal.server.mem.MEMStore;
import org.eclipse.emf.cdo.server.IRepository;
import org.eclipse.emf.cdo.server.mem.IMEMStore;
import org.eclipse.emf.cdo.session.CDOSession;
import org.eclipse.emf.cdo.session.CDOSessionInvalidationEvent;
import org.eclipse.emf.cdo.spi.server.InternalRepository;
import org.eclipse.emf.cdo.spi.server.InternalStore;
import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository;
import org.eclipse.emf.cdo.tests.config.IRepositoryConfig;
import org.eclipse.emf.cdo.tests.config.impl.RepositoryConfig.OfflineConfig;
import org.eclipse.emf.cdo.transaction.CDOTransaction;
import org.eclipse.emf.cdo.util.CDOUtil;
import org.eclipse.emf.cdo.util.CommitException;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.event.IEvent;
import org.eclipse.net4j.util.io.IOUtil;
import org.eclipse.net4j.util.tests.TestListener;
import org.eclipse.emf.ecore.EObject;
import java.util.List;
import java.util.Map;
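/**
* Abstract base class for tests that run against an {@link OfflineConfig} repository configuration; it offers static
* helpers to wait for repository state changes and commit arrival and to check replicated revisions.
*
* @author Eike Stepper
*/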
public abstract class AbstractSyncingTest extends AbstractCDOTest
{
/**
 * Duration, in milliseconds, of one sleep between two checks in the wait helpers of this class.
 */
protected static final long SLEEP_MILLIS = 100;
/**
 * If <code>true</code>, the wait helpers of this class print progress messages to {@link IOUtil#OUT()}.
 */
private static final boolean VERBOSE_WAIT = false;
/**
 * Returns the repository configuration of this test as an {@link OfflineConfig}.
 *
 * @return the repository config, cast to {@link OfflineConfig}.
 * @throws ClassCastException if the repository config is not an {@link OfflineConfig}.
 */
public OfflineConfig getOfflineConfig()
{
  OfflineConfig offlineConfig = (OfflineConfig)getRepositoryConfig();
  return offlineConfig;
}
/**
 * Tells whether this test is valid for the current configuration.
 *
 * @return <code>true</code> if the repository config is an {@link OfflineConfig}, <code>false</code> otherwise.
 */
@Override
public boolean isValid()
{
  return getRepositoryConfig() instanceof OfflineConfig;
}
/**
 * Returns the repository of the super class, narrowed to {@link InternalSynchronizableRepository}.
 *
 * @return the repository, cast to {@link InternalSynchronizableRepository}.
 * @throws ClassCastException if the repository is not an {@link InternalSynchronizableRepository}.
 */
@Override
public InternalSynchronizableRepository getRepository()
{
  InternalSynchronizableRepository repository = (InternalSynchronizableRepository)super.getRepository();
  return repository;
}
/**
 * Returns the test properties of the super class, extended by the offline-specific properties of this class.
 * <p>
 * The values come from the hook methods {@link #isRawReplication()}, {@link #getTestDelayedCommitHandling()},
 * {@link #getTestDelayed2CommitHandling()}, {@link #isFailover()} and {@link #isHinderInitialReplication()}.
 *
 * @return the map obtained from the super class, after the offline properties have been put into it.
 */
@Override
public synchronized Map<String, Object> getTestProperties()
{
  Map<String, Object> properties = super.getTestProperties();

  // Keep the order of the hook calls; subclasses may override any of them.
  properties.put(OfflineConfig.PROP_TEST_RAW_REPLICATION, isRawReplication());
  properties.put(OfflineConfig.PROP_TEST_DELAYED_COMMIT_HANDLING, getTestDelayedCommitHandling());
  properties.put(OfflineConfig.PROP_TEST_DELAYED2_COMMIT_HANDLING, getTestDelayed2CommitHandling());
  properties.put(OfflineConfig.PROP_TEST_FAILOVER, isFailover());
  properties.put(OfflineConfig.PROP_TEST_HINDER_INITIAL_REPLICATION, isHinderInitialReplication());

  return properties;
}
/**
 * Supplies the value that {@link #getTestProperties()} stores under {@link OfflineConfig#PROP_TEST_FAILOVER}.
 *
 * @return <code>false</code> in this implementation.
 */
protected boolean isFailover()
{
return false;
}
/**
 * Supplies the value that {@link #getTestProperties()} stores under {@link OfflineConfig#PROP_TEST_RAW_REPLICATION}.
 *
 * @return <code>true</code> in this implementation.
 */
protected boolean isRawReplication()
{
return true;
}
/**
 * Supplies the value that {@link #getTestProperties()} stores under
 * {@link OfflineConfig#PROP_TEST_DELAYED_COMMIT_HANDLING}.
 *
 * @return <code>0L</code> in this implementation (presumably a delay in milliseconds - TODO confirm against
 *         {@link OfflineConfig}).
 */
protected long getTestDelayedCommitHandling()
{
return 0L;
}
/**
 * Supplies the value that {@link #getTestProperties()} stores under
 * {@link OfflineConfig#PROP_TEST_DELAYED2_COMMIT_HANDLING}.
 *
 * @return <code>0L</code> in this implementation (presumably a delay in milliseconds - TODO confirm against
 *         {@link OfflineConfig}).
 */
protected long getTestDelayed2CommitHandling()
{
return 0L;
}
/**
 * Supplies the value that {@link #getTestProperties()} stores under
 * {@link OfflineConfig#PROP_TEST_HINDER_INITIAL_REPLICATION}.
 *
 * @return <code>false</code> in this implementation.
 */
protected boolean isHinderInitialReplication()
{
return false;
}
/**
 * Polls the given listener until the {@link CDOSessionInvalidationEvent}s it has recorded account for at least the
 * expected numbers of new package units, new objects, changed objects and detached objects, and then asserts that
 * the totals match the expectations exactly. The events of the listener are cleared in any case.
 *
 * @param listener the listener whose recorded events are inspected and finally cleared.
 * @param newPackageUnits the expected total number of new package units.
 * @param newObjects the expected total number of new objects.
 * @param changedObjects the expected total number of changed objects.
 * @param detachedObjects the expected total number of detached objects.
 * @throws InterruptedException propagated from the polling helper.
 */
protected static void checkEvent(final TestListener listener, final int newPackageUnits, final int newObjects, final int changedObjects,
    final int detachedObjects) throws InterruptedException
{
  try
  {
    final int[] counts = { 0, 0, 0, 0 };

    new PollingTimeOuter()
    {
      @Override
      protected boolean successful()
      {
        // This method can be invoked several times while polling, and the listener keeps its events until
        // clearEvents() is called. Start from zero on every poll so that no event is counted more than once.
        counts[0] = 0;
        counts[1] = 0;
        counts[2] = 0;
        counts[3] = 0;

        IEvent[] events = listener.getEvents();
        for (IEvent event : events)
        {
          if (event instanceof CDOSessionInvalidationEvent)
          {
            CDOSessionInvalidationEvent e = (CDOSessionInvalidationEvent)event;
            counts[0] += e.getNewPackageUnits().size();
            counts[1] += e.getNewObjects().size();
            counts[2] += e.getChangedObjects().size();
            counts[3] += e.getDetachedObjects().size();
          }
        }

        return counts[0] >= newPackageUnits && counts[1] >= newObjects && counts[2] >= changedObjects && counts[3] >= detachedObjects;
      }
    }.assertNoTimeOut();

    // The counters now hold the totals of the last poll; more events than expected are an error, too.
    assertEquals(newPackageUnits, counts[0]);
    assertEquals(newObjects, counts[1]);
    assertEquals(changedObjects, counts[2]);
    assertEquals(detachedObjects, counts[3]);
  }
  finally
  {
    listener.clearEvents();
  }
}
/**
 * Checks that the current revision of the given object is present in the revision cache of the given repository
 * and, if the repository's store is an {@link IMEMStore}, in that store as well.
 *
 * @param object the object whose CDO revision is looked up.
 * @param repository the repository whose cache (and possibly store) is searched.
 * @param location a label for the repository, used as prefix in the failure message.
 */
protected static void checkRevision(EObject object, InternalRepository repository, String location)
{
  CDORevision revision = CDOUtil.getCDOObject(object).cdoRevision();

  // Resolve the revision's branch in the branch manager of the repository under test.
  CDOBranch branch = repository.getBranchManager().getBranch(revision.getBranch().getID());

  // The revision must have arrived in the cache.
  Map<CDOBranch, List<CDORevision>> cachedRevisions = repository.getRevisionManager().getCache().getAllRevisions();
  checkRevision(revision, branch, cachedRevisions, location + " cache");

  // The store can only be inspected if it is a memory store.
  InternalStore store = repository.getStore();
  if (!(store instanceof IMEMStore))
  {
    return;
  }

  IMEMStore memStore = (IMEMStore)store;
  checkRevision(revision, branch, memStore.getAllRevisions(), location + " store");
}
/**
 * Fails the test unless the revisions registered for the given branch contain one with the same ID, branch ID and
 * version as the given revision.
 *
 * @param revision the revision to look for.
 * @param branch the key under which the candidate revisions are looked up in <code>allRevisions</code>.
 * @param allRevisions the revisions to search, keyed by branch.
 * @param location a description of the searched place, used in the failure message.
 */
protected static void checkRevision(CDORevision revision, CDOBranch branch, Map<CDOBranch, List<CDORevision>> allRevisions, String location)
{
  List<CDORevision> revisions = allRevisions.get(branch);

  // A branch without any registered revisions yields null; treat it as "missing" rather than running into an NPE.
  if (revisions != null)
  {
    for (CDORevision rev : revisions)
    {
      // NOTE(review): the IDs are compared by identity, which assumes canonical CDOID instances - confirm.
      if (rev.getID() == revision.getID() && rev.getBranch().getID() == revision.getBranch().getID() && rev.getVersion() == revision.getVersion())
      {
        return;
      }
    }
  }

  fail("Revision missing from " + location + ": " + revision);
}
/**
 * Waits until the last commit time stamp of the given repository has reached the given time stamp. Then, only if
 * the repository's store is a {@link MEMStore}, dumps all revisions of the store to <code>System.out</code> and
 * asserts the number of revisions in the main branch.
 *
 * @param repository the repository to wait for and to inspect.
 * @param timeStamp the commit time stamp that the repository must have reached before the check is made.
 * @param expectedRevisions the expected number of revisions in the main branch of the store.
 * @throws InterruptedException propagated from waiting for commits.
 */
protected static void checkRevisions(InternalRepository repository, long timeStamp, int expectedRevisions) throws InterruptedException
{
  long lastCommitTimeStamp = repository.getLastCommitTimeStamp();
  while (lastCommitTimeStamp < timeStamp)
  {
    lastCommitTimeStamp = repository.waitForCommit(DEFAULT_TIMEOUT);
  }

  InternalStore store = repository.getStore();
  if (store instanceof MEMStore)
  {
    Map<CDOBranch, List<CDORevision>> allRevisions = ((MEMStore)store).getAllRevisions();
    System.out.println("\n\n\n\n\n\n\n\n\n\n" + CDORevisionUtil.dumpAllRevisions(allRevisions));

    // A main branch without any registered revisions yields null; count it as zero revisions so that a mismatch
    // is reported as an assertion failure instead of a NullPointerException.
    List<CDORevision> revisions = allRevisions.get(repository.getBranchManager().getMainBranch());
    int actualRevisions = revisions == null ? 0 : revisions.size();
    assertEquals(expectedRevisions, actualRevisions);
  }
}
/**
 * Blocks until the state of the given repository is {@link CDOCommonRepository.State#ONLINE}, sleeping between
 * two checks. There is no timeout.
 *
 * @param repository the repository to wait for.
 */
protected static void waitForOnline(CDOCommonRepository repository)
{
  final String what = repository.getName() + " becoming online";

  for (CDOCommonRepository.State state = repository.getState(); state != CDOCommonRepository.State.ONLINE; state = repository.getState())
  {
    waitFor(what);
  }

  if (VERBOSE_WAIT)
  {
    IOUtil.OUT().println(repository.getName() + " is online.");
  }
}
/**
 * Blocks until the state of the given repository is no longer {@link CDOCommonRepository.State#ONLINE}, sleeping
 * between two checks. There is no timeout.
 *
 * @param repository the repository to wait for.
 */
protected static void waitForOffline(CDOCommonRepository repository)
{
  final String what = repository.getName() + " becoming offline";

  for (CDOCommonRepository.State state = repository.getState(); state == CDOCommonRepository.State.ONLINE; state = repository.getState())
  {
    waitFor(what);
  }

  if (VERBOSE_WAIT)
  {
    IOUtil.OUT().println(repository.getName() + " is offline.");
  }
}
/**
 * Commits the given transaction and blocks until the last update time of the receiving session has reached the
 * time stamp of the commit. There is no timeout.
 *
 * @param transaction the transaction to commit.
 * @param receiver the session that is expected to be updated by the commit.
 * @return the commit info returned by the commit.
 * @throws RuntimeException wrapping the {@link CommitException} if the commit fails.
 */
protected static CDOCommitInfo commitAndWaitForArrival(CDOTransaction transaction, CDOSession receiver)
{
  try
  {
    CDOCommitInfo result = transaction.commit();
    long commitTime = result.getTimeStamp();

    for (;;)
    {
      if (receiver.getLastUpdateTime() >= commitTime)
      {
        return result;
      }

      waitFor("arrival of commit " + CDOCommonUtil.formatTimeStamp(commitTime));
    }
  }
  catch (CommitException ex)
  {
    throw WrappedException.wrap(ex);
  }
}
/**
 * Commits the given transaction and blocks until the last commit time stamp of the receiving repository has
 * reached the time stamp of the commit. There is no timeout.
 *
 * @param transaction the transaction to commit.
 * @param receiver the repository that is expected to receive the commit.
 * @return the commit info returned by the commit.
 * @throws RuntimeException wrapping the {@link CommitException} if the commit fails.
 */
protected static CDOCommitInfo commitAndWaitForArrival(CDOTransaction transaction, IRepository receiver)
{
  try
  {
    CDOCommitInfo result = transaction.commit();
    long commitTime = result.getTimeStamp();

    for (;;)
    {
      if (receiver.getLastCommitTimeStamp() >= commitTime)
      {
        return result;
      }

      waitFor("arrival of commit " + CDOCommonUtil.formatTimeStamp(commitTime));
    }
  }
  catch (CommitException ex)
  {
    throw WrappedException.wrap(ex);
  }
}
/**
 * Sleeps for {@link #SLEEP_MILLIS} milliseconds. In verbose mode a progress message is printed first and the
 * inherited <code>sleep()</code> helper is used instead of {@link ConcurrencyUtil#sleep(long)}.
 *
 * @param what a description of the awaited condition, used in the verbose message only.
 */
private static void waitFor(String what)
{
  if (!VERBOSE_WAIT)
  {
    ConcurrencyUtil.sleep(SLEEP_MILLIS);
    return;
  }

  IOUtil.OUT().println("Waiting for " + what + "...");
  sleep(SLEEP_MILLIS);
}
/**
 * Opens a transaction on the given session, enables lock notifications in its options and enables durable
 * locking for it.
 *
 * @param session the session to open the transaction on.
 * @return the newly opened and configured transaction.
 */
protected static CDOTransaction openTransaction(CDOSession session)
{
  CDOTransaction transaction = session.openTransaction();
  transaction.options().setLockNotificationEnabled(true);

  // The return value of enableDurableLocking() is not needed here.
  transaction.enableDurableLocking();
  return transaction;
}
}