blob: 8be293f5d4dea02d2e1b5d833e92002fa9bd1d17 [file] [log] [blame]
/*
* Copyright (c) 2010-2015 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.emf.cdo.internal.server.syncing;
import org.eclipse.emf.cdo.CDOObject;
import org.eclipse.emf.cdo.common.CDOCommonRepository;
import org.eclipse.emf.cdo.common.CDOCommonSession.Options.LockNotificationMode;
import org.eclipse.emf.cdo.common.branch.CDOBranch;
import org.eclipse.emf.cdo.common.branch.CDOBranchPoint;
import org.eclipse.emf.cdo.common.branch.CDOBranchVersion;
import org.eclipse.emf.cdo.common.commit.CDOChangeKind;
import org.eclipse.emf.cdo.common.commit.CDOChangeSetData;
import org.eclipse.emf.cdo.common.commit.CDOCommitData;
import org.eclipse.emf.cdo.common.commit.CDOCommitInfo;
import org.eclipse.emf.cdo.common.commit.CDOCommitInfoHandler;
import org.eclipse.emf.cdo.common.id.CDOID;
import org.eclipse.emf.cdo.common.id.CDOIDUtil;
import org.eclipse.emf.cdo.common.lob.CDOLob;
import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo;
import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo.Operation;
import org.eclipse.emf.cdo.common.lock.CDOLockOwner;
import org.eclipse.emf.cdo.common.lock.CDOLockState;
import org.eclipse.emf.cdo.common.lock.CDOLockUtil;
import org.eclipse.emf.cdo.common.lock.IDurableLockingManager.LockArea;
import org.eclipse.emf.cdo.common.model.CDOPackageUnit;
import org.eclipse.emf.cdo.common.protocol.CDODataInput;
import org.eclipse.emf.cdo.common.protocol.CDOProtocol.CommitNotificationInfo;
import org.eclipse.emf.cdo.common.revision.CDOIDAndVersion;
import org.eclipse.emf.cdo.common.revision.CDORevision;
import org.eclipse.emf.cdo.common.revision.CDORevisionKey;
import org.eclipse.emf.cdo.common.revision.delta.CDORevisionDelta;
import org.eclipse.emf.cdo.common.util.CDOCommonUtil;
import org.eclipse.emf.cdo.common.util.CDOException;
import org.eclipse.emf.cdo.internal.common.commit.DelegatingCommitInfo;
import org.eclipse.emf.cdo.internal.common.revision.AbstractCDORevisionCache;
import org.eclipse.emf.cdo.internal.server.Repository;
import org.eclipse.emf.cdo.internal.server.TransactionCommitContext;
import org.eclipse.emf.cdo.server.IStoreAccessor;
import org.eclipse.emf.cdo.server.IStoreAccessor.CommitContext;
import org.eclipse.emf.cdo.server.ITransaction;
import org.eclipse.emf.cdo.server.StoreThreadLocal;
import org.eclipse.emf.cdo.spi.common.branch.CDOBranchAdjustable;
import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch;
import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager;
import org.eclipse.emf.cdo.spi.common.commit.CDOChangeKindCache;
import org.eclipse.emf.cdo.spi.common.commit.CDOCommitInfoUtil;
import org.eclipse.emf.cdo.spi.common.commit.InternalCDOCommitInfoManager;
import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta;
import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
import org.eclipse.emf.cdo.spi.server.InternalLockManager;
import org.eclipse.emf.cdo.spi.server.InternalRepository;
import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer;
import org.eclipse.emf.cdo.spi.server.InternalSession;
import org.eclipse.emf.cdo.spi.server.InternalSessionManager;
import org.eclipse.emf.cdo.spi.server.InternalStore;
import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository;
import org.eclipse.emf.cdo.spi.server.InternalTransaction;
import org.eclipse.emf.cdo.spi.server.InternalView;
import org.eclipse.emf.cdo.spi.server.SyncingUtil;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.IndexedList;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.monitor.Monitor;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.net4j.util.transaction.TransactionException;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol.LockObjectsResult;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol.UnlockObjectsResult;
import org.eclipse.emf.spi.cdo.InternalCDOSession;
import org.eclipse.emf.spi.cdo.InternalCDOTransaction;
import org.eclipse.emf.spi.cdo.InternalCDOTransaction.InternalCDOCommitContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
/**
* TODO:
* <ul>
* <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages).
* <li>Make CDOIDs of new objects temporary when merging out of temp branch.
* <li>Provide custom branching strategies.
* <li>Consider non-auditing masters.
* <li>Test out-of-order commits.
* <li>Don't create branches table if branching not supported.
* <li>Implement raw replication for NUMERIC and DECIMAL.
* <li>Notify new branches during raw replication.
* </ul>
*
* @author Eike Stepper
*/
public abstract class SynchronizableRepository extends Repository.Default implements InternalSynchronizableRepository
{
protected static final CDOCommonRepository.Type MASTER = CDOCommonRepository.Type.MASTER;
protected static final CDOCommonRepository.Type BACKUP = CDOCommonRepository.Type.BACKUP;
protected static final CDOCommonRepository.Type CLONE = CDOCommonRepository.Type.CLONE;
protected static final CDOCommonRepository.State INITIAL = CDOCommonRepository.State.INITIAL;
protected static final CDOCommonRepository.State OFFLINE = CDOCommonRepository.State.OFFLINE;
protected static final CDOCommonRepository.State SYNCING = CDOCommonRepository.State.SYNCING;
protected static final CDOCommonRepository.State ONLINE = CDOCommonRepository.State.ONLINE;
private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.lastReplicatedBranchID"; //$NON-NLS-1$
private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.lastReplicatedCommitTime"; //$NON-NLS-1$
private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.gracefullyShutDown"; //$NON-NLS-1$
private InternalRepositorySynchronizer synchronizer;
private InternalSession replicatorSession;
private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID;
private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE;
private int lastTransactionID;
private ReadLock writeThroughCommitLock;
private WriteLock handleCommitInfoLock;
public SynchronizableRepository()
{
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
writeThroughCommitLock = rwLock.readLock();
handleCommitInfoLock = rwLock.writeLock();
}
public InternalRepositorySynchronizer getSynchronizer()
{
return synchronizer;
}
public void setSynchronizer(InternalRepositorySynchronizer synchronizer)
{
checkInactive();
this.synchronizer = synchronizer;
}
public InternalSession getReplicatorSession()
{
return replicatorSession;
}
@Override
public Object[] getElements()
{
List<Object> list = new ArrayList<Object>(Arrays.asList(super.getElements()));
list.add(synchronizer);
return list.toArray();
}
public boolean hasBeenReplicated()
{
return getLastReplicatedCommitTime() != CDOBranchPoint.UNSPECIFIED_DATE;
}
public int getLastReplicatedBranchID()
{
return lastReplicatedBranchID;
}
public void setLastReplicatedBranchID(int lastReplicatedBranchID)
{
if (this.lastReplicatedBranchID < lastReplicatedBranchID)
{
this.lastReplicatedBranchID = lastReplicatedBranchID;
}
}
public long getLastReplicatedCommitTime()
{
return lastReplicatedCommitTime;
}
public void setLastReplicatedCommitTime(long lastReplicatedCommitTime)
{
if (this.lastReplicatedCommitTime < lastReplicatedCommitTime)
{
this.lastReplicatedCommitTime = lastReplicatedCommitTime;
}
}
@Override
public void setLastCommitTimeStamp(long lastCommitTimeStamp)
{
super.setLastCommitTimeStamp(lastCommitTimeStamp);
if (getType() == MASTER)
{
// This MASTER might become a BACKUP, so don't replicate this commit in the future
setLastReplicatedCommitTime(lastCommitTimeStamp);
}
}
public String[] getLockAreaIDs()
{
try
{
StoreThreadLocal.setSession(replicatorSession);
final List<String> areaIDs = new LinkedList<String>();
getLockingManager().getLockAreas(null, new LockArea.Handler()
{
public boolean handleLockArea(LockArea area)
{
areaIDs.add(area.getDurableLockingID());
return true;
}
});
return areaIDs.toArray(new String[areaIDs.size()]);
}
finally
{
StoreThreadLocal.release();
}
}
public void handleBranch(CDOBranch branch)
{
if (branch.isLocal())
{
return;
}
int branchID = branch.getID();
String name = branch.getName();
CDOBranchPoint base = branch.getBase();
InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch();
long baseTimeStamp = base.getTimeStamp();
InternalCDOBranchManager branchManager = getBranchManager();
branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp);
setLastReplicatedBranchID(branchID);
}
public void handleCommitInfo(final CDOCommitInfo commitInfo)
{
CDOBranch branch = commitInfo.getBranch();
if (branch.isLocal())
{
return;
}
// Convert branches from remoteSession to localRepository
InternalCDOBranchManager newBranchManager = getBranchManager();
for (CDOIDAndVersion key : commitInfo.getNewObjects())
{
if (key instanceof CDOBranchAdjustable)
{
CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key;
branchAdjustable.adjustBranches(newBranchManager);
}
}
for (CDORevisionKey key : commitInfo.getChangedObjects())
{
if (key instanceof CDOBranchAdjustable)
{
CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key;
branchAdjustable.adjustBranches(newBranchManager);
}
}
for (CDOIDAndVersion key : commitInfo.getDetachedObjects())
{
if (key instanceof CDOBranchAdjustable)
{
CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key;
branchAdjustable.adjustBranches(newBranchManager);
}
}
final InternalCDOBranch newBranch = newBranchManager.getBranch(branch.getID());
CDOCommitInfo newCommitInfo = new DelegatingCommitInfo()
{
@Override
protected CDOCommitInfo getDelegate()
{
return commitInfo;
}
@Override
public CDOBranch getBranch()
{
return newBranch;
}
};
long timeStamp = newCommitInfo.getTimeStamp();
CDOBranchPoint head = newBranch.getHead();
InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head);
ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, newCommitInfo);
commitContext.preWrite();
boolean success = false;
try
{
handleCommitInfoLock.lock();
commitContext.write(new Monitor());
commitContext.commit(new Monitor());
setLastCommitTimeStamp(timeStamp);
setLastReplicatedCommitTime(timeStamp);
success = true;
}
finally
{
handleCommitInfoLock.unlock();
commitContext.postCommit(success);
transaction.close();
}
}
public void handleLockChangeInfo(CDOLockChangeInfo lockChangeInfo)
{
CDOLockOwner owner = lockChangeInfo.getLockOwner();
if (owner == null)
{
return;
}
String durableLockingID = owner.getDurableLockingID();
CDOBranch viewedBranch = lockChangeInfo.getBranch();
InternalLockManager lockManager = getLockingManager();
LockType lockType = lockChangeInfo.getLockType();
InternalView view = null;
try
{
view = SyncingUtil.openViewWithLockArea(replicatorSession, lockManager, viewedBranch, durableLockingID);
List<Object> lockables = new LinkedList<Object>();
for (CDOLockState lockState : lockChangeInfo.getLockStates())
{
lockables.add(lockState.getLockedObject());
}
if (lockChangeInfo.getOperation() == Operation.LOCK)
{
// If we can't lock immediately, there's a conflict, which means we're in big
// trouble: somehow locks were obtained on the clone but not on the master. What to do?
// TODO (CD) Consider this problem further
long timeout = 0;
super.lock(view, lockType, lockables, null, false, timeout);
}
else if (lockChangeInfo.getOperation() == Operation.UNLOCK)
{
super.doUnlock(view, lockType, lockables, false);
}
else
{
throw new IllegalStateException("Unexpected: " + lockChangeInfo.getOperation());
}
}
finally
{
LifecycleUtil.deactivate(view);
}
}
public boolean handleLockArea(LockArea area)
{
try
{
StoreThreadLocal.setSession(replicatorSession);
getLockingManager().updateLockArea(area);
getSessionManager().sendLockNotification(null, CDOLockUtil.createLockChangeInfo());
return true;
}
finally
{
StoreThreadLocal.release();
}
}
/**
* Called by ReplicateRepositoryRawRequest.confirming().
*/
public void replicateRaw(CDODataInput in, OMMonitor monitor) throws IOException
{
try
{
long previousCommitTime = getLastCommitTimeStamp();
int fromBranchID = lastReplicatedBranchID + 1;
int toBranchID = in.readInt();
long fromCommitTime = lastReplicatedCommitTime + 1L;
long toCommitTime = in.readLong();
StoreThreadLocal.setSession(replicatorSession);
IStoreAccessor.Raw accessor = (IStoreAccessor.Raw)StoreThreadLocal.getAccessor();
accessor.rawImport(in, fromBranchID, toBranchID, fromCommitTime, toCommitTime, monitor);
replicateRawReviseRevisions();
replicateRawReloadLocks();
replicateRawNotifyClients(lastReplicatedCommitTime, toCommitTime, previousCommitTime);
setLastReplicatedBranchID(toBranchID);
setLastReplicatedCommitTime(toCommitTime);
setLastCommitTimeStamp(toCommitTime);
}
finally
{
StoreThreadLocal.release();
}
}
public void goOnline()
{
if (getState() == OFFLINE)
{
LifecycleUtil.activate(synchronizer);
// Do not set the state to ONLINE yet; the synchronizer will set it to SYNCING first,
// and then to ONLINE after a succesful replication.
}
}
public void goOffline()
{
if (getState() != OFFLINE)
{
LifecycleUtil.deactivate(synchronizer);
setState(OFFLINE);
}
}
private void replicateRawReviseRevisions()
{
InternalCDORevisionCache cache = getRevisionManager().getCache();
for (CDORevision revision : cache.getCurrentRevisions())
{
cache.removeRevision(revision.getID(), revision);
}
}
private void replicateRawReloadLocks()
{
getLockingManager().reloadLocks();
}
private void replicateRawNotifyClients(long fromCommitTime, long toCommitTime, long previousCommitTime)
{
InternalCDOCommitInfoManager manager = getCommitInfoManager();
InternalSessionManager sessionManager = getSessionManager();
Map<CDOBranch, TimeRange> branches = replicateRawGetBranches(fromCommitTime, toCommitTime);
for (Entry<CDOBranch, TimeRange> entry : branches.entrySet())
{
CDOBranch branch = entry.getKey();
TimeRange range = entry.getValue();
fromCommitTime = range.getTime1();
toCommitTime = range.getTime2();
CDOBranchPoint startPoint = branch.getPoint(fromCommitTime);
CDOBranchPoint endPoint = branch.getPoint(toCommitTime);
CDOChangeSetData changeSet = getChangeSet(startPoint, endPoint);
List<CDOPackageUnit> newPackages = Collections.emptyList(); // TODO Notify about new packages
List<CDOIDAndVersion> newObjects = changeSet.getNewObjects();
List<CDORevisionKey> changedObjects = changeSet.getChangedObjects();
List<CDOIDAndVersion> detachedObjects = changeSet.getDetachedObjects();
CDOCommitData data = CDOCommitInfoUtil.createCommitData(newPackages, newObjects, changedObjects, detachedObjects);
String comment = "<replicate raw commits>"; //$NON-NLS-1$
final CDOCommitInfo commitInfo = manager.createCommitInfo( //
branch, toCommitTime, previousCommitTime, SYSTEM_USER_ID, comment, data);
CommitNotificationInfo info = new CommitNotificationInfo();
info.setSender(replicatorSession);
info.setCommitInfo(commitInfo);
info.setClearResourcePathCache(true);
sessionManager.sendCommitNotification(info);
}
CDOLockChangeInfo lockChangeInfo = CDOLockUtil.createLockChangeInfo();
sessionManager.sendLockNotification(replicatorSession, lockChangeInfo);
}
private Map<CDOBranch, TimeRange> replicateRawGetBranches(long fromCommitTime, long toCommitTime)
{
final Map<CDOBranch, TimeRange> branches = new HashMap<CDOBranch, TimeRange>();
CDOCommitInfoHandler handler = new CDOCommitInfoHandler()
{
public void handleCommitInfo(CDOCommitInfo commitInfo)
{
CDOBranch branch = commitInfo.getBranch();
long timeStamp = commitInfo.getTimeStamp();
TimeRange range = branches.get(branch);
if (range == null)
{
branches.put(branch, new TimeRange(timeStamp));
}
else
{
range.update(timeStamp);
}
}
};
getCommitInfoManager().getCommitInfos(null, fromCommitTime, toCommitTime, handler);
return branches;
}
@Override
public void notifyWriteAccessHandlers(ITransaction transaction, CommitContext commitContext, boolean beforeCommit,
OMMonitor monitor)
{
if (beforeCommit && commitContext.getNewPackageUnits().length != 0)
{
throw new IllegalStateException(
"Synchronizable repositories don't support dynamic addition of new packages. Use IRepository.setInitialPackages() instead.");
}
super.notifyWriteAccessHandlers(transaction, commitContext, beforeCommit, monitor);
}
@Override
public abstract InternalCommitContext createCommitContext(InternalTransaction transaction);
protected InternalCommitContext createNormalCommitContext(InternalTransaction transaction)
{
return super.createCommitContext(transaction);
}
protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction)
{
return new WriteThroughCommitContext(transaction);
}
@Override
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
checkState(synchronizer, "synchronizer"); //$NON-NLS-1$
}
@Override
protected void doActivate() throws Exception
{
super.doActivate();
InternalCDORevisionCache cache = getRevisionManager().getCache();
if (cache instanceof AbstractCDORevisionCache)
{
// Enable branch checks to ensure that no branches from the replicator session are used
((AbstractCDORevisionCache)cache).setBranchManager(getBranchManager());
}
InternalStore store = getStore();
if (!store.isFirstStart())
{
Map<String, String> map = store.getPersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN))
{
setReplicationCountersToLatest();
}
else
{
Set<String> names = new HashSet<String>();
names.add(PROP_LAST_REPLICATED_BRANCH_ID);
names.add(PROP_LAST_REPLICATED_COMMIT_TIME);
map = store.getPersistentProperties(names);
setLastReplicatedBranchID(Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID)));
setLastReplicatedCommitTime(Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME)));
}
}
store.removePersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
Type type = getType();
if (type == MASTER)
{
setState(ONLINE);
}
else
{
if (hasBeenReplicated())
{
setState(OFFLINE);
}
else if (type == BACKUP)
{
if (getLastReplicatedCommitTime() == CDOBranchPoint.UNSPECIFIED_DATE)
{
boolean usedToBeMaster = getRootResourceID() != null;
if (usedToBeMaster)
{
setLastReplicatedCommitTime(getLastCommitTimeStamp());
}
}
}
startSynchronization();
}
}
@Override
protected void doDeactivate() throws Exception
{
stopSynchronization();
Map<String, String> map = new HashMap<String, String>();
map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID));
map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime));
map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString());
InternalStore store = getStore();
store.setPersistentProperties(map);
super.doDeactivate();
}
protected void startSynchronization()
{
replicatorSession = getSessionManager().openSession(null);
replicatorSession.options().setPassiveUpdateEnabled(false);
replicatorSession.options().setLockNotificationMode(LockNotificationMode.OFF);
synchronizer.setLocalRepository(this);
synchronizer.activate();
}
protected void stopSynchronization()
{
if (synchronizer != null)
{
synchronizer.deactivate();
}
}
@Override
protected void setPostActivateState()
{
// Do nothing (keep INITIAL)
}
protected void setReplicationCountersToLatest()
{
setLastReplicatedBranchID(getStore().getLastBranchID());
setLastReplicatedCommitTime(getStore().getLastNonLocalCommitTime());
}
@Override
protected void initRootResource()
{
// Non-MASTER repositories must wait for the first replication to receive their root resource ID
if (getType() == MASTER)
{
super.initRootResource();
}
}
@Override
public LockObjectsResult lock(InternalView view, LockType lockType, List<CDORevisionKey> revisionKeys,
boolean recursive, long timeout)
{
if (view.getBranch().isLocal())
{
return super.lock(view, lockType, revisionKeys, recursive, timeout);
}
if (getState() != ONLINE)
{
throw new CDOException("Cannot lock in a non-local branch when clone is not connected to master");
}
return lockThrough(view, lockType, revisionKeys, false, timeout);
}
private LockObjectsResult lockOnMaster(InternalView view, LockType type, List<CDORevisionKey> revKeys,
boolean recursive, long timeout) throws InterruptedException
{
// Delegate locking to the master
InternalCDOSession remoteSession = getSynchronizer().getRemoteSession();
CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol();
String areaID = view.getDurableLockingID();
if (areaID == null)
{
throw new IllegalStateException("Durable locking is not enabled for view " + view);
}
LockObjectsResult masterLockingResult = sessionProtocol.delegateLockObjects(areaID, revKeys, view.getBranch(), type,
recursive, timeout);
if (masterLockingResult.isSuccessful() && masterLockingResult.isWaitForUpdate())
{
if (!getSynchronizer().getRemoteSession().options().isPassiveUpdateEnabled())
{
throw new AssertionError(
"Master lock result requires clone to wait, but clone does not have passiveUpdates enabled.");
}
long requiredTimestamp = masterLockingResult.getRequiredTimestamp();
remoteSession.waitForUpdate(requiredTimestamp);
}
return masterLockingResult;
}
private LockObjectsResult lockThrough(InternalView view, LockType type, List<CDORevisionKey> keys, boolean recursive,
long timeout)
{
try
{
LockObjectsResult masterLockingResult = lockOnMaster(view, type, keys, recursive, timeout);
if (!masterLockingResult.isSuccessful())
{
return masterLockingResult;
}
LockObjectsResult localLockingResult = super.lock(view, type, keys, recursive, timeout);
return localLockingResult;
}
catch (InterruptedException ex)
{
throw WrappedException.wrap(ex);
}
}
@Override
public UnlockObjectsResult unlock(InternalView view, LockType lockType, List<CDOID> objectIDs, boolean recursive)
{
if (view.getBranch().isLocal())
{
super.unlock(view, lockType, objectIDs, recursive);
}
if (getState() != ONLINE)
{
throw new CDOException("Cannot unlock in a non-local branch when clone is not connected to master");
}
return unlockThrough(view, lockType, objectIDs, recursive);
}
private void unlockOnMaster(InternalView view, LockType lockType, List<CDOID> objectIDs, boolean recursive)
{
InternalCDOSession remoteSession = getSynchronizer().getRemoteSession();
CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol();
String lockAreaID = view.getDurableLockingID();
if (lockAreaID == null)
{
throw new IllegalStateException("Durable locking is not enabled for view " + view);
}
sessionProtocol.delegateUnlockObjects(lockAreaID, objectIDs, lockType, recursive);
}
private UnlockObjectsResult unlockThrough(InternalView view, LockType lockType, List<CDOID> objectIDs,
boolean recursive)
{
unlockOnMaster(view, lockType, objectIDs, recursive);
return super.unlock(view, lockType, objectIDs, recursive);
}
/**
* @author Eike Stepper
*/
private static final class TimeRange
{
private long time1;
private long time2;
public TimeRange(long time)
{
time1 = time;
time2 = time;
}
public void update(long time)
{
if (time < time1)
{
time1 = time;
}
if (time > time2)
{
time2 = time;
}
}
public long getTime1()
{
return time1;
}
public long getTime2()
{
return time2;
}
@Override
public String toString()
{
return "[" + CDOCommonUtil.formatTimeStamp(time1) + " - " + CDOCommonUtil.formatTimeStamp(time1) + "]";
}
}
/**
* @author Eike Stepper
*/
protected static final class CommitContextData implements CDOCommitData
{
private InternalCommitContext commitContext;
private CDOChangeKindCache changeKindCache;
public CommitContextData(InternalCommitContext commitContext)
{
this.commitContext = commitContext;
}
public boolean isEmpty()
{
return false;
}
public CDOChangeSetData copy()
{
throw new UnsupportedOperationException();
}
public void merge(CDOChangeSetData changeSetData)
{
throw new UnsupportedOperationException();
}
public List<CDOPackageUnit> getNewPackageUnits()
{
final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits();
return new IndexedList<CDOPackageUnit>()
{
@Override
public CDOPackageUnit get(int index)
{
return newPackageUnits[index];
}
@Override
public int size()
{
return newPackageUnits.length;
}
};
}
public List<CDOIDAndVersion> getNewObjects()
{
final InternalCDORevision[] newObjects = commitContext.getNewObjects();
return new IndexedList<CDOIDAndVersion>()
{
@Override
public CDOIDAndVersion get(int index)
{
return newObjects[index];
}
@Override
public int size()
{
return newObjects.length;
}
};
}
public List<CDORevisionKey> getChangedObjects()
{
final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas();
return new IndexedList<CDORevisionKey>()
{
@Override
public CDORevisionKey get(int index)
{
return changedObjects[index];
}
@Override
public int size()
{
return changedObjects.length;
}
};
}
public List<CDOIDAndVersion> getDetachedObjects()
{
final CDOID[] detachedObjects = commitContext.getDetachedObjects();
return new IndexedList<CDOIDAndVersion>()
{
@Override
public CDOIDAndVersion get(int index)
{
return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION);
}
@Override
public int size()
{
return detachedObjects.length;
}
};
}
public synchronized Map<CDOID, CDOChangeKind> getChangeKinds()
{
if (changeKindCache == null)
{
changeKindCache = new CDOChangeKindCache(this);
}
return changeKindCache;
}
public CDOChangeKind getChangeKind(CDOID id)
{
return getChangeKinds().get(id);
}
}
/**
* @author Eike Stepper
*/
protected final class WriteThroughCommitContext extends TransactionCommitContext
{
private static final int ARTIFICIAL_VIEW_ID = 0;
private CommitTransactionResult result;
public WriteThroughCommitContext(InternalTransaction transaction)
{
super(transaction);
}
@Override
public void preWrite()
{
// Do nothing
}
@Override
public void write(OMMonitor monitor)
{
// Do nothing
}
@Override
public void commit(OMMonitor monitor)
{
// Prepare commit to the master
final CDOCommitData commitData = new CommitContextData(this);
InternalCDOCommitContext ctx = new InternalCDOCommitContext()
{
public boolean isPartialCommit()
{
return false;
}
public Map<CDOID, CDORevisionDelta> getRevisionDeltas()
{
throw new UnsupportedOperationException();
}
public List<CDOPackageUnit> getNewPackageUnits()
{
return commitData.getNewPackageUnits();
}
public Map<CDOID, CDOObject> getNewObjects()
{
throw new UnsupportedOperationException();
}
public Collection<CDOLockState> getLocksOnNewObjects()
{
CDOLockState[] locksOnNewObjectsArr = WriteThroughCommitContext.this.getLocksOnNewObjects();
Collection<CDOLockState> locksOnNewObjects = Arrays.asList(locksOnNewObjectsArr);
return locksOnNewObjects;
}
public Collection<CDOLob<?>> getLobs()
{
return Collections.emptySet(); // TODO (CD) Did we forget to support this earlier?
}
public Map<CDOID, CDOObject> getDirtyObjects()
{
throw new UnsupportedOperationException();
}
public Map<CDOID, CDOObject> getDetachedObjects()
{
throw new UnsupportedOperationException();
}
public void preCommit()
{
throw new UnsupportedOperationException();
}
public void postCommit(CommitTransactionResult result)
{
throw new UnsupportedOperationException();
}
public InternalCDOTransaction getTransaction()
{
return null;
}
public CDOCommitData getCommitData()
{
return commitData;
}
public int getViewID()
{
return ARTIFICIAL_VIEW_ID;
}
public String getUserID()
{
return WriteThroughCommitContext.this.getUserID();
}
public boolean isAutoReleaseLocks()
{
return WriteThroughCommitContext.this.isAutoReleaseLocksEnabled();
}
public String getCommitComment()
{
return WriteThroughCommitContext.this.getCommitComment();
}
public CDOBranch getBranch()
{
return WriteThroughCommitContext.this.getTransaction().getBranch();
}
};
// Delegate commit to the master
CDOSessionProtocol sessionProtocol = getSynchronizer().getRemoteSession().getSessionProtocol();
result = sessionProtocol.commitDelegation(ctx, monitor);
// Stop if commit to master failed
String rollbackMessage = result.getRollbackMessage();
if (rollbackMessage != null)
{
throw new TransactionException(rollbackMessage);
}
// Prepare data needed for commit result and commit notifications
long timeStamp = result.getTimeStamp(); // result is set to null later!
addIDMappings(result.getIDMappings());
applyIDMappings(new Monitor());
try
{
writeThroughCommitLock.lock();
// Commit to the local repository
super.preWrite();
super.write(new Monitor());
super.commit(new Monitor());
}
finally
{
writeThroughCommitLock.unlock();
}
// Remember commit time in the local repository
setLastCommitTimeStamp(timeStamp);
setLastReplicatedCommitTime(timeStamp);
// Remember commit time in the replicator session.
getSynchronizer().getRemoteSession().setLastUpdateTime(timeStamp);
}
@Override
protected long[] createTimeStamp(OMMonitor monitor)
{
long timeStamp = result.getTimeStamp();
long previousTimeStamp = result.getPreviousTimeStamp();
result = null;
InternalRepository repository = getTransaction().getSession().getManager().getRepository();
repository.forceCommitTimeStamp(timeStamp, monitor);
return new long[] { timeStamp, previousTimeStamp };
}
@Override
protected void lockObjects() throws InterruptedException
{
// Do nothing
}
private void addIDMappings(Map<CDOID, CDOID> idMappings)
{
for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet())
{
CDOID oldID = idMapping.getKey();
CDOID newID = idMapping.getValue();
addIDMapping(oldID, newID);
}
}
}
}