/****************************************************************************
 * Copyright (c) 2004 Composent, Inc. and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *    Composent, Inc. - initial API and implementation
 *****************************************************************************/
package org.eclipse.ecf.core.sharedobject;

import java.util.*;
import org.eclipse.ecf.core.events.IContainerConnectedEvent;
import org.eclipse.ecf.core.events.IContainerDisconnectedEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.sharedobject.events.*;
import org.eclipse.ecf.core.util.*;
import org.eclipse.ecf.internal.core.sharedobject.*;
import org.eclipse.osgi.util.NLS;

/**
 * Implementation of two-phase commit for transactional replication of shared
 * objects.
 * 
 * @see ISharedObjectTransactionConfig
 * @see ISharedObjectTransactionParticipantsFilter
 * 
 */
public class TwoPhaseCommitEventProcessor implements IEventProcessor, ISharedObjectContainerTransaction {
	BaseSharedObject sharedObject = null;

	byte transactionState = ISharedObjectContainerTransaction.ACTIVE;

	Object lock = new Object();

	List participants = new Vector();

	Map failed = new HashMap();

	int timeout = ISharedObjectTransactionConfig.DEFAULT_TIMEOUT;

	int minFailedToAbort = 0;

	long identifier = 0;

	ISharedObjectTransactionParticipantsFilter participantsFilter = null;

	public TwoPhaseCommitEventProcessor(BaseSharedObject bse, ISharedObjectTransactionConfig config) {
		this.sharedObject = bse;
		if (config == null) {
			config = new TransactionSharedObjectConfiguration();
		}
		this.timeout = config.getTimeout();
		this.participantsFilter = config.getParticipantsFilter();
	}

	protected void trace(String msg) {
		Trace.trace(Activator.PLUGIN_ID, msg);
	}

	protected void traceStack(String msg, Throwable t) {
		Trace.catching(Activator.PLUGIN_ID, SharedObjectDebugOptions.EXCEPTIONS_CATCHING, TwoPhaseCommitEventProcessor.class, "traceStack", t); //$NON-NLS-1$
	}

	protected int getTimeout() {
		return timeout;
	}

	protected int getMinFailedToAbort() {
		return minFailedToAbort;
	}

	protected boolean isPrimary() {
		return getSharedObject().isPrimary();
	}

	protected BaseSharedObject getSharedObject() {
		return sharedObject;
	}

	protected ID getHomeID() {
		return getSharedObject().getHomeContainerID();
	}

	protected void addParticipants(ID[] ids) {
		if (ids != null) {
			for (int i = 0; i < ids.length; i++) {
				trace("addParticipant(" + ids[i] + ")"); //$NON-NLS-1$ //$NON-NLS-2$
				if (!getHomeID().equals(ids[i]))
					participants.add(ids[i]);
			}
		}
	}

	protected void removeParticipant(ID id) {
		if (id != null) {
			trace("removeParticipant(" + id + ")"); //$NON-NLS-1$ //$NON-NLS-2$
			participants.remove(id);
		}
	}

