More refactoring to EolExecutorService and related classes. TODO: find a way to support executor re-use(?)
diff --git a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/ConcurrentExecutionStatus.java b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/ConcurrentExecutionStatus.java
index 529a7c0..6cb4a6d 100644
--- a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/ConcurrentExecutionStatus.java
+++ b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/ConcurrentExecutionStatus.java
@@ -26,13 +26,13 @@
return exception;
}
- public abstract Object getResult(Object lockObj);
+ protected abstract Object getResult(Object lockObj);
- public Object getResult() {
+ public final Object getResult() {
return getResult(this);
}
- public boolean register() {
+ public final boolean register() {
return register(this);
}
@@ -41,25 +41,25 @@
* @param lockObj
* @return <code>true</code> if registration was successful.
*/
- public abstract boolean register(Object lockObj);
+ protected abstract boolean register(Object lockObj);
- public boolean isInProgress() {
+ public final boolean isInProgress() {
return isInProgress(this);
}
- public abstract boolean isInProgress(Object lockObj);
+ protected abstract boolean isInProgress(Object lockObj);
- public void completeSuccessfully() {
+ public final void completeSuccessfully() {
completeSuccessfully(this);
}
- public abstract void completeSuccessfully(Object lockObj);
+ protected abstract void completeSuccessfully(Object lockObj);
- public void completeWithResult(Object result) {
+ public final void completeWithResult(Object result) {
completeWithResult(this, result);
}
- public abstract void completeWithResult(Object lockObj, Object result);
+ protected abstract void completeWithResult(Object lockObj, Object result);
protected final boolean completeExceptionallyBase(Throwable exception) {
boolean firstFail = !failed;
@@ -89,13 +89,17 @@
*
* @return Whether the completion was successful (<code>true</code>) or exceptional (<code>false</code>).
*/
- public abstract boolean waitForCompletion(Object lockObj, Supplier<Boolean> targetState);
+ protected abstract boolean waitForCompletion(Object lockObj, Supplier<Boolean> targetState);
- public boolean waitForCompletion(Object lockObj) {
+ public final boolean waitForCompletion(Supplier<Boolean> targetState) {
+ return waitForCompletion(this, targetState);
+ }
+
+ protected final boolean waitForCompletion(Object lockObj) {
return waitForCompletion(lockObj, null);
}
- public boolean waitForCompletion() {
- return waitForCompletion(this);
+ public final boolean waitForCompletion() {
+ return waitForCompletion((Object) this);
}
}
diff --git a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/MultiConcurrentExecutionStatus.java b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/MultiConcurrentExecutionStatus.java
index ec9dfab..0d2afe0 100644
--- a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/MultiConcurrentExecutionStatus.java
+++ b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/MultiConcurrentExecutionStatus.java
@@ -102,9 +102,4 @@
}
return !failed;
}
-
- @Override
- public boolean waitForCompletion(Object lockObj) {
- return waitForCompletion(lockObj, null);
- }
}
\ No newline at end of file
diff --git a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/SingleConcurrentExecutionStatus.java b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/SingleConcurrentExecutionStatus.java
index 17e9d61..31efb14 100644
--- a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/SingleConcurrentExecutionStatus.java
+++ b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/concurrent/SingleConcurrentExecutionStatus.java
@@ -29,14 +29,16 @@
private Object currentLock;
@Override
- public Object getResult(Object lockObj) {
+ protected Object getResult(Object lockObj) {
return result;
}
@Override
- public boolean register(Object lockObj) {
+ protected boolean register(Object lockObj) {
if (registerAvailable) {
assert !inProgress;
+ exception = null;
+ result = null;
inProgress = true;
registerAvailable = false;
return true;
@@ -48,11 +50,11 @@
}
@Override
- public boolean isInProgress(Object lockObj) {
+ protected boolean isInProgress(Object lockObj) {
return inProgress;
}
- private void complete(Object lockObj) {
+ void complete(Object lockObj) {
inProgress = false;
if (lockObj != null) synchronized (lockObj) {
lockObj.notifyAll();
@@ -65,12 +67,12 @@
}
@Override
- public void completeSuccessfully(Object lockObj) {
+ protected void completeSuccessfully(Object lockObj) {
complete(lockObj);
}
@Override
- public void completeWithResult(Object lockObj, Object result) {
+ protected void completeWithResult(Object lockObj, Object result) {
this.result = result;
completeSuccessfully(lockObj);
}
@@ -87,7 +89,7 @@
* @return Whether the completion was successful (<code>true</code>) or exceptional (<code>false</code>).
*/
@Override
- public boolean waitForCompletion(final Object lockObj, final Supplier<Boolean> targetState) {
+ protected boolean waitForCompletion(final Object lockObj, final Supplier<Boolean> targetState) {
assert lockObj != null;
currentLock = lockObj;
while (isInProgress(lockObj) && (targetState == null || !targetState.get())) synchronized (lockObj) {
diff --git a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/function/ExceptionHandler.java b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/function/ExceptionHandler.java
index 7ff79c6..a496fce 100644
--- a/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/function/ExceptionHandler.java
+++ b/plugins/org.eclipse.epsilon.common/src/org/eclipse/epsilon/common/function/ExceptionHandler.java
@@ -9,13 +9,20 @@
**********************************************************************/
package org.eclipse.epsilon.common.function;
+import java.util.function.Consumer;
+
/**
*
* @author Sina Madani
* @since 1.6
*/
@FunctionalInterface
-public interface ExceptionHandler<E extends Exception> {
+public interface ExceptionHandler<E extends Exception> extends Consumer<E> {
+
+ @Override
+ default void accept(E ex) {
+ handleException(ex);
+ }
public void handleException(E ex);
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/EolThreadFactory.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/EolThreadFactory.java
index 2af17da..0db3ac7 100644
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/EolThreadFactory.java
+++ b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/EolThreadFactory.java
@@ -11,8 +11,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.eclipse.epsilon.common.concurrent.ConcurrentExecutionStatus;
+import java.util.function.Consumer;
/**
*
@@ -21,11 +20,11 @@
*/
public class EolThreadFactory implements ThreadFactory {
- protected final AtomicInteger threadCount = new AtomicInteger();
+ protected int threadCount;
protected final int maxThreads;
protected final String namePrefix;
- protected final ConcurrentExecutionStatus executionStatus;
- protected final UncaughtExceptionHandler exceptionHandler = new UncaughtExceptionHandler() {
+ protected final Consumer<Exception> executorExceptionHandler;
+ protected final UncaughtExceptionHandler uncaughtHandler = new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
Exception exception;
@@ -36,7 +35,7 @@
exception = new RuntimeException(e.getClass().getSimpleName()+" in thread "+t.getName(), e);
}
- executionStatus.completeExceptionally(exception);
+ executorExceptionHandler.accept(exception);
}
};
@@ -44,32 +43,32 @@
this(null);
}
- public EolThreadFactory(ConcurrentExecutionStatus status) {
- this(status, Integer.MAX_VALUE);
+ public EolThreadFactory(Consumer<Exception> exceptionHandler) {
+ this(exceptionHandler, Integer.MAX_VALUE);
}
- public EolThreadFactory(ConcurrentExecutionStatus status, int threadLimit) {
- this(status, threadLimit, null);
+ public EolThreadFactory(Consumer<Exception> exceptionHandler, int threadLimit) {
+ this(exceptionHandler, threadLimit, null);
}
- protected EolThreadFactory(ConcurrentExecutionStatus status, int threadLimit, String threadNamePrefix) {
+ protected EolThreadFactory(Consumer<Exception> exceptionHandler, int threadLimit, String threadNamePrefix) {
this.namePrefix = threadNamePrefix != null ? threadNamePrefix : "EOL-Worker";
- this.executionStatus = status;
+ this.executorExceptionHandler = exceptionHandler;
this.maxThreads = threadLimit;
}
protected <T extends Thread> T setThreadProperties(T thread) {
- thread.setName(namePrefix+(threadCount.incrementAndGet()));
+ thread.setName(namePrefix + threadCount);
thread.setDaemon(true);
- if (executionStatus != null) {
- thread.setUncaughtExceptionHandler(exceptionHandler);
+ if (executorExceptionHandler != null) {
+ thread.setUncaughtExceptionHandler(uncaughtHandler);
}
return thread;
}
@Override
public Thread newThread(Runnable target) {
- if (threadCount.get() > maxThreads) {
+ if (++threadCount > maxThreads) {
throw new IllegalStateException("Exceeded maximum number of threads: "+maxThreads);
}
return setThreadProperties(new Thread(target));
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolExecutorService.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolExecutorService.java
index 66eaade..5a2e2b2 100644
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolExecutorService.java
+++ b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolExecutorService.java
@@ -16,7 +16,6 @@
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.eclipse.epsilon.common.concurrent.ConcurrentExecutionStatus;
-import org.eclipse.epsilon.common.concurrent.SingleConcurrentExecutionStatus;
import org.eclipse.epsilon.eol.exceptions.EolRuntimeException;
/**
@@ -30,13 +29,21 @@
public interface EolExecutorService extends ExecutorService {
/**
- * This method should return an immutable, re-usable and non-null {@link ConcurrentExecutionStatus}.
+ * This method should return a non-null {@link ConcurrentExecutionStatus} representing the
+ * current job in progress.
*
* @return A re-usable status object used to interrupt short-circuiting jobs and
* handling exceptions.
*/
ConcurrentExecutionStatus getExecutionStatus();
+ default void handleException(Exception exception) {
+ if (exception instanceof EolRuntimeException) {
+ exception.getMessage();
+ }
+ getExecutionStatus().completeExceptionally(exception);
+ }
+
/**
* Shuts down this ExecutorService and waits for all jobs to complete.
*
@@ -59,68 +66,85 @@
*/
default <R> List<R> collectResults(Collection<Future<R>> futures) throws EolRuntimeException {
final boolean keepAlive = futures != null;
- if (keepAlive && futures.isEmpty())
- return Collections.emptyList();
-
final ConcurrentExecutionStatus status = getExecutionStatus();
- Throwable statusException = status.getException();
- if (statusException != null) EolRuntimeException.propagateDetailed(statusException);
- if (!status.isInProgress()) status.register();
-
- final List<R> results = keepAlive ? new ArrayList<>(futures.size()) : null;
-
- final Thread blockingThread = Thread.currentThread(),
- compWait = new Thread(() -> {
- try {
- if (keepAlive) for (Future<R> future : futures) {
- if (status.isInProgress()) {
- results.add(future.get());
- }
- else return;
- }
- else {
- shutdown();
- if (!awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
- throw new IllegalStateException("Infinite wait on termination!");
- }
- }
- status.completeSuccessfully();
+ try {
+ Throwable statusException = status.getException();
+ if (statusException != null) {
+ EolRuntimeException.propagateDetailed(statusException);
}
- catch (ExecutionException ex) {
- status.completeExceptionally(ex.getCause());
- }
- catch (Exception ex) {
+ else if (keepAlive && futures.isEmpty()) {
if (status.isInProgress()) {
- status.completeExceptionally(ex);
+ status.completeSuccessfully();
+ }
+ return Collections.emptyList();
+ }
+ else if (!status.isInProgress()) {
+ status.register();
+ }
+
+ final List<R> results = keepAlive ? new ArrayList<>(futures.size()) : null;
+
+ final Thread blockingThread = Thread.currentThread(),
+ compWait = new Thread(() -> {
+ try {
+ if (keepAlive) for (Future<R> future : futures) {
+ if (status.isInProgress()) {
+ results.add(future.get());
+ }
+ else {
+ assert status.getException() != null;
+ return;
+ }
+ }
+ else {
+ shutdown();
+ if (!awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
+ throw new IllegalStateException("Infinite wait on termination!");
+ }
+ }
+ status.completeSuccessfully();
+ }
+ catch (ExecutionException ex) {
+ status.completeExceptionally(ex.getCause());
+ }
+ catch (Exception ex) {
+ if (status.getException() == null || status.isInProgress()) {
+ status.completeExceptionally(ex);
+ }
+ }
+ finally {
+ if (status.isInProgress() && status.getException() == null) {
+ status.completeSuccessfully();
+ }
+ if (blockingThread.getState() == Thread.State.WAITING) {
+ blockingThread.interrupt();
+ }
+ assert !status.isInProgress();
+ }
+ });
+ compWait.setName(getClass().getSimpleName()+"-AwaitCompletion");
+ compWait.start();
+
+ try {
+ if (!status.waitForCompletion()) {
+ statusException = status.getException();
+ compWait.interrupt();
+ shutdownNow();
+ EolRuntimeException.propagateDetailed(statusException);
}
}
finally {
- if (blockingThread.getState() == Thread.State.WAITING) {
- blockingThread.interrupt();
+ if (compWait.isAlive()) try {
+ compWait.join();
}
- assert !status.isInProgress();
+ catch (InterruptedException ie) {}
}
- });
- compWait.setName(getClass().getSimpleName()+"-AwaitCompletion");
- compWait.start();
-
- try {
- if (!status.waitForCompletion()) {
- statusException = status.getException();
- compWait.interrupt();
- shutdownNow();
- EolRuntimeException.propagateDetailed(statusException);
- }
+
+ return results;
}
finally {
- if (compWait.isAlive()) try {
- compWait.join();
- }
- catch (InterruptedException ie) {}
assert !status.isInProgress();
}
-
- return results;
}
/**
@@ -134,62 +158,78 @@
* was called whilst waiting.
*/
default Object shortCircuitCompletion(Collection<? extends Future<?>> jobs) throws EolRuntimeException {
- if (jobs == null || jobs.isEmpty()) return null;
-
final ConcurrentExecutionStatus status = getExecutionStatus();
- final Thread blockingThread = Thread.currentThread(),
- compWait = new Thread(() -> {
- try {
- for (Future<?> future : jobs) {
- if (status.isInProgress()) {
- future.get();
- }
- else return;
- }
- status.completeSuccessfully();
+ try {
+ Throwable statusException = status.getException();
+ if (statusException != null) {
+ EolRuntimeException.propagateDetailed(statusException);
}
- catch (ExecutionException ex) {
- status.completeExceptionally(ex.getCause());
- }
- catch (Exception ex) {
+ else if (jobs == null || jobs.isEmpty()) {
if (status.isInProgress()) {
- status.completeExceptionally(ex);
+ status.completeSuccessfully();
+ }
+ return status.getResult();
+ }
+
+ final Thread blockingThread = Thread.currentThread(),
+ compWait = new Thread(() -> {
+ try {
+ for (Future<?> future : jobs) {
+ if (status.isInProgress()) {
+ future.get();
+ }
+ else return;
+ }
+ status.completeSuccessfully();
+ }
+ catch (ExecutionException ex) {
+ status.completeExceptionally(ex.getCause());
+ }
+ catch (Exception ex) {
+ if (status.isInProgress()) {
+ status.completeExceptionally(ex);
+ }
+ }
+ finally {
+ if (status.isInProgress() && status.getException() == null) {
+ status.completeSuccessfully();
+ }
+ if (blockingThread.getState() == Thread.State.WAITING) {
+ blockingThread.interrupt();
+ }
+ assert !status.isInProgress();
+ }
+ });
+ compWait.setName(getClass().getSimpleName()+"-AwaitCompletion");
+ compWait.start();
+
+ try {
+ boolean success = status.waitForCompletion();
+ compWait.interrupt();
+
+ if (!success) {
+ shutdownNow();
+ EolRuntimeException.propagateDetailed(status.getException());
+ }
+ else {
+ // This is to avoid unnecessary waiting for completion
+ for (Future<?> future : jobs) {
+ future.cancel(true);
+ }
}
}
finally {
- if (blockingThread.getState() == Thread.State.WAITING) {
- blockingThread.interrupt();
+ if (compWait.isAlive()) try {
+ compWait.join();
}
- assert !status.isInProgress();
+ catch (InterruptedException ie) {}
}
- });
- compWait.setName(getClass().getSimpleName()+"-AwaitCompletion");
- compWait.start();
-
- try {
- boolean success = status.waitForCompletion();
- compWait.interrupt();
- if (!success) {
- shutdownNow();
- EolRuntimeException.propagateDetailed(status.getException());
- }
- else {
- // This is to avoid unnecessary waiting for completion
- for (Future<?> future : jobs) {
- future.cancel(true);
- }
- }
+ return status.getResult();
}
finally {
- if (compWait.isAlive()) try {
- compWait.join();
- }
- catch (InterruptedException ie) {}
assert !status.isInProgress();
}
-
- return status.getResult();
}
/**
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolForkJoinExecutor.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolForkJoinExecutor.java
deleted file mode 100644
index 2ffd789..0000000
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolForkJoinExecutor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*********************************************************************
- * Copyright (c) 2018 The University of York.
- *
- * This program and the accompanying materials are made
- * available under the terms of the Eclipse Public License 2.0
- * which is available at https://www.eclipse.org/legal/epl-2.0/
- *
- * SPDX-License-Identifier: EPL-2.0
-**********************************************************************/
-package org.eclipse.epsilon.eol.execute.concurrent.executors;
-
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinWorkerThread;
-import org.eclipse.epsilon.common.concurrent.ConcurrentExecutionStatus;
-import org.eclipse.epsilon.common.concurrent.SingleConcurrentExecutionStatus;
-import org.eclipse.epsilon.eol.execute.concurrent.EolThreadFactory;
-import org.eclipse.epsilon.eol.execute.concurrent.executors.EolExecutorService;
-
-/**
- *
- * @author Sina Madani
- * @since 1.6
- */
-public class EolForkJoinExecutor extends ForkJoinPool implements EolExecutorService {
-
- static class EolForkJoinThreadFactory extends EolThreadFactory implements ForkJoinWorkerThreadFactory {
-
- public EolForkJoinThreadFactory() {
- super(new SingleConcurrentExecutionStatus());
- }
-
- ConcurrentExecutionStatus getExecStatus() {
- return this.executionStatus;
- }
-
- @Override
- public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
- return setThreadProperties(new ForkJoinWorkerThread(pool){});
- }
- }
-
- public EolForkJoinExecutor() {
- this(Runtime.getRuntime().availableProcessors());
- }
-
- public EolForkJoinExecutor(int parallelism) {
- super(parallelism, new EolForkJoinThreadFactory(), null, false);
- }
-
- @Override
- public ConcurrentExecutionStatus getExecutionStatus() {
- return ((EolForkJoinThreadFactory) getFactory()).getExecStatus();
- }
-}
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolThreadPoolExecutor.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolThreadPoolExecutor.java
index 5988bdd..11cd47f 100644
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolThreadPoolExecutor.java
+++ b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/concurrent/executors/EolThreadPoolExecutor.java
@@ -10,7 +10,6 @@
package org.eclipse.epsilon.eol.execute.concurrent.executors;
import java.util.concurrent.*;
-import org.eclipse.epsilon.common.concurrent.ConcurrentExecutionStatus;
import org.eclipse.epsilon.common.concurrent.SingleConcurrentExecutionStatus;
import org.eclipse.epsilon.eol.execute.concurrent.EolThreadFactory;
import org.eclipse.epsilon.eol.execute.concurrent.executors.EolExecutorService;
@@ -29,7 +28,7 @@
DEFAULT_CORE_POOL_SIZE = 1,
DEFAULT_MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors();
- protected final ConcurrentExecutionStatus execStatus = new SingleConcurrentExecutionStatus();
+ protected final SingleConcurrentExecutionStatus execStatus = new SingleConcurrentExecutionStatus();
/**
* Unbounded thread pool size with fixed queue capacity.
@@ -71,11 +70,11 @@
*/
public EolThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, (r, e) -> {});
- setThreadFactory(new EolThreadFactory(getExecutionStatus(), maximumPoolSize));
+ setThreadFactory(new EolThreadFactory(this::handleException, maximumPoolSize));
}
@Override
- public ConcurrentExecutionStatus getExecutionStatus() {
+ public SingleConcurrentExecutionStatus getExecutionStatus() {
return execStatus;
}
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/EolContextParallel.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/EolContextParallel.java
index 1470e7d..e3d70ba 100644
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/EolContextParallel.java
+++ b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/EolContextParallel.java
@@ -22,6 +22,7 @@
import org.eclipse.epsilon.eol.execute.context.FrameStack;
import org.eclipse.epsilon.eol.execute.context.IEolContext;
import org.eclipse.epsilon.eol.execute.operations.contributors.OperationContributorRegistry;
+import org.eclipse.epsilon.eol.exceptions.EolRuntimeException;
import org.eclipse.epsilon.eol.exceptions.concurrent.EolNestedParallelismException;
import org.eclipse.epsilon.eol.execute.concurrent.DelegatePersistentThreadLocal;
import org.eclipse.epsilon.eol.execute.concurrent.PersistentThreadLocal;
@@ -196,7 +197,7 @@
}
@Override
- public Object endParallelTask() {
+ public Object endParallelTask() throws EolRuntimeException {
Object result = IEolContextParallel.super.endParallelTask();
shutdownExecutor();
return result;
diff --git a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/IEolContextParallel.java b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/IEolContextParallel.java
index 14f43ec..3b310eb 100644
--- a/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/IEolContextParallel.java
+++ b/plugins/org.eclipse.epsilon.eol.engine/src/org/eclipse/epsilon/eol/execute/context/concurrent/IEolContextParallel.java
@@ -104,7 +104,7 @@
if (!isParallelisationLegal()) throw new EolNestedParallelismException(entryPoint);
EolExecutorService executor = getExecutorService();
ConcurrentExecutionStatus status = executor != null ? executor.getExecutionStatus() : null;
- if (status != null && (entryPoint != null ? status.isInProgress(entryPoint) : status.isInProgress()))
+ if (status != null && status.isInProgress())
throw new EolNestedParallelismException(entryPoint);
}
@@ -128,10 +128,7 @@
exception.getMessage();
}
if (executor != null) {
- ConcurrentExecutionStatus status = executor.getExecutionStatus();
- if (status != null) {
- status.completeExceptionally(exception);
- }
+ executor.handleException(exception);
}
}
@@ -141,6 +138,8 @@
/**
* Registers the beginning of parallel task on the default EolExecutorService.
+ * The {@link #endParallelTask()} method must be called once finished.
+ *
* @param entryPoint The AST to associate with this task.
* @return {@link #getExecutorService()}
* @throws EolNestedParallelismException If there was already a parallel task in progress.
@@ -159,16 +158,27 @@
}
/**
+ * Must be called once parallel processing has finished.
*
+ * @see #beginParallelTask(ModuleElement)
* @return The result of the task, if any.
+ * @throws EolRuntimeException if the status completed exceptionally.
+ * @throws IllegalStateException if the current job is still executing.
*/
- default Object endParallelTask() {
+ default Object endParallelTask() throws EolRuntimeException {
EolExecutorService executor = getExecutorService();
if (executor != null) {
ConcurrentExecutionStatus status = executor.getExecutionStatus();
if (status != null) {
- status.completeSuccessfully();
- return status.getResult();
+ if (status.isInProgress()) {
+ throw new IllegalStateException("Attempted to end parallel task while execution is in progress!");
+ }
+ else if (status.waitForCompletion()) { // Note: this shouldn't actually wait!
+ return status.getResult();
+ }
+ else {
+ EolRuntimeException.propagateDetailed(status.getException());
+ }
}
}
return null;