blob: e7f3f93c09dd5b9b41724b64ca1f55800c5f6cef [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2010, 2017 IBM Corporation and others.
*
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package org.eclipse.equinox.coordinator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import org.eclipse.osgi.util.NLS;
import org.osgi.framework.Bundle;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.CoordinationException;
import org.osgi.service.coordinator.CoordinationPermission;
import org.osgi.service.coordinator.Participant;
import org.osgi.service.log.LogService;
public class CoordinationImpl {
// Holds a strong reference to the CoordinationWeakReference object associated
// with this CoordinationImpl. Serves no other purpose. Needs no guarding.
CoordinationWeakReference reference;
private volatile Throwable failure;
private volatile boolean terminated;
private volatile boolean ending = false;
private Date deadline;
private CoordinationImpl enclosingCoordination;
private Thread thread;
private long totalTimeout;
private TimerTask timerTask;
private final CoordinatorImpl coordinator;
private final long id;
private final String name;
private final List<Participant> participants;
// Store a referent to be used by clients other than the initiator. It must
// not be a reference to the referent returned to the initiator.
private final CoordinationReferent referent;
private final Map<Class<?>, Object> variables;
public CoordinationImpl(long id, String name, long timeout, CoordinatorImpl coordinator) {
validateName(name);
validateTimeout(timeout);
this.id = id;
this.name = name;
totalTimeout = timeout;
this.coordinator = coordinator;
participants = Collections.synchronizedList(new ArrayList<Participant>());
variables = new HashMap<Class<?>, Object>();
// Not an escaping 'this' reference. It will not escape the thread calling the constructor.
referent = new CoordinationReferent(this);
}
public void addParticipant(Participant participant) throws CoordinationException {
// This method requires the PARTICIPATE permission.
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
if (participant == null)
throw new NullPointerException(NLS.bind(Messages.NullParameter, "participant")); //$NON-NLS-1$
/* The caller has permission. Check to see if the participant is already
* participating in another coordination. Do this in a loop in case the
* participant must wait for the other coordination to finish. The loop
* will exit under the following circumstances.
*
* (1) This coordination is terminated.
* (2) The participant is already participating in another coordination
* using the same thread as this one.
* (3) This thread is interrupted.
* (4) The participant is not participating in another coordination.
*/
while (true) {
CoordinationImpl coordination;
synchronized (this) {
// Check to see if this coordination has already terminated. If so,
// throw the appropriate exception.
checkTerminated();
coordination = coordinator.addParticipant(participant, this);
if (coordination == null) {
// The same participant is not currently participating in
// any coordination. Add it to this coordination and break
// out of the loop.
participants.add(participant);
break;
} else if (coordination == this) {
// The same participant is being added twice to this
// coordination. Nothing to do.
break;
} else {
// This means the participant is already participating in another
// coordination. Check to see if it's on the same thread.
Thread t = coordination.getThread();
// If thread is null, the coordination is not associated with
// any thread, and there's nothing to compare. If the coordination
// is using this thread, then we can't block due to risk of deadlock.
if (t == Thread.currentThread()) {
throw new CoordinationException(NLS.bind(Messages.Deadlock, new Object[]{participant, getName(), getId()}), referent, CoordinationException.DEADLOCK_DETECTED);
}
}
}
// The participant is already participating in another coordination
// that's not using this thread. Block until that coordination has
// finished. A decision was made here to use a timeout and incur the
// expense of waking up and rejoining in order to make a reasonably
// timely exit if this coordination terminates.
try {
coordination.join(1000);
} catch (InterruptedException e) {
String message = NLS.bind(Messages.LockInterrupted, new Object[]{participant, name, id, coordination.getName(), coordination.getId()});
coordinator.getLogService().log(LogService.LOG_DEBUG, message, e);
// This thread was interrupted while waiting for the coordination
// to terminate.
throw new CoordinationException(message, referent, CoordinationException.LOCK_INTERRUPTED, e);
}
}
}
public void end() throws CoordinationException {
coordinator.checkPermission(CoordinationPermission.INITIATE, name);
// Terminating the coordination must be atomic.
synchronized (this) {
/*
* Set the ending flag to avoid spurious failures for orphans
* It appears the VM can aggressively puts objects on the queue if the last call is done in a finally
* Coordination c = coordinator.begin("name", 0);
* try {
* ...
* } finally {
* c.end()
* }
* In some cases it appears that while in the finally call to c.end()
* that c can become put on the queue for GC.
* This makes it eligible for orphan processing which will cause
* issues below when calling methods that invoke
* CoordinationWeakReference.processOrphanedCoordinations()
* We set an ending flag so that we can detect this
*/
ending = true;
// If this coordination is associated with a thread, an additional
// check is required.
if (thread != null) {
// Coordinations may only be ended by the same thread that
// pushed them onto the stack, if any.
if (thread != Thread.currentThread()) {
throw new CoordinationException(NLS.bind(Messages.EndingThreadNotSame, new Object[]{name, id, thread, Thread.currentThread()}), referent, CoordinationException.WRONG_THREAD);
}
// Unwind the stack in case there are other coordinations higher
// up than this one. See bug 421487 for why peek() may be null.
for (Coordination peeked = coordinator.peek(); !(peeked == null || referent.equals(peeked)); peeked = coordinator.peek()) {
try {
peeked.end();
}
catch (CoordinationException e) {
peeked = coordinator.peek();
if (peeked != null)
peeked.fail(e);
}
}
// A coordination is removed from the thread local stack only when being ended.
// This must occur even if the coordination is already terminated due to a
// failure.
coordinator.pop();
}
terminate();
}
// Notify participants this coordination has ended. Track whether or
// not a partial ending has occurred.
Exception exception = null;
Participant exceptionParticipant = null;
// No additional synchronization is needed here because the participant
// list will not be modified post termination.
List<Participant> participantsToNotify = new ArrayList<Participant>(this.participants);
Collections.reverse(participantsToNotify);
for (Participant participant : participantsToNotify) {
try {
participant.ended(referent);
} catch (Exception e) {
coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantEndedError, new Object[]{participant, name, id}), e);
// Only the first exception will be propagated.
if (exception == null) {
exception = e;
exceptionParticipant = participant;
}
}
}
synchronized (this) {
// Notify everything joined to this coordination that it has finished.
notifyAll();
}
// If a partial ending has occurred, throw the required exception.
if (exception != null) {
throw new CoordinationException(NLS.bind(Messages.CoordinationPartiallyEnded, new Object[]{name, id, exceptionParticipant}), referent, CoordinationException.PARTIALLY_ENDED, exception);
}
}
public long extendTimeout(long timeInMillis) throws CoordinationException {
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
validateTimeout(timeInMillis);
// We don't want this coordination to terminate before the new timer is
// in place.
synchronized (this) {
// Check to see if this coordination has already terminated. If so,
// throw the appropriate exception.
checkTerminated();
// If there was no previous timeout set, return 0 indicating that no
// extension has taken place.
if (timerTask == null)
return 0;
// Passing anything less than zero as well as zero itself will return the
// existing deadline. The deadline will not be null if timerTask is not null.
if (timeInMillis == 0)
return deadline.getTime();
long maxTimeout = coordinator.getMaxTimeout();
long newTotalTimeout = totalTimeout + timeInMillis;
// If there is no maximum timeout, there's no need to track the total timeout.
if (maxTimeout != 0) {
// If the max timeout has already been reached, return 0 indicating that no
// extension has taken place.
if (totalTimeout == maxTimeout)
return 0;
// If the extension would exceed the maximum timeout, add as much time
// as possible.
else if (newTotalTimeout > maxTimeout) {
totalTimeout = maxTimeout;
// Adjust the requested extension amount with the allowable amount.
timeInMillis = newTotalTimeout - maxTimeout;
}
// Otherwise, accept the full extension.
else
totalTimeout = newTotalTimeout;
}
// Cancel the current timeout.
boolean cancelled = timerTask.cancel();
if (!cancelled) {
// This means the previous task has run and is waiting to get a lock on
// this coordination. We can't throw an exception yet because we can't
// know which one to use (ALREADY_ENDED or FAILED). Once the lock is
// released, the running task may fail this coordination due to a timeout,
// or something else might be waiting to fail this coordination for other
// reasons or to end it. We simply don't know who will win the race.
try {
// Wait until this coordination terminates.
join(0);
// Now determine how it terminated and throw the appropriate exception.
checkTerminated();
}
catch (InterruptedException e) {
throw new CoordinationException(NLS.bind(Messages.InterruptedTimeoutExtension, new Object[]{totalTimeout, getName(), getId(), timeInMillis}), referent, CoordinationException.UNKNOWN, e);
}
}
// Create the new timeout.
timerTask = new CoordinationTimerTask(this);
// Extend the current deadline.
deadline = new Date(deadline.getTime() + timeInMillis);
// Schedule the new timeout.
coordinator.schedule(timerTask, deadline);
// Return the new deadline.
return deadline.getTime();
}
}
public boolean fail(Throwable reason) {
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
// The reason must not be null.
if (reason == null)
throw new NullPointerException(NLS.bind(Messages.MissingFailureCause, getName(), getId()));
// Terminating the coordination must be atomic.
synchronized (this) {
// If this coordination is terminated, return false. Do not throw a
// CoordinationException as in other methods.
if (terminated)
return false;
// This coordination has not already terminated, so terminate now.
terminate();
// Store the reason for the failure.
failure = reason;
}
// Notify participants this coordination has failed.
// No additional synchronization is needed here because the participant
// list will not be modified post termination.
List<Participant> participantsToNotify = new ArrayList<Participant>(this.participants);
Collections.reverse(participantsToNotify);
for (Participant participant : participantsToNotify) {
try {
participant.failed(referent);
} catch (Exception e) {
coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantFailedError, new Object[]{participant, name, id}), e);
}
}
synchronized (this) {
// Notify everything joined to this coordination that it has finished.
notifyAll();
}
// Return true to indicate this call resulted in the coordination's failure.
return true;
}
public Bundle getBundle() {
coordinator.checkPermission(CoordinationPermission.ADMIN, name);
return coordinator.getBundle();
}
public synchronized Coordination getEnclosingCoordination() {
coordinator.checkPermission(CoordinationPermission.ADMIN, name);
if (enclosingCoordination == null)
return null;
return enclosingCoordination.getReferent();
}
public Throwable getFailure() {
coordinator.checkPermission(CoordinationPermission.INITIATE, name);
return failure;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
public List<Participant> getParticipants() {
// This method requires the ADMIN permission.
coordinator.checkPermission(CoordinationPermission.INITIATE, name);
// Return a mutable snapshot.
synchronized (participants) {
return new ArrayList<Participant>(participants);
}
}
public synchronized Thread getThread() {
coordinator.checkPermission(CoordinationPermission.ADMIN, name);
return thread;
}
public Map<Class<?>, Object> getVariables() {
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
return variables;
}
public boolean isTerminated() {
return terminated;
}
public boolean isEnding() {
return ending;
}
public void join(final long timeInMillis) throws InterruptedException {
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
validateTimeout(timeInMillis);
// How much system time has elapsed across all waits.
long elapsed = 0;
// The system time at the start of the wait.
long start = System.currentTimeMillis();
// Wait until this coordination has terminated. Guard against spurious
// wakeups using the termination status.
synchronized (this) {
while (!terminated) {
// Wait for the desired amount of time minus any time that has already elapsed.
wait(timeInMillis - elapsed);
// Only track elapsed time if a definite interval was specified.
if (timeInMillis != 0) {
// Update the elapsed time.
elapsed = System.currentTimeMillis() - start;
// If the allotted wait time has fully expired, we're done.
if (elapsed >= timeInMillis) // Don't allow a wait of zero here!
break;
}
}
}
}
public Coordination push() throws CoordinationException {
coordinator.checkPermission(CoordinationPermission.INITIATE, name);
synchronized (this) {
checkTerminated();
coordinator.push(this);
}
return referent;
}
LogTracker getLogService() {
return coordinator.getLogService();
}
// Return the referent to be used by clients other than the initiator.
CoordinationReferent getReferent() {
return referent;
}
synchronized void setTimerTask(TimerTask timerTask) {
this.timerTask = timerTask;
deadline = new Date(System.currentTimeMillis() + totalTimeout);
coordinator.schedule(timerTask, deadline);
}
synchronized void setThreadAndEnclosingCoordination(Thread t, CoordinationImpl c) {
thread = t;
enclosingCoordination = c;
}
private void checkTerminated() throws CoordinationException {
// If this coordination is not terminated, simply return.
if (!terminated)
return;
// The coordination has terminated. Figure out which type of exception
// must be thrown.
if (failure != null) {
// The fail() method was called indicating the coordination failed.
throw new CoordinationException(NLS.bind(Messages.CoordinationFailed, name, id), referent, CoordinationException.FAILED, failure);
}
// The coordination did not fail, so it either partially ended or
// ended successfully.
throw new CoordinationException(NLS.bind(Messages.CoordinationEnded, name, id), referent, CoordinationException.ALREADY_ENDED);
}
private void terminate() throws CoordinationException {
checkTerminated();
terminated = true;
// Cancel the timeout. Purge the task if it was, in fact, canceled.
if (timerTask != null && timerTask.cancel()) {
coordinator.purge();
}
coordinator.terminate(this, participants);
}
private static void validateName(String name) {
boolean valid = true;
if (name == null || name.length() == 0)
valid = false;
else {
boolean period = false;
for (char c : name.toCharArray()) {
if (Character.isLetterOrDigit(c) || c == '_' || c == '-') {
period = false;
} else if (c == '.' && !period) {
period = true;
} else {
valid = false;
break;
}
}
}
if (!valid)
throw new IllegalArgumentException(NLS.bind(Messages.InvalidCoordinationName, name));
}
private static void validateTimeout(long timeout) {
if (timeout < 0)
throw new IllegalArgumentException(NLS.bind(Messages.InvalidTimeInterval, timeout));
}
}