blob: b4cd65892e422c5adba1dbdf52772554b0db6f16 [file] [log] [blame]
/****************************************************************************
* Copyright (c) 2004 Composent, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Composent, Inc. - initial API and implementation
*****************************************************************************/
package org.eclipse.ecf.core.sharedobject;
import java.util.*;
import org.eclipse.ecf.core.events.IContainerConnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisconnectedEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.sharedobject.events.*;
import org.eclipse.ecf.core.util.*;
import org.eclipse.ecf.internal.core.sharedobject.*;
import org.eclipse.osgi.util.NLS;
/**
* Implementation of two-phase commit for transactional replication of shared
* objects.
*
* @see ISharedObjectTransactionConfig
* @see ISharedObjectTransactionParticipantsFilter
*
*/
public class TwoPhaseCommitEventProcessor implements IEventProcessor, ISharedObjectContainerTransaction {
BaseSharedObject sharedObject = null;
byte transactionState = ISharedObjectContainerTransaction.ACTIVE;
Object lock = new Object();
List participants = new Vector();
Map failed = new HashMap();
int timeout = ISharedObjectTransactionConfig.DEFAULT_TIMEOUT;
int minFailedToAbort = 0;
long identifier = 0;
ISharedObjectTransactionParticipantsFilter participantsFilter = null;
public TwoPhaseCommitEventProcessor(BaseSharedObject bse, ISharedObjectTransactionConfig config) {
this.sharedObject = bse;
if (config == null) {
config = new TransactionSharedObjectConfiguration();
}
this.timeout = config.getTimeout();
this.participantsFilter = config.getParticipantsFilter();
}
protected void trace(String msg) {
Trace.trace(Activator.PLUGIN_ID, msg);
}
protected void traceStack(String msg, Throwable t) {
Trace.catching(Activator.PLUGIN_ID, SharedObjectDebugOptions.EXCEPTIONS_CATCHING, TwoPhaseCommitEventProcessor.class, "traceStack", t); //$NON-NLS-1$
}
protected int getTimeout() {
return timeout;
}
protected int getMinFailedToAbort() {
return minFailedToAbort;
}
protected boolean isPrimary() {
return getSharedObject().isPrimary();
}
protected BaseSharedObject getSharedObject() {
return sharedObject;
}
protected ID getHomeID() {
return getSharedObject().getHomeContainerID();
}
protected void addParticipants(ID[] ids) {
if (ids != null) {
for (int i = 0; i < ids.length; i++) {
trace("addParticipant(" + ids[i] + ")"); //$NON-NLS-1$ //$NON-NLS-2$
if (!getHomeID().equals(ids[i]))
participants.add(ids[i]);
}
}
}
protected void removeParticipant(ID id) {
if (id != null) {
trace("removeParticipant(" + id + ")"); //$NON-NLS-1$ //$NON-NLS-2$
participants.remove(id);
}
}
protected void addFailed(ID remote, Throwable failure) {
if (remote != null && failure != null) {
trace("addFailed(" + remote + "," + failure + ")"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
failed.put(remote, failure);
}
}
protected ISharedObjectContext getContext() {
return getSharedObject().getContext();
}
/*
* Implementation of IEventProcessor. These methods are entry point methods
* for BaseSharedObject method dispatch to call
*/
/*
* (non-Javadoc)
*
* @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event)
*/
public boolean processEvent(Event event) {
if (event instanceof ISharedObjectActivatedEvent) {
handleActivated((ISharedObjectActivatedEvent) event);
} else if (event instanceof IContainerConnectedEvent) {
handleJoined((IContainerConnectedEvent) event);
} else if (event instanceof ISharedObjectCreateResponseEvent) {
handleCreateResponse((ISharedObjectCreateResponseEvent) event);
} else if (event instanceof IContainerDisconnectedEvent) {
handleDeparted((IContainerDisconnectedEvent) event);
} else if (event instanceof ISharedObjectMessageEvent) {
ISharedObjectMessageEvent some = (ISharedObjectMessageEvent) event;
Object data = some.getData();
if (data instanceof ISharedObjectCommitEvent)
localCommitted();
}
// Let other event processors have a shot at this event
return false;
}
protected void handleActivated(ISharedObjectActivatedEvent event) {
trace("handleActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
// No other state changes while this is going on
synchronized (lock) {
if (isPrimary()) {
// Primary
handlePrimaryActivated(event);
} else {
handleReplicaActivated(event);
}
// Notify any threads waiting on state change
lock.notifyAll();
}
}
protected void replicateTo(ID[] remotes) {
getSharedObject().replicateToRemoteContainers(remotes);
}
protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) {
trace("handlePrimaryActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
// First get current group membership
if (getContext().getConnectedID() != null) {
ID[] groupMembers = getContext().getGroupMemberIDs();
// Now get participants
ID[] transactionParticipants = null;
// If there is a participants filter specified then use it and ask
// it to return an ID [] of participants (given
// the current group membership
if (participantsFilter != null) {
transactionParticipants = participantsFilter.filterParticipants(groupMembers);
}
// replicate
if (transactionParticipants == null) {
// This means that all current group members should be included
// as participants
replicateTo(null);
transactionParticipants = groupMembers;
} else {
// This means the participants filter provided us with an ID []
// and so we replicate only to that ID []
replicateTo(transactionParticipants);
}
// Add participants to the collection
addParticipants(transactionParticipants);
// Now set transaction state to VOTING
setTransactionState(ISharedObjectContainerTransaction.VOTING);
} else {
setTransactionState(ISharedObjectContainerTransaction.COMMITTED);
}
}
private long getNextIdentifier() {
return identifier++;
}
protected void handleReplicaActivated(ISharedObjectActivatedEvent event) {
trace("handleReplicaActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
try {
// Try to respond with create success message back to host
getContext().sendCreateResponse(getHomeID(), null, getNextIdentifier());
// If above succeeds, we're now in prepared state
setTransactionState(ISharedObjectContainerTransaction.PREPARED);
} catch (Exception except) {
// If throws exception, we're doomed
traceStack("handleReplicaActivated(" + event + ")", except); //$NON-NLS-1$ //$NON-NLS-2$
setTransactionState(ISharedObjectContainerTransaction.ABORTED);
}
}
protected void handleJoined(IContainerConnectedEvent event) {
trace("handleJoined(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
// If we are primary then this event matters to us
if (isPrimary()) {
// If transactionstate is VOTING then we replicate ourselves to
// participants
if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
synchronized (lock) {
// First send replicate message *no matter what state we are
// in*
ID[] newMember = new ID[] {event.getTargetID()};
replicateTo(newMember);
addParticipants(newMember);
}
}
}
}
protected void handleCreateResponse(ISharedObjectCreateResponseEvent event) {
trace("handleCreateResponse(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
if (isPrimary()) {
synchronized (lock) {
Throwable except = event.getException();
ID remoteID = event.getRemoteContainerID();
long ident = event.getSequence();
if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
if (except == null) {
removeParticipant(remoteID);
} else {
addFailed(remoteID, except);
}
} else {
handleVotingCompletedCreateResponse(remoteID, except, ident);
}
lock.notifyAll();
}
} else {
// we don't care as we are note transaction monitor
}
}
protected void handleDeparted(IContainerDisconnectedEvent event) {
trace("handleDeparted(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
if (isPrimary()) {
ID remoteID = event.getTargetID();
synchronized (lock) {
if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
addFailed(remoteID, new Exception("Container " + remoteID //$NON-NLS-1$
+ " left")); //$NON-NLS-1$
}
lock.notifyAll();
}
} else {
// we don't care as we are not transaction monitor
}
}
protected void handleVotingCompletedCreateResponse(ID fromID, Throwable e, long identifier1) {
trace("handleVotingCompletedCreateResponse(" + fromID + "," + e + "," //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ identifier1 + ")"); //$NON-NLS-1$
// If remote creation was successful, simply send commit message back.
if (e == null) {
try {
getSharedObject().getContext().sendMessage(fromID, new SharedObjectCommitEvent(getSharedObject().getID()));
} catch (Exception e2) {
traceStack("Exception in sendCommit to " + fromID, e2); //$NON-NLS-1$
}
} else {
// Too late to vote no
handlePostCommitFailure(fromID, e, identifier1);
}
}
protected void handlePostCommitFailure(ID fromID, Throwable e, long identifier1) {
// Do nothing but report
trace("handlePostCommitFailure(" + fromID + "," + e + "," + identifier1 //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ ")"); //$NON-NLS-1$
}
protected void sendCommit() throws SharedObjectAddAbortException {
try {
getContext().sendMessage(null, new SharedObjectCommitEvent(getSharedObject().getID()));
} catch (Exception e2) {
doTMAbort(new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, e2, getTimeout()));
}
}
public byte getTransactionState() {
synchronized (lock) {
return transactionState;
}
}
protected void setTransactionState(byte state) {
synchronized (lock) {
transactionState = state;
}
}
public void waitToCommit() throws SharedObjectAddAbortException {
if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED)
return;
synchronized (lock) {
long end = System.currentTimeMillis() + getTimeout();
try {
while (!isVotingCompleted()) {
long wait = end - System.currentTimeMillis();
trace("waitForFinish waiting " + wait + "ms on " //$NON-NLS-1$ //$NON-NLS-2$
+ getSharedObject().getID());
if (wait <= 0L)
throw new SharedObjectAddAbortException(NLS.bind(Messages.TwoPhaseCommitEventProcessor_Exception_Commit_Timeout, new Object[] {getSharedObject().getID(), getHomeID()}), (Throwable) null, getTimeout());
// Wait right here
lock.wait(wait);
}
} catch (Exception except) {
// Aborted for some reason. Clean up and throw
doTMAbort(except);
}
// Success. Send commit to remotes and clean up before returning.
doTMCommit();
}
}
protected void doTMAbort(Throwable except) throws SharedObjectAddAbortException {
trace("doTMAbort:" + except); //$NON-NLS-1$
// Set our own state variable to ABORTED
setTransactionState(ISharedObjectContainerTransaction.ABORTED);
// Send destroy message here so all remotes get destroyed, and we remove
// ourselves from local space as well.
getSharedObject().destroySelf();
// throw so caller gets exception and can deal with it
if (except instanceof SharedObjectAddAbortException)
throw (SharedObjectAddAbortException) except;
throw new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, except, getTimeout());
}
protected void doTMCommit() throws SharedObjectAddAbortException {
trace("doTMCommit"); //$NON-NLS-1$
// Make sure we are connected. If so then send commit message
if (getSharedObject().getConnectedID() != null) {
sendCommit();
}
// Call local committed message
localCommitted();
}
protected void localCommitted() {
trace("localCommitted()"); //$NON-NLS-1$
// Set state variable to committed.
setTransactionState(ISharedObjectContainerTransaction.COMMITTED);
getSharedObject().creationCompleted();
}
protected boolean isVotingCompleted() throws SharedObjectAddAbortException {
// The test here is is we've received any indication of failed
// participants in the transaction. If so, we throw.
if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED)
return true;
if (failed.size() > getMinFailedToAbort()) {
// Abort!
trace("isVotingCompleted:aborting:failed>" + getMinFailedToAbort() //$NON-NLS-1$
+ ":failed=" + failed); //$NON-NLS-1$
throw new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, participants, failed, getTimeout());
// If no problems, and the number of participants to here from is 0,
// then we're done
} else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) {
// Success!
trace("isVotingCompleted() returning true"); //$NON-NLS-1$
return true;
}
// Else continue waiting
trace("isVotingCompleted:false"); //$NON-NLS-1$
return false;
}
}