	protected void addFailed(ID remote, Throwable failure) {
		if (remote != null && failure != null) {
			trace("addFailed(" + remote + "," + failure + ")"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
			failed.put(remote, failure);
		}
	}

	protected ISharedObjectContext getContext() {
		return getSharedObject().getContext();
	}

	/*
	 * Implementation of IEventProcessor. These methods are entry point methods
	 * for BaseSharedObject method dispatch to call
	 */
	/*
	 * (non-Javadoc)
	 * 
	 * @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event)
	 */
	public boolean processEvent(Event event) {
		if (event instanceof ISharedObjectActivatedEvent) {
			handleActivated((ISharedObjectActivatedEvent) event);
		} else if (event instanceof IContainerConnectedEvent) {
			handleJoined((IContainerConnectedEvent) event);
		} else if (event instanceof ISharedObjectCreateResponseEvent) {
			handleCreateResponse((ISharedObjectCreateResponseEvent) event);
		} else if (event instanceof IContainerDisconnectedEvent) {
			handleDeparted((IContainerDisconnectedEvent) event);
		} else if (event instanceof ISharedObjectMessageEvent) {
			ISharedObjectMessageEvent some = (ISharedObjectMessageEvent) event;
			Object data = some.getData();
			if (data instanceof ISharedObjectCommitEvent)
				localCommitted();
		}
		// Let other event processors have a shot at this event
		return false;
	}

	protected void handleActivated(ISharedObjectActivatedEvent event) {
		trace("handleActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		// No other state changes while this is going on
		synchronized (lock) {
			if (isPrimary()) {
				// Primary
				handlePrimaryActivated(event);
			} else {
				handleReplicaActivated(event);
			}
			// Notify any threads waiting on state change
			lock.notifyAll();
		}
	}

	protected void replicateTo(ID[] remotes) {
		getSharedObject().replicateToRemoteContainers(remotes);
	}

	protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) {
		trace("handlePrimaryActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		// First get current group membership
		if (getContext().getConnectedID() != null) {
			ID[] groupMembers = getContext().getGroupMemberIDs();
			// Now get participants
			ID[] transactionParticipants = null;
			// If there is a participants filter specified then use it and ask
			// it to return an ID [] of participants (given
			// the current group membership
			if (participantsFilter != null) {
				transactionParticipants = participantsFilter.filterParticipants(groupMembers);
			}
			// replicate
			if (transactionParticipants == null) {
				// This means that all current group members should be included
				// as participants
				replicateTo(null);
				transactionParticipants = groupMembers;
			} else {
				// This means the participants filter provided us with an ID []
				// and so we replicate only to that ID []
				replicateTo(transactionParticipants);
			}

			// Add participants to the collection
			addParticipants(transactionParticipants);
			// Now set transaction state to VOTING
			setTransactionState(ISharedObjectContainerTransaction.VOTING);
		} else {
			setTransactionState(ISharedObjectContainerTransaction.COMMITTED);
		}
	}

	private long getNextIdentifier() {
		return identifier++;
	}

	protected void handleReplicaActivated(ISharedObjectActivatedEvent event) {
		trace("handleReplicaActivated(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		try {
			// Try to respond with create success message back to host
			getContext().sendCreateResponse(getHomeID(), null, getNextIdentifier());
			// If above succeeds, we're now in prepared state
			setTransactionState(ISharedObjectContainerTransaction.PREPARED);
		} catch (Exception except) {
			// If throws exception, we're doomed
			traceStack("handleReplicaActivated(" + event + ")", except); //$NON-NLS-1$ //$NON-NLS-2$
			setTransactionState(ISharedObjectContainerTransaction.ABORTED);
		}
	}

	protected void handleJoined(IContainerConnectedEvent event) {
		trace("handleJoined(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		// If we are primary then this event matters to us
		if (isPrimary()) {
			// If transactionstate is VOTING then we replicate ourselves to
			// participants
			if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
				synchronized (lock) {
					// First send replicate message *no matter what state we are
					// in*
					ID[] newMember = new ID[] {event.getTargetID()};
					replicateTo(newMember);
					addParticipants(newMember);
				}
			}
		}
	}

	protected void handleCreateResponse(ISharedObjectCreateResponseEvent event) {
		trace("handleCreateResponse(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		if (isPrimary()) {
			synchronized (lock) {
				Throwable except = event.getException();
				ID remoteID = event.getRemoteContainerID();
				long ident = event.getSequence();
				if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
					if (except == null) {
						removeParticipant(remoteID);
					} else {
						addFailed(remoteID, except);
					}
				} else {
					handleVotingCompletedCreateResponse(remoteID, except, ident);
				}
				lock.notifyAll();
			}
		} else {
			// we don't care as we are note transaction monitor
		}
	}

	protected void handleDeparted(IContainerDisconnectedEvent event) {
		trace("handleDeparted(" + event + ")"); //$NON-NLS-1$ //$NON-NLS-2$
		if (isPrimary()) {
			ID remoteID = event.getTargetID();
			synchronized (lock) {
				if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
					addFailed(remoteID, new Exception("Container " + remoteID //$NON-NLS-1$
							+ " left")); //$NON-NLS-1$
				}
				lock.notifyAll();
			}
		} else {
			// we don't care as we are not transaction monitor
		}
	}

