blob: d70f14681c253046256d7d560e78a4933fc4e2b8 [file] [log] [blame]
/*
* Copyright (c) 2016 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eike Stepper - initial API and implementation
*/
package org.eclipse.emf.cdo.internal.server;
import org.eclipse.emf.cdo.common.id.CDOID;
import org.eclipse.emf.cdo.common.id.CDOIDUtil;
import org.eclipse.emf.cdo.common.revision.CDORevision;
import org.eclipse.emf.cdo.common.revision.CDORevisionHandler;
import org.eclipse.emf.cdo.common.revision.CDORevisionProvider;
import org.eclipse.emf.cdo.common.revision.CDORevisionUtil;
import org.eclipse.emf.cdo.common.util.CDOException;
import org.eclipse.emf.cdo.server.IStoreAccessor.UnitSupport;
import org.eclipse.emf.cdo.server.IUnit;
import org.eclipse.emf.cdo.server.IUnitManager;
import org.eclipse.emf.cdo.server.IView;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
import org.eclipse.emf.cdo.spi.server.InternalRepository;
import org.eclipse.emf.cdo.spi.server.InternalUnitManager;
import org.eclipse.emf.cdo.spi.server.InternalView;
import org.eclipse.net4j.util.container.Container;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
/**
* @author Eike Stepper
*/
public class UnitManager extends Container<IUnit> implements InternalUnitManager
{
private final InternalRepository repository;
private final Map<CDOID, IUnit> units = CDOIDUtil.createMap();
private final Map<CDOID, UnitInitializer> unitInitializers = CDOIDUtil.createMap();
private final Set<ObjectAttacher> objectAttachers = new HashSet<ObjectAttacher>();
private final ReentrantReadWriteLock managerLock = new ReentrantReadWriteLock();
public UnitManager(InternalRepository repository)
{
this.repository = repository;
}
public final InternalRepository getRepository()
{
return repository;
}
public boolean isUnit(CDOID rootID)
{
checkActive();
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
// No need to synchronize on units because all other modifiers hold the manager write lock.
return units.containsKey(rootID);
}
finally
{
readLock.unlock();
}
}
public IUnit createUnit(CDOID rootID, IView view, CDORevisionHandler revisionHandler)
{
checkActive();
WriteLock writeLock = managerLock.writeLock();
UnitInitializer unitInitializer;
boolean hook = false;
////////////////////////////////////
// Phase 1: Register (short, locked)
////////////////////////////////////
writeLock.lock();
try
{
createUnitHook1();
// No need to synchronize on units because all other access holds the manager lock.
if (units.containsKey(rootID))
{
return null;
}
// No need to synchronize on unitInitializers because all other access holds the manager lock.
unitInitializer = unitInitializers.get(rootID);
if (unitInitializer != null)
{
hook = true;
}
else
{
checkNotNested(rootID, view, units.keySet());
checkNotNested(rootID, view, unitInitializers.keySet());
unitInitializer = createUnitInitializer(rootID, view, revisionHandler);
// No need to synchronize on unitInitializers because all other access holds the manager lock.
unitInitializers.put(rootID, unitInitializer);
// Synchronize on objectAttachers because objectAttacherFinishedCommit() doesn't acquire the manager lock!
synchronized (objectAttachers)
{
for (ObjectAttacher objectAttacher : objectAttachers)
{
List<CDOID> ids = objectAttacher.removeUnmappedRevisionsFor(unitInitializer);
if (!ids.isEmpty())
{
unitInitializer.addObjectAttacher(objectAttacher, ids);
}
}
}
}
}
finally
{
writeLock.unlock();
}
if (hook)
{
return unitInitializer.hook(rootID, view, revisionHandler);
}
IUnit unit = null;
try
{
/////////////////////////////////////////////////////
// Phase 2: Initialize (potentially long, not locked)
/////////////////////////////////////////////////////
unit = unitInitializer.initialize();
}
finally
{
///////////////////////////////////
// Phase 3: Publish (short, locked)
///////////////////////////////////
try
{
writeLock.lock();
try
{
// No need to synchronize on unitInitializers because all other access holds the manager lock.
unitInitializers.remove(rootID);
if (unit != null)
{
// No need to synchronize on units because all other access holds the manager lock.
units.put(rootID, unit);
}
}
finally
{
writeLock.unlock();
}
}
finally
{
unitInitializer.notifyHookedInitializers();
}
}
fireElementAddedEvent(unit);
return unit;
}
private void checkNotNested(CDOID rootID, IView view, Set<CDOID> unitIDs)
{
InternalCDORevision rootRevision = (InternalCDORevision)view.getRevision(rootID);
CDOID unitID = getUnit(rootRevision, view, unitIDs);
if (unitID != null)
{
throw new CDOException("Attempt to nest the new unit " + rootID + " in the existing unit " + unitID);
}
Set<CDOID> set = Collections.singleton(rootID);
for (CDOID id : unitIDs)
{
InternalCDORevision revision = (InternalCDORevision)view.getRevision(id);
if (getUnit(revision, view, set) != null)
{
throw new CDOException("Attempt to nest the existing unit " + id + " in the new unit " + rootID);
}
}
}
public IUnit getUnit(CDOID rootID)
{
checkActive();
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
// No need to synchronize on units because all other modifiers hold the manager write lock.
return units.get(rootID);
}
finally
{
readLock.unlock();
}
}
public IUnit[] getUnits()
{
checkActive();
return getElements();
}
public IUnit[] getElements()
{
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
// No need to synchronize on units because all other modifiers hold the manager write lock.
return units.values().toArray(new IUnit[units.size()]);
}
finally
{
readLock.unlock();
}
}
public InternalObjectAttacher attachObjects(InternalCommitContext commitContext)
{
checkActive();
long timeStamp = commitContext.getTimeStamp();
ObjectAttacher objectAttacher = null;
Map<CDOID, CDOID> unitMappings = CDOIDUtil.createMap();
///////////////////////////////////////////////
// Phase 1: Analyze new objects (short, locked)
///////////////////////////////////////////////
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
attachObjectsHook1();
Set<CDOID> rootIDs = new HashSet<CDOID>();
// No need to synchronize on units because all other modifiers hold the manager write lock.
rootIDs.addAll(units.keySet());
// No need to synchronize on unitInitializers because all other modifiers hold the manager write lock.
rootIDs.addAll(unitInitializers.keySet());
List<InternalCDORevision> unmappedRevisions = new ArrayList<InternalCDORevision>();
boolean checkUnits = !rootIDs.isEmpty();
for (InternalCDORevision revision : commitContext.getNewObjects())
{
if (checkUnits)
{
CDOID rootID = getUnit(revision, commitContext, rootIDs);
if (rootID != null)
{
unitMappings.put(revision.getID(), rootID);
continue;
}
}
unmappedRevisions.add(revision);
}
if (!unmappedRevisions.isEmpty())
{
objectAttacher = createObjectAttacher(commitContext, unmappedRevisions);
// Read lock holders must synchronize modifications of the private collections.
synchronized (objectAttachers)
{
objectAttachers.add(objectAttacher);
}
}
}
finally
{
readLock.unlock();
}
//////////////////////////////////////////////////////////
// Phase 2: Map objects to existing units (long, unlocked)
//////////////////////////////////////////////////////////
if (!unitMappings.isEmpty())
{
mapAttachedObjectsToUnits(commitContext, timeStamp, unitMappings);
}
return objectAttacher;
}
/**
* Does not hold any manager lock when called.
*/
public void objectAttacherFinishedCommit(ObjectAttacher objectAttacher)
{
checkActive();
synchronized (objectAttachers)
{
objectAttachers.remove(objectAttacher);
}
}
@Override
protected void doActivate() throws Exception
{
super.doActivate();
UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null);
try
{
List<CDOID> roots = storeAccessor.readUnitRoots();
for (CDOID root : roots)
{
IUnit unit = createUnit(root);
// No need to synchronize on units because all other access call checkActive()
units.put(root, unit);
}
}
finally
{
storeAccessor.release();
}
}
@Override
protected void doDeactivate() throws Exception
{
// No need to synchronize on units because all other access call checkActive()
units.clear();
super.doDeactivate();
}
protected Unit createUnit(CDOID root)
{
return new Unit(root);
}
protected UnitInitializer createUnitInitializer(CDOID rootID, IView view, CDORevisionHandler revisionHandler)
{
return new UnitInitializer(rootID, view, revisionHandler);
}
protected ObjectAttacher createObjectAttacher(InternalCommitContext commitContext,
List<InternalCDORevision> unmappedRevisions)
{
return new ObjectAttacher(commitContext, unmappedRevisions);
}
protected void mapAttachedObjectsToUnits(InternalCommitContext commitContext, long timeStamp,
Map<CDOID, CDOID> unitMappings)
{
UnitSupport storeAccessor = (UnitSupport)commitContext.getAccessor();
storeAccessor.writeUnits(unitMappings, timeStamp);
}
protected void createUnitHook1()
{
}
protected void attachObjectsHook1()
{
}
private static CDOID getUnit(InternalCDORevision revision, CDORevisionProvider revisionProvider, Set<CDOID> rootIDs)
{
if (rootIDs.isEmpty())
{
return null;
}
CDOID id = revision.getID();
if (rootIDs.contains(id))
{
return id;
}
CDORevision parentRevision = CDORevisionUtil.getParentRevision(revision, revisionProvider);
if (parentRevision != null)
{
return getUnit((InternalCDORevision)parentRevision, revisionProvider, rootIDs);
}
return null;
}
/**
* @author Eike Stepper
*/
protected class Unit implements IUnit
{
private final CDOID rootID;
private final Set<IView> views = new HashSet<IView>();
public Unit(CDOID rootID)
{
this.rootID = rootID;
}
public IUnitManager getManager()
{
return UnitManager.this;
}
public CDOID getRootID()
{
return rootID;
}
public boolean isOpen()
{
synchronized (views)
{
return !views.isEmpty();
}
}
public void open(IView view, final CDORevisionHandler revisionHandler)
{
synchronized (views)
{
views.add(view);
}
UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null);
try
{
storeAccessor.readUnit(view, rootID, revisionHandler);
}
finally
{
storeAccessor.release();
}
}
public void close(IView view)
{
synchronized (views)
{
views.remove(view);
}
((InternalView)view).closeUnit(rootID);
}
@Override
public String toString()
{
return "Unit[" + rootID + "]";
}
/**
* Does not hold any manager lock when called.
*/
public void initialize(IView view, long timeStamp, CDORevisionHandler revisionHandler,
Map<ObjectAttacher, List<CDOID>> objectAttachers)
{
UnitSupport storeAccessor = (UnitSupport)repository.getStore().getWriter(null);
try
{
Set<CDOID> initializedIDs = new HashSet<CDOID>();
Object initResult = storeAccessor.initUnit(view, rootID, revisionHandler, initializedIDs, timeStamp);
List<CDOID> ids = new ArrayList<CDOID>();
for (Entry<ObjectAttacher, List<CDOID>> entry : objectAttachers.entrySet())
{
ObjectAttacher objectAttacher = entry.getKey();
if (objectAttacher.awaitFinishedCommit())
{
for (CDOID id : entry.getValue())
{
if (!initializedIDs.contains(id))
{
ids.add(id);
}
}
}
}
storeAccessor.finishUnit(view, rootID, revisionHandler, timeStamp, initResult, ids);
}
finally
{
storeAccessor.release();
}
}
}
/**
* @author Eike Stepper
*/
protected class UnitInitializer implements CDORevisionHandler
{
private final long timeStamp = repository.getTimeStamp();
private final Map<ObjectAttacher, List<CDOID>> concurrentObjectAttachers = new HashMap<ObjectAttacher, List<CDOID>>();
private final CountDownLatch unitInitialized = new CountDownLatch(1);
private final CDOID rootID;
private final IView view;
private final CDORevisionHandler revisionHandler;
private final List<CDORevisionHandler> hookedRevisionHandlers = new CopyOnWriteArrayList<CDORevisionHandler>();
private volatile boolean hasHookedRevisionHandlers;
private Unit unit;
public UnitInitializer(CDOID rootID, IView view, CDORevisionHandler revisionHandler)
{
this.rootID = rootID;
this.view = view;
this.revisionHandler = revisionHandler;
}
public CDOID getRootID()
{
return rootID;
}
/**
* Does not hold any manager lock when called.
*/
public IUnit initialize()
{
unit = new Unit(rootID);
unit.initialize(view, timeStamp, revisionHandler, concurrentObjectAttachers);
return unit;
}
/**
* Does not hold any manager lock when called.
*/
public IUnit hook(CDOID rootID, IView view, final CDORevisionHandler revisionHandler)
{
final Set<CDOID> ids = new HashSet<CDOID>();
hookedRevisionHandlers.add(new CDORevisionHandler()
{
public boolean handleRevision(CDORevision revision)
{
ids.add(revision.getID());
return revisionHandler.handleRevision(revision);
}
});
// It's okay to do this unsynchronized. The worst thing that could happen is that the hooked revision handler is
// missed a few times during UnitInitializer.handleRevision(), but that's okay because it probably missed many
// revisions already and therefore performs an openUnit() subsequently, anyways. After all, hooked revision
// handlers,
// i.e., concurrent createUnit() calls for the same unit, are extremely rare.
hasHookedRevisionHandlers = true;
try
{
// Now wait for the main revision handler to finish.
unitInitialized.await();
}
catch (InterruptedException ex)
{
return null;
}
// Now send the missed revisions.
unit.open(view, new CDORevisionHandler()
{
public boolean handleRevision(CDORevision revision)
{
if (ids.contains(revision.getID()))
{
// This revision has already been sent. Skip to the next one.
return true;
}
return revisionHandler.handleRevision(revision);
}
});
return unit;
}
/**
* Does not hold any manager lock when called.
*/
public void notifyHookedInitializers()
{
unitInitialized.countDown();
}
public boolean handleRevision(CDORevision revision)
{
if (revisionHandler.handleRevision(revision))
{
if (hasHookedRevisionHandlers)
{
for (CDORevisionHandler hookedRevisionHandler : hookedRevisionHandlers)
{
hookedRevisionHandler.handleRevision(revision);
}
}
return true;
}
return false;
}
/**
* Holds the manager write lock when called.
*/
public void addObjectAttacher(ObjectAttacher objectAttacher, List<CDOID> ids)
{
concurrentObjectAttachers.put(objectAttacher, ids);
}
}
/**
* @author Eike Stepper
*/
protected class ObjectAttacher implements InternalObjectAttacher
{
private final InternalCommitContext commitContext;
private final List<InternalCDORevision> unmappedRevisions;
private final CountDownLatch commitFinished = new CountDownLatch(1);
private boolean commitSucceeded;
public ObjectAttacher(InternalCommitContext commitContext, List<InternalCDORevision> unmappedRevisions)
{
this.commitContext = commitContext;
this.unmappedRevisions = unmappedRevisions;
}
/**
* Does not hold any manager lock when called.
*/
public void finishedCommit(boolean success)
{
objectAttacherFinishedCommit(this);
commitSucceeded = success;
commitFinished.countDown();
}
/**
* Holds the manager write lock when called.
*/
public List<CDOID> removeUnmappedRevisionsFor(UnitInitializer unitInitializer)
{
List<CDOID> ids = new ArrayList<CDOID>();
Set<CDOID> rootIDs = Collections.singleton(unitInitializer.getRootID());
for (Iterator<InternalCDORevision> it = unmappedRevisions.iterator(); it.hasNext();)
{
InternalCDORevision revision = it.next();
if (getUnit(revision, commitContext, rootIDs) != null)
{
ids.add(revision.getID());
it.remove();
}
}
return ids;
}
public boolean awaitFinishedCommit()
{
try
{
commitFinished.await();
}
catch (InterruptedException ex)
{
return false;
}
return commitSucceeded;
}
}
}