| /* |
| * Copyright (c) 2010-2015 Eike Stepper (Berlin, Germany) and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Eike Stepper - initial API and implementation |
| */ |
| package org.eclipse.emf.cdo.internal.server.syncing; |
| |
| import org.eclipse.emf.cdo.CDOObject; |
| import org.eclipse.emf.cdo.common.CDOCommonRepository; |
| import org.eclipse.emf.cdo.common.CDOCommonSession.Options.LockNotificationMode; |
| import org.eclipse.emf.cdo.common.branch.CDOBranch; |
| import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; |
| import org.eclipse.emf.cdo.common.branch.CDOBranchVersion; |
| import org.eclipse.emf.cdo.common.commit.CDOChangeKind; |
| import org.eclipse.emf.cdo.common.commit.CDOChangeSetData; |
| import org.eclipse.emf.cdo.common.commit.CDOCommitData; |
| import org.eclipse.emf.cdo.common.commit.CDOCommitInfo; |
| import org.eclipse.emf.cdo.common.commit.CDOCommitInfoHandler; |
| import org.eclipse.emf.cdo.common.id.CDOID; |
| import org.eclipse.emf.cdo.common.id.CDOIDUtil; |
| import org.eclipse.emf.cdo.common.lob.CDOLob; |
| import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo; |
| import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo.Operation; |
| import org.eclipse.emf.cdo.common.lock.CDOLockOwner; |
| import org.eclipse.emf.cdo.common.lock.CDOLockState; |
| import org.eclipse.emf.cdo.common.lock.CDOLockUtil; |
| import org.eclipse.emf.cdo.common.lock.IDurableLockingManager.LockArea; |
| import org.eclipse.emf.cdo.common.model.CDOPackageUnit; |
| import org.eclipse.emf.cdo.common.protocol.CDODataInput; |
| import org.eclipse.emf.cdo.common.protocol.CDOProtocol.CommitNotificationInfo; |
| import org.eclipse.emf.cdo.common.revision.CDOIDAndVersion; |
| import org.eclipse.emf.cdo.common.revision.CDORevision; |
| import org.eclipse.emf.cdo.common.revision.CDORevisionKey; |
| import org.eclipse.emf.cdo.common.revision.delta.CDORevisionDelta; |
| import org.eclipse.emf.cdo.common.util.CDOCommonUtil; |
| import org.eclipse.emf.cdo.common.util.CDOException; |
| import org.eclipse.emf.cdo.internal.common.commit.DelegatingCommitInfo; |
| import org.eclipse.emf.cdo.internal.common.revision.AbstractCDORevisionCache; |
| import org.eclipse.emf.cdo.internal.server.Repository; |
| import org.eclipse.emf.cdo.internal.server.TransactionCommitContext; |
| import org.eclipse.emf.cdo.server.IStoreAccessor; |
| import org.eclipse.emf.cdo.server.IStoreAccessor.CommitContext; |
| import org.eclipse.emf.cdo.server.ITransaction; |
| import org.eclipse.emf.cdo.server.StoreThreadLocal; |
| import org.eclipse.emf.cdo.spi.common.branch.CDOBranchAdjustable; |
| import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch; |
| import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; |
| import org.eclipse.emf.cdo.spi.common.commit.CDOChangeKindCache; |
| import org.eclipse.emf.cdo.spi.common.commit.CDOCommitInfoUtil; |
| import org.eclipse.emf.cdo.spi.common.commit.InternalCDOCommitInfoManager; |
| import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit; |
| import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; |
| import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache; |
| import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta; |
| import org.eclipse.emf.cdo.spi.server.InternalCommitContext; |
| import org.eclipse.emf.cdo.spi.server.InternalLockManager; |
| import org.eclipse.emf.cdo.spi.server.InternalRepository; |
| import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer; |
| import org.eclipse.emf.cdo.spi.server.InternalSession; |
| import org.eclipse.emf.cdo.spi.server.InternalSessionManager; |
| import org.eclipse.emf.cdo.spi.server.InternalStore; |
| import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository; |
| import org.eclipse.emf.cdo.spi.server.InternalTransaction; |
| import org.eclipse.emf.cdo.spi.server.InternalView; |
| import org.eclipse.emf.cdo.spi.server.SyncingUtil; |
| |
| import org.eclipse.net4j.util.WrappedException; |
| import org.eclipse.net4j.util.collection.IndexedList; |
| import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; |
| import org.eclipse.net4j.util.lifecycle.LifecycleUtil; |
| import org.eclipse.net4j.util.om.monitor.Monitor; |
| import org.eclipse.net4j.util.om.monitor.OMMonitor; |
| import org.eclipse.net4j.util.transaction.TransactionException; |
| |
| import org.eclipse.emf.spi.cdo.CDOSessionProtocol; |
| import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult; |
| import org.eclipse.emf.spi.cdo.CDOSessionProtocol.LockObjectsResult; |
| import org.eclipse.emf.spi.cdo.CDOSessionProtocol.UnlockObjectsResult; |
| import org.eclipse.emf.spi.cdo.InternalCDOSession; |
| import org.eclipse.emf.spi.cdo.InternalCDOTransaction; |
| import org.eclipse.emf.spi.cdo.InternalCDOTransaction.InternalCDOCommitContext; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| /** |
| * TODO: |
| * <ul> |
| * <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages). |
| * <li>Make CDOIDs of new objects temporary when merging out of temp branch. |
| * <li>Provide custom branching strategies. |
| * <li>Consider non-auditing masters. |
| * <li>Test out-of-order commits. |
| * <li>Don't create branches table if branching not supported. |
| * <li>Implement raw replication for NUMERIC and DECIMAL. |
| * <li>Notify new branches during raw replication. |
| * </ul> |
| * |
| * @author Eike Stepper |
| */ |
| public abstract class SynchronizableRepository extends Repository.Default implements InternalSynchronizableRepository |
| { |
| protected static final CDOCommonRepository.Type MASTER = CDOCommonRepository.Type.MASTER; |
| |
| protected static final CDOCommonRepository.Type BACKUP = CDOCommonRepository.Type.BACKUP; |
| |
| protected static final CDOCommonRepository.Type CLONE = CDOCommonRepository.Type.CLONE; |
| |
| protected static final CDOCommonRepository.State INITIAL = CDOCommonRepository.State.INITIAL; |
| |
| protected static final CDOCommonRepository.State OFFLINE = CDOCommonRepository.State.OFFLINE; |
| |
| protected static final CDOCommonRepository.State SYNCING = CDOCommonRepository.State.SYNCING; |
| |
| protected static final CDOCommonRepository.State ONLINE = CDOCommonRepository.State.ONLINE; |
| |
| private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.lastReplicatedBranchID"; //$NON-NLS-1$ |
| |
| private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.lastReplicatedCommitTime"; //$NON-NLS-1$ |
| |
| private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.gracefullyShutDown"; //$NON-NLS-1$ |
| |
| private InternalRepositorySynchronizer synchronizer; |
| |
| private InternalSession replicatorSession; |
| |
| private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID; |
| |
| private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE; |
| |
| private int lastTransactionID; |
| |
| private ReadLock writeThroughCommitLock; |
| |
| private WriteLock handleCommitInfoLock; |
| |
/**
 * Creates the repository and derives both commit locks from a single {@link ReentrantReadWriteLock}:
 * the write side guards {@code handleCommitInfo()}, the read side guards write-through commits.
 */
public SynchronizableRepository()
{
  ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();
  handleCommitInfoLock = commitLock.writeLock();
  writeThroughCommitLock = commitLock.readLock();
}
| |
| public InternalRepositorySynchronizer getSynchronizer() |
| { |
| return synchronizer; |
| } |
| |
| public void setSynchronizer(InternalRepositorySynchronizer synchronizer) |
| { |
| checkInactive(); |
| this.synchronizer = synchronizer; |
| } |
| |
| public InternalSession getReplicatorSession() |
| { |
| return replicatorSession; |
| } |
| |
| @Override |
| public Object[] getElements() |
| { |
| List<Object> list = new ArrayList<Object>(Arrays.asList(super.getElements())); |
| list.add(synchronizer); |
| return list.toArray(); |
| } |
| |
| public boolean hasBeenReplicated() |
| { |
| return getLastReplicatedCommitTime() != CDOBranchPoint.UNSPECIFIED_DATE; |
| } |
| |
| public int getLastReplicatedBranchID() |
| { |
| return lastReplicatedBranchID; |
| } |
| |
| public void setLastReplicatedBranchID(int lastReplicatedBranchID) |
| { |
| if (this.lastReplicatedBranchID < lastReplicatedBranchID) |
| { |
| this.lastReplicatedBranchID = lastReplicatedBranchID; |
| } |
| } |
| |
| public long getLastReplicatedCommitTime() |
| { |
| return lastReplicatedCommitTime; |
| } |
| |
| public void setLastReplicatedCommitTime(long lastReplicatedCommitTime) |
| { |
| if (this.lastReplicatedCommitTime < lastReplicatedCommitTime) |
| { |
| this.lastReplicatedCommitTime = lastReplicatedCommitTime; |
| } |
| } |
| |
| @Override |
| public void setLastCommitTimeStamp(long lastCommitTimeStamp) |
| { |
| super.setLastCommitTimeStamp(lastCommitTimeStamp); |
| |
| if (getType() == MASTER) |
| { |
| // This MASTER might become a BACKUP, so don't replicate this commit in the future |
| setLastReplicatedCommitTime(lastCommitTimeStamp); |
| } |
| } |
| |
| public String[] getLockAreaIDs() |
| { |
| try |
| { |
| StoreThreadLocal.setSession(replicatorSession); |
| final List<String> areaIDs = new LinkedList<String>(); |
| getLockingManager().getLockAreas(null, new LockArea.Handler() |
| { |
| public boolean handleLockArea(LockArea area) |
| { |
| areaIDs.add(area.getDurableLockingID()); |
| return true; |
| } |
| }); |
| return areaIDs.toArray(new String[areaIDs.size()]); |
| } |
| finally |
| { |
| StoreThreadLocal.release(); |
| } |
| } |
| |
| public void handleBranch(CDOBranch branch) |
| { |
| if (branch.isLocal()) |
| { |
| return; |
| } |
| |
| int branchID = branch.getID(); |
| String name = branch.getName(); |
| |
| CDOBranchPoint base = branch.getBase(); |
| InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch(); |
| long baseTimeStamp = base.getTimeStamp(); |
| |
| InternalCDOBranchManager branchManager = getBranchManager(); |
| branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp); |
| setLastReplicatedBranchID(branchID); |
| } |
| |
| public void handleCommitInfo(final CDOCommitInfo commitInfo) |
| { |
| CDOBranch branch = commitInfo.getBranch(); |
| if (branch.isLocal()) |
| { |
| return; |
| } |
| |
| // Convert branches from remoteSession to localRepository |
| InternalCDOBranchManager newBranchManager = getBranchManager(); |
| |
| for (CDOIDAndVersion key : commitInfo.getNewObjects()) |
| { |
| if (key instanceof CDOBranchAdjustable) |
| { |
| CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key; |
| branchAdjustable.adjustBranches(newBranchManager); |
| } |
| } |
| |
| for (CDORevisionKey key : commitInfo.getChangedObjects()) |
| { |
| if (key instanceof CDOBranchAdjustable) |
| { |
| CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key; |
| branchAdjustable.adjustBranches(newBranchManager); |
| } |
| } |
| |
| for (CDOIDAndVersion key : commitInfo.getDetachedObjects()) |
| { |
| if (key instanceof CDOBranchAdjustable) |
| { |
| CDOBranchAdjustable branchAdjustable = (CDOBranchAdjustable)key; |
| branchAdjustable.adjustBranches(newBranchManager); |
| } |
| } |
| |
| final InternalCDOBranch newBranch = newBranchManager.getBranch(branch.getID()); |
| CDOCommitInfo newCommitInfo = new DelegatingCommitInfo() |
| { |
| @Override |
| protected CDOCommitInfo getDelegate() |
| { |
| return commitInfo; |
| } |
| |
| @Override |
| public CDOBranch getBranch() |
| { |
| return newBranch; |
| } |
| }; |
| |
| long timeStamp = newCommitInfo.getTimeStamp(); |
| CDOBranchPoint head = newBranch.getHead(); |
| |
| InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head); |
| ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, newCommitInfo); |
| commitContext.preWrite(); |
| boolean success = false; |
| |
| try |
| { |
| handleCommitInfoLock.lock(); |
| |
| commitContext.write(new Monitor()); |
| commitContext.commit(new Monitor()); |
| |
| setLastCommitTimeStamp(timeStamp); |
| setLastReplicatedCommitTime(timeStamp); |
| success = true; |
| } |
| finally |
| { |
| handleCommitInfoLock.unlock(); |
| commitContext.postCommit(success); |
| transaction.close(); |
| } |
| } |
| |
| public void handleLockChangeInfo(CDOLockChangeInfo lockChangeInfo) |
| { |
| CDOLockOwner owner = lockChangeInfo.getLockOwner(); |
| if (owner == null) |
| { |
| return; |
| } |
| |
| String durableLockingID = owner.getDurableLockingID(); |
| CDOBranch viewedBranch = lockChangeInfo.getBranch(); |
| InternalLockManager lockManager = getLockingManager(); |
| LockType lockType = lockChangeInfo.getLockType(); |
| |
| InternalView view = null; |
| |
| try |
| { |
| view = SyncingUtil.openViewWithLockArea(replicatorSession, lockManager, viewedBranch, durableLockingID); |
| List<Object> lockables = new LinkedList<Object>(); |
| |
| for (CDOLockState lockState : lockChangeInfo.getLockStates()) |
| { |
| lockables.add(lockState.getLockedObject()); |
| } |
| |
| if (lockChangeInfo.getOperation() == Operation.LOCK) |
| { |
| // If we can't lock immediately, there's a conflict, which means we're in big |
| // trouble: somehow locks were obtained on the clone but not on the master. What to do? |
| // TODO (CD) Consider this problem further |
| long timeout = 0; |
| |
| super.lock(view, lockType, lockables, null, false, timeout); |
| } |
| else if (lockChangeInfo.getOperation() == Operation.UNLOCK) |
| { |
| super.doUnlock(view, lockType, lockables, false); |
| } |
| else |
| { |
| throw new IllegalStateException("Unexpected: " + lockChangeInfo.getOperation()); |
| } |
| } |
| finally |
| { |
| LifecycleUtil.deactivate(view); |
| } |
| } |
| |
| public boolean handleLockArea(LockArea area) |
| { |
| try |
| { |
| StoreThreadLocal.setSession(replicatorSession); |
| getLockingManager().updateLockArea(area); |
| |
| getSessionManager().sendLockNotification(null, CDOLockUtil.createLockChangeInfo()); |
| return true; |
| } |
| finally |
| { |
| StoreThreadLocal.release(); |
| } |
| } |
| |
| /** |
| * Called by ReplicateRepositoryRawRequest.confirming(). |
| */ |
| public void replicateRaw(CDODataInput in, OMMonitor monitor) throws IOException |
| { |
| try |
| { |
| long previousCommitTime = getLastCommitTimeStamp(); |
| |
| int fromBranchID = lastReplicatedBranchID + 1; |
| int toBranchID = in.readInt(); |
| long fromCommitTime = lastReplicatedCommitTime + 1L; |
| long toCommitTime = in.readLong(); |
| |
| StoreThreadLocal.setSession(replicatorSession); |
| IStoreAccessor.Raw accessor = (IStoreAccessor.Raw)StoreThreadLocal.getAccessor(); |
| accessor.rawImport(in, fromBranchID, toBranchID, fromCommitTime, toCommitTime, monitor); |
| |
| replicateRawReviseRevisions(); |
| replicateRawReloadLocks(); |
| replicateRawNotifyClients(lastReplicatedCommitTime, toCommitTime, previousCommitTime); |
| |
| setLastReplicatedBranchID(toBranchID); |
| setLastReplicatedCommitTime(toCommitTime); |
| setLastCommitTimeStamp(toCommitTime); |
| } |
| finally |
| { |
| StoreThreadLocal.release(); |
| } |
| } |
| |
| public void goOnline() |
| { |
| if (getState() == OFFLINE) |
| { |
| LifecycleUtil.activate(synchronizer); |
| // Do not set the state to ONLINE yet; the synchronizer will set it to SYNCING first, |
| // and then to ONLINE after a succesful replication. |
| } |
| } |
| |
| public void goOffline() |
| { |
| if (getState() != OFFLINE) |
| { |
| LifecycleUtil.deactivate(synchronizer); |
| setState(OFFLINE); |
| } |
| } |
| |
| private void replicateRawReviseRevisions() |
| { |
| InternalCDORevisionCache cache = getRevisionManager().getCache(); |
| for (CDORevision revision : cache.getCurrentRevisions()) |
| { |
| cache.removeRevision(revision.getID(), revision); |
| } |
| } |
| |
| private void replicateRawReloadLocks() |
| { |
| getLockingManager().reloadLocks(); |
| } |
| |
| private void replicateRawNotifyClients(long fromCommitTime, long toCommitTime, long previousCommitTime) |
| { |
| InternalCDOCommitInfoManager manager = getCommitInfoManager(); |
| InternalSessionManager sessionManager = getSessionManager(); |
| |
| Map<CDOBranch, TimeRange> branches = replicateRawGetBranches(fromCommitTime, toCommitTime); |
| for (Entry<CDOBranch, TimeRange> entry : branches.entrySet()) |
| { |
| CDOBranch branch = entry.getKey(); |
| TimeRange range = entry.getValue(); |
| fromCommitTime = range.getTime1(); |
| toCommitTime = range.getTime2(); |
| |
| CDOBranchPoint startPoint = branch.getPoint(fromCommitTime); |
| CDOBranchPoint endPoint = branch.getPoint(toCommitTime); |
| CDOChangeSetData changeSet = getChangeSet(startPoint, endPoint); |
| |
| List<CDOPackageUnit> newPackages = Collections.emptyList(); // TODO Notify about new packages |
| List<CDOIDAndVersion> newObjects = changeSet.getNewObjects(); |
| List<CDORevisionKey> changedObjects = changeSet.getChangedObjects(); |
| List<CDOIDAndVersion> detachedObjects = changeSet.getDetachedObjects(); |
| |
| CDOCommitData data = CDOCommitInfoUtil.createCommitData(newPackages, newObjects, changedObjects, detachedObjects); |
| |
| String comment = "<replicate raw commits>"; //$NON-NLS-1$ |
| final CDOCommitInfo commitInfo = manager.createCommitInfo( // |
| branch, toCommitTime, previousCommitTime, SYSTEM_USER_ID, comment, data); |
| |
| CommitNotificationInfo info = new CommitNotificationInfo(); |
| info.setSender(replicatorSession); |
| info.setCommitInfo(commitInfo); |
| info.setClearResourcePathCache(true); |
| |
| sessionManager.sendCommitNotification(info); |
| } |
| |
| CDOLockChangeInfo lockChangeInfo = CDOLockUtil.createLockChangeInfo(); |
| sessionManager.sendLockNotification(replicatorSession, lockChangeInfo); |
| } |
| |
| private Map<CDOBranch, TimeRange> replicateRawGetBranches(long fromCommitTime, long toCommitTime) |
| { |
| final Map<CDOBranch, TimeRange> branches = new HashMap<CDOBranch, TimeRange>(); |
| CDOCommitInfoHandler handler = new CDOCommitInfoHandler() |
| { |
| public void handleCommitInfo(CDOCommitInfo commitInfo) |
| { |
| CDOBranch branch = commitInfo.getBranch(); |
| long timeStamp = commitInfo.getTimeStamp(); |
| TimeRange range = branches.get(branch); |
| if (range == null) |
| { |
| branches.put(branch, new TimeRange(timeStamp)); |
| } |
| else |
| { |
| range.update(timeStamp); |
| } |
| } |
| }; |
| |
| getCommitInfoManager().getCommitInfos(null, fromCommitTime, toCommitTime, handler); |
| return branches; |
| } |
| |
| @Override |
| public void notifyWriteAccessHandlers(ITransaction transaction, CommitContext commitContext, boolean beforeCommit, |
| OMMonitor monitor) |
| { |
| if (beforeCommit && commitContext.getNewPackageUnits().length != 0) |
| { |
| throw new IllegalStateException( |
| "Synchronizable repositories don't support dynamic addition of new packages. Use IRepository.setInitialPackages() instead."); |
| } |
| |
| super.notifyWriteAccessHandlers(transaction, commitContext, beforeCommit, monitor); |
| } |
| |
| @Override |
| public abstract InternalCommitContext createCommitContext(InternalTransaction transaction); |
| |
| protected InternalCommitContext createNormalCommitContext(InternalTransaction transaction) |
| { |
| return super.createCommitContext(transaction); |
| } |
| |
| protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction) |
| { |
| return new WriteThroughCommitContext(transaction); |
| } |
| |
| @Override |
| protected void doBeforeActivate() throws Exception |
| { |
| super.doBeforeActivate(); |
| checkState(synchronizer, "synchronizer"); //$NON-NLS-1$ |
| } |
| |
| @Override |
| protected void doActivate() throws Exception |
| { |
| super.doActivate(); |
| |
| InternalCDORevisionCache cache = getRevisionManager().getCache(); |
| if (cache instanceof AbstractCDORevisionCache) |
| { |
| // Enable branch checks to ensure that no branches from the replicator session are used |
| ((AbstractCDORevisionCache)cache).setBranchManager(getBranchManager()); |
| } |
| |
| InternalStore store = getStore(); |
| if (!store.isFirstStart()) |
| { |
| Map<String, String> map = store.getPersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); |
| if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN)) |
| { |
| setReplicationCountersToLatest(); |
| } |
| else |
| { |
| Set<String> names = new HashSet<String>(); |
| names.add(PROP_LAST_REPLICATED_BRANCH_ID); |
| names.add(PROP_LAST_REPLICATED_COMMIT_TIME); |
| |
| map = store.getPersistentProperties(names); |
| setLastReplicatedBranchID(Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID))); |
| setLastReplicatedCommitTime(Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME))); |
| } |
| } |
| |
| store.removePersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); |
| |
| Type type = getType(); |
| if (type == MASTER) |
| { |
| setState(ONLINE); |
| } |
| else |
| { |
| if (hasBeenReplicated()) |
| { |
| setState(OFFLINE); |
| } |
| else if (type == BACKUP) |
| { |
| if (getLastReplicatedCommitTime() == CDOBranchPoint.UNSPECIFIED_DATE) |
| { |
| boolean usedToBeMaster = getRootResourceID() != null; |
| if (usedToBeMaster) |
| { |
| setLastReplicatedCommitTime(getLastCommitTimeStamp()); |
| } |
| } |
| } |
| |
| startSynchronization(); |
| } |
| } |
| |
| @Override |
| protected void doDeactivate() throws Exception |
| { |
| stopSynchronization(); |
| |
| Map<String, String> map = new HashMap<String, String>(); |
| map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID)); |
| map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime)); |
| map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString()); |
| |
| InternalStore store = getStore(); |
| store.setPersistentProperties(map); |
| |
| super.doDeactivate(); |
| } |
| |
| protected void startSynchronization() |
| { |
| replicatorSession = getSessionManager().openSession(null); |
| replicatorSession.options().setPassiveUpdateEnabled(false); |
| replicatorSession.options().setLockNotificationMode(LockNotificationMode.OFF); |
| |
| synchronizer.setLocalRepository(this); |
| synchronizer.activate(); |
| } |
| |
| protected void stopSynchronization() |
| { |
| if (synchronizer != null) |
| { |
| synchronizer.deactivate(); |
| } |
| } |
| |
| @Override |
| protected void setPostActivateState() |
| { |
| // Do nothing (keep INITIAL) |
| } |
| |
| protected void setReplicationCountersToLatest() |
| { |
| setLastReplicatedBranchID(getStore().getLastBranchID()); |
| setLastReplicatedCommitTime(getStore().getLastNonLocalCommitTime()); |
| } |
| |
| @Override |
| protected void initRootResource() |
| { |
| // Non-MASTER repositories must wait for the first replication to receive their root resource ID |
| if (getType() == MASTER) |
| { |
| super.initRootResource(); |
| } |
| } |
| |
| @Override |
| public LockObjectsResult lock(InternalView view, LockType lockType, List<CDORevisionKey> revisionKeys, |
| boolean recursive, long timeout) |
| { |
| if (view.getBranch().isLocal()) |
| { |
| return super.lock(view, lockType, revisionKeys, recursive, timeout); |
| } |
| |
| if (getState() != ONLINE) |
| { |
| throw new CDOException("Cannot lock in a non-local branch when clone is not connected to master"); |
| } |
| |
| return lockThrough(view, lockType, revisionKeys, false, timeout); |
| } |
| |
| private LockObjectsResult lockOnMaster(InternalView view, LockType type, List<CDORevisionKey> revKeys, |
| boolean recursive, long timeout) throws InterruptedException |
| { |
| // Delegate locking to the master |
| InternalCDOSession remoteSession = getSynchronizer().getRemoteSession(); |
| CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol(); |
| |
| String areaID = view.getDurableLockingID(); |
| if (areaID == null) |
| { |
| throw new IllegalStateException("Durable locking is not enabled for view " + view); |
| } |
| |
| LockObjectsResult masterLockingResult = sessionProtocol.delegateLockObjects(areaID, revKeys, view.getBranch(), type, |
| recursive, timeout); |
| |
| if (masterLockingResult.isSuccessful() && masterLockingResult.isWaitForUpdate()) |
| { |
| if (!getSynchronizer().getRemoteSession().options().isPassiveUpdateEnabled()) |
| { |
| throw new AssertionError( |
| "Master lock result requires clone to wait, but clone does not have passiveUpdates enabled."); |
| } |
| |
| long requiredTimestamp = masterLockingResult.getRequiredTimestamp(); |
| remoteSession.waitForUpdate(requiredTimestamp); |
| } |
| |
| return masterLockingResult; |
| } |
| |
| private LockObjectsResult lockThrough(InternalView view, LockType type, List<CDORevisionKey> keys, boolean recursive, |
| long timeout) |
| { |
| try |
| { |
| LockObjectsResult masterLockingResult = lockOnMaster(view, type, keys, recursive, timeout); |
| if (!masterLockingResult.isSuccessful()) |
| { |
| return masterLockingResult; |
| } |
| |
| LockObjectsResult localLockingResult = super.lock(view, type, keys, recursive, timeout); |
| return localLockingResult; |
| } |
| catch (InterruptedException ex) |
| { |
| throw WrappedException.wrap(ex); |
| } |
| } |
| |
| @Override |
| public UnlockObjectsResult unlock(InternalView view, LockType lockType, List<CDOID> objectIDs, boolean recursive) |
| { |
| if (view.getBranch().isLocal()) |
| { |
| super.unlock(view, lockType, objectIDs, recursive); |
| } |
| |
| if (getState() != ONLINE) |
| { |
| throw new CDOException("Cannot unlock in a non-local branch when clone is not connected to master"); |
| } |
| |
| return unlockThrough(view, lockType, objectIDs, recursive); |
| } |
| |
| private void unlockOnMaster(InternalView view, LockType lockType, List<CDOID> objectIDs, boolean recursive) |
| { |
| InternalCDOSession remoteSession = getSynchronizer().getRemoteSession(); |
| CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol(); |
| |
| String lockAreaID = view.getDurableLockingID(); |
| if (lockAreaID == null) |
| { |
| throw new IllegalStateException("Durable locking is not enabled for view " + view); |
| } |
| |
| sessionProtocol.delegateUnlockObjects(lockAreaID, objectIDs, lockType, recursive); |
| } |
| |
| private UnlockObjectsResult unlockThrough(InternalView view, LockType lockType, List<CDOID> objectIDs, |
| boolean recursive) |
| { |
| unlockOnMaster(view, lockType, objectIDs, recursive); |
| return super.unlock(view, lockType, objectIDs, recursive); |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| private static final class TimeRange |
| { |
| private long time1; |
| |
| private long time2; |
| |
| public TimeRange(long time) |
| { |
| time1 = time; |
| time2 = time; |
| } |
| |
| public void update(long time) |
| { |
| if (time < time1) |
| { |
| time1 = time; |
| } |
| |
| if (time > time2) |
| { |
| time2 = time; |
| } |
| } |
| |
| public long getTime1() |
| { |
| return time1; |
| } |
| |
| public long getTime2() |
| { |
| return time2; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "[" + CDOCommonUtil.formatTimeStamp(time1) + " - " + CDOCommonUtil.formatTimeStamp(time1) + "]"; |
| } |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| protected static final class CommitContextData implements CDOCommitData |
| { |
| private InternalCommitContext commitContext; |
| |
| private CDOChangeKindCache changeKindCache; |
| |
| public CommitContextData(InternalCommitContext commitContext) |
| { |
| this.commitContext = commitContext; |
| } |
| |
| public boolean isEmpty() |
| { |
| return false; |
| } |
| |
| public CDOChangeSetData copy() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void merge(CDOChangeSetData changeSetData) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public List<CDOPackageUnit> getNewPackageUnits() |
| { |
| final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits(); |
| return new IndexedList<CDOPackageUnit>() |
| { |
| @Override |
| public CDOPackageUnit get(int index) |
| { |
| return newPackageUnits[index]; |
| } |
| |
| @Override |
| public int size() |
| { |
| return newPackageUnits.length; |
| } |
| }; |
| } |
| |
| public List<CDOIDAndVersion> getNewObjects() |
| { |
| final InternalCDORevision[] newObjects = commitContext.getNewObjects(); |
| return new IndexedList<CDOIDAndVersion>() |
| { |
| @Override |
| public CDOIDAndVersion get(int index) |
| { |
| return newObjects[index]; |
| } |
| |
| @Override |
| public int size() |
| { |
| return newObjects.length; |
| } |
| }; |
| } |
| |
| public List<CDORevisionKey> getChangedObjects() |
| { |
| final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas(); |
| return new IndexedList<CDORevisionKey>() |
| { |
| @Override |
| public CDORevisionKey get(int index) |
| { |
| return changedObjects[index]; |
| } |
| |
| @Override |
| public int size() |
| { |
| return changedObjects.length; |
| } |
| }; |
| } |
| |
| public List<CDOIDAndVersion> getDetachedObjects() |
| { |
| final CDOID[] detachedObjects = commitContext.getDetachedObjects(); |
| return new IndexedList<CDOIDAndVersion>() |
| { |
| @Override |
| public CDOIDAndVersion get(int index) |
| { |
| return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION); |
| } |
| |
| @Override |
| public int size() |
| { |
| return detachedObjects.length; |
| } |
| }; |
| } |
| |
| public synchronized Map<CDOID, CDOChangeKind> getChangeKinds() |
| { |
| if (changeKindCache == null) |
| { |
| changeKindCache = new CDOChangeKindCache(this); |
| } |
| |
| return changeKindCache; |
| } |
| |
| public CDOChangeKind getChangeKind(CDOID id) |
| { |
| return getChangeKinds().get(id); |
| } |
| } |
| |
| /** |
| * @author Eike Stepper |
| */ |
| protected final class WriteThroughCommitContext extends TransactionCommitContext |
| { |
| private static final int ARTIFICIAL_VIEW_ID = 0; |
| |
| private CommitTransactionResult result; |
| |
| public WriteThroughCommitContext(InternalTransaction transaction) |
| { |
| super(transaction); |
| } |
| |
| @Override |
| public void preWrite() |
| { |
| // Do nothing |
| } |
| |
| @Override |
| public void write(OMMonitor monitor) |
| { |
| // Do nothing |
| } |
| |
| @Override |
| public void commit(OMMonitor monitor) |
| { |
| // Prepare commit to the master |
| final CDOCommitData commitData = new CommitContextData(this); |
| |
| InternalCDOCommitContext ctx = new InternalCDOCommitContext() |
| { |
| public boolean isPartialCommit() |
| { |
| return false; |
| } |
| |
| public Map<CDOID, CDORevisionDelta> getRevisionDeltas() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public List<CDOPackageUnit> getNewPackageUnits() |
| { |
| return commitData.getNewPackageUnits(); |
| } |
| |
| public Map<CDOID, CDOObject> getNewObjects() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public Collection<CDOLockState> getLocksOnNewObjects() |
| { |
| CDOLockState[] locksOnNewObjectsArr = WriteThroughCommitContext.this.getLocksOnNewObjects(); |
| Collection<CDOLockState> locksOnNewObjects = Arrays.asList(locksOnNewObjectsArr); |
| return locksOnNewObjects; |
| } |
| |
| public Collection<CDOLob<?>> getLobs() |
| { |
| return Collections.emptySet(); // TODO (CD) Did we forget to support this earlier? |
| } |
| |
| public Map<CDOID, CDOObject> getDirtyObjects() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public Map<CDOID, CDOObject> getDetachedObjects() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void preCommit() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void postCommit(CommitTransactionResult result) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public InternalCDOTransaction getTransaction() |
| { |
| return null; |
| } |
| |
| public CDOCommitData getCommitData() |
| { |
| return commitData; |
| } |
| |
| public int getViewID() |
| { |
| return ARTIFICIAL_VIEW_ID; |
| } |
| |
| public String getUserID() |
| { |
| return WriteThroughCommitContext.this.getUserID(); |
| } |
| |
| public boolean isAutoReleaseLocks() |
| { |
| return WriteThroughCommitContext.this.isAutoReleaseLocksEnabled(); |
| } |
| |
| public String getCommitComment() |
| { |
| return WriteThroughCommitContext.this.getCommitComment(); |
| } |
| |
| public CDOBranch getBranch() |
| { |
| return WriteThroughCommitContext.this.getTransaction().getBranch(); |
| } |
| }; |
| |
| // Delegate commit to the master |
| CDOSessionProtocol sessionProtocol = getSynchronizer().getRemoteSession().getSessionProtocol(); |
| result = sessionProtocol.commitDelegation(ctx, monitor); |
| |
| // Stop if commit to master failed |
| String rollbackMessage = result.getRollbackMessage(); |
| if (rollbackMessage != null) |
| { |
| throw new TransactionException(rollbackMessage); |
| } |
| |
| // Prepare data needed for commit result and commit notifications |
| long timeStamp = result.getTimeStamp(); // result is set to null later! |
| addIDMappings(result.getIDMappings()); |
| applyIDMappings(new Monitor()); |
| |
| try |
| { |
| writeThroughCommitLock.lock(); |
| |
| // Commit to the local repository |
| super.preWrite(); |
| super.write(new Monitor()); |
| super.commit(new Monitor()); |
| } |
| finally |
| { |
| writeThroughCommitLock.unlock(); |
| } |
| |
| // Remember commit time in the local repository |
| setLastCommitTimeStamp(timeStamp); |
| setLastReplicatedCommitTime(timeStamp); |
| |
| // Remember commit time in the replicator session. |
| getSynchronizer().getRemoteSession().setLastUpdateTime(timeStamp); |
| } |
| |
| @Override |
| protected long[] createTimeStamp(OMMonitor monitor) |
| { |
| long timeStamp = result.getTimeStamp(); |
| long previousTimeStamp = result.getPreviousTimeStamp(); |
| result = null; |
| |
| InternalRepository repository = getTransaction().getSession().getManager().getRepository(); |
| repository.forceCommitTimeStamp(timeStamp, monitor); |
| |
| return new long[] { timeStamp, previousTimeStamp }; |
| } |
| |
| @Override |
| protected void lockObjects() throws InterruptedException |
| { |
| // Do nothing |
| } |
| |
| private void addIDMappings(Map<CDOID, CDOID> idMappings) |
| { |
| for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet()) |
| { |
| CDOID oldID = idMapping.getKey(); |
| CDOID newID = idMapping.getValue(); |
| addIDMapping(oldID, newID); |
| } |
| } |
| } |
| } |