	protected void handleVotingCompletedCreateResponse(ID fromID, Throwable e, long identifier1) {
		trace("handleVotingCompletedCreateResponse(" + fromID + "," + e + "," //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
				+ identifier1 + ")"); //$NON-NLS-1$
		// If remote creation was successful, simply send commit message back.
		if (e == null) {
			try {
				getSharedObject().getContext().sendMessage(fromID, new SharedObjectCommitEvent(getSharedObject().getID()));
			} catch (Exception e2) {
				traceStack("Exception in sendCommit to " + fromID, e2); //$NON-NLS-1$
			}
		} else {
			// Too late to vote no
			handlePostCommitFailure(fromID, e, identifier1);
		}
	}

	protected void handlePostCommitFailure(ID fromID, Throwable e, long identifier1) {
		// Do nothing but report
		trace("handlePostCommitFailure(" + fromID + "," + e + "," + identifier1 //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
				+ ")"); //$NON-NLS-1$
	}

	protected void sendCommit() throws SharedObjectAddAbortException {
		try {
			getContext().sendMessage(null, new SharedObjectCommitEvent(getSharedObject().getID()));
		} catch (Exception e2) {
			doTMAbort(new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, e2, getTimeout()));
		}
	}

	public byte getTransactionState() {
		synchronized (lock) {
			return transactionState;
		}
	}

	protected void setTransactionState(byte state) {
		synchronized (lock) {
			transactionState = state;
		}
	}

	public void waitToCommit() throws SharedObjectAddAbortException {
		if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED)
			return;
		synchronized (lock) {
			long end = System.currentTimeMillis() + getTimeout();
			try {
				while (!isVotingCompleted()) {
					long wait = end - System.currentTimeMillis();
					trace("waitForFinish waiting " + wait + "ms on " //$NON-NLS-1$ //$NON-NLS-2$
							+ getSharedObject().getID());
					if (wait <= 0L)
						throw new SharedObjectAddAbortException(NLS.bind(Messages.TwoPhaseCommitEventProcessor_Exception_Commit_Timeout, new Object[] {getSharedObject().getID(), getHomeID()}), (Throwable) null, getTimeout());
					// Wait right here
					lock.wait(wait);
				}
			} catch (Exception except) {
				// Aborted for some reason. Clean up and throw
				doTMAbort(except);
			}
			// Success. Send commit to remotes and clean up before returning.
			doTMCommit();
		}
	}

	protected void doTMAbort(Throwable except) throws SharedObjectAddAbortException {
		trace("doTMAbort:" + except); //$NON-NLS-1$
		// Set our own state variable to ABORTED
		setTransactionState(ISharedObjectContainerTransaction.ABORTED);
		// Send destroy message here so all remotes get destroyed, and we remove
		// ourselves from local space as well.
		getSharedObject().destroySelf();
		// throw so caller gets exception and can deal with it
		if (except instanceof SharedObjectAddAbortException)
			throw (SharedObjectAddAbortException) except;
		throw new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, except, getTimeout());
	}

	protected void doTMCommit() throws SharedObjectAddAbortException {
		trace("doTMCommit"); //$NON-NLS-1$
		// Make sure we are connected. If so then send commit message
		if (getSharedObject().getConnectedID() != null) {
			sendCommit();
		}
		// Call local committed message
		localCommitted();
	}

	protected void localCommitted() {
		trace("localCommitted()"); //$NON-NLS-1$
		// Set state variable to committed.
		setTransactionState(ISharedObjectContainerTransaction.COMMITTED);
		getSharedObject().creationCompleted();
	}

	protected boolean isVotingCompleted() throws SharedObjectAddAbortException {
		// The test here is is we've received any indication of failed
		// participants in the transaction. If so, we throw.
		if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED)
			return true;
		if (failed.size() > getMinFailedToAbort()) {
			// Abort!
			trace("isVotingCompleted:aborting:failed>" + getMinFailedToAbort() //$NON-NLS-1$
					+ ":failed=" + failed); //$NON-NLS-1$
			throw new SharedObjectAddAbortException(Messages.TwoPhaseCommitEventProcessor_Exception_Shared_Object_Add_Abort, participants, failed, getTimeout());
			// If no problems, and the number of participants to here from is 0,
			// then we're done
		} else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) {
			// Success!
			trace("isVotingCompleted() returning true"); //$NON-NLS-1$
			return true;
		}
		// Else continue waiting
		trace("isVotingCompleted:false"); //$NON-NLS-1$
		return false;
	}
}
