blob: 193818c554a09c5f174dea068b2871d80ca5f7c1 [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2021 the Eclipse BaSyx Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
******************************************************************************/
package org.eclipse.basyx.submodel.metamodel.map.submodelelement.operation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.basyx.submodel.metamodel.api.submodelelement.operation.IAsyncInvocation;
/**
* Local implementation of IAsyncInvocation.
*
* @author conradi, espen
*
*/
public class AsyncInvocation implements IAsyncInvocation {
// Delayer for timeouts
private static ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(0);
private String operationId;
private CompletableFuture<Void> future;
private Object result;
private RuntimeException exception;
@SuppressWarnings("unchecked")
public AsyncInvocation(Operation operation, int timeout, Object... parameters) {
operationId = operation.getIdShort();
Function<Object[], Object> invokable = (Function<Object[], Object>) operation.get(Operation.INVOKABLE);
future = CompletableFuture.supplyAsync(
// Run Operation asynchronously
() -> invokable.apply(parameters))
// Accept either result or throw exception on timeout
.acceptEither(setTimeout(timeout),
// result accepted => write result (or timeout exception)
futureResult -> this.result = futureResult
).exceptionally(throwable -> {
// result not accepted? set operation state
if (throwable.getCause() instanceof OperationExecutionTimeoutException) {
exception = (RuntimeException) throwable.getCause();
} else {
// result not accepted? set operation state
exception = new OperationExecutionErrorException(
"Exception while executing Operation Operation '" + operationId + "'", throwable);
}
return null;
});
}
/**
* Function for scheduling a timeout function with completable futures
*/
private CompletableFuture<T> setTimeout(int timeout) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
delayer.schedule(
() -> timeoutFuture.completeExceptionally(
new OperationExecutionTimeoutException("Operation " + operationId + " timed out")),
timeout, TimeUnit.MILLISECONDS);
return timeoutFuture;
}
@Override
public Object getResult() {
try {
future.get();
} catch (Exception e) {
// Some RuntimeException occured when finishing the future
throw new OperationExecutionErrorException(
"Exception while executing Operation Operation '" + operationId + "'", e.getCause());
}
if (exception instanceof OperationExecutionTimeoutException
|| exception instanceof OperationExecutionErrorException) {
// Future finished with an exception
throw exception;
}
return result;
}
@Override
public boolean isFinished() {
return future.isDone();
}
public String getOperationId() {
return operationId;
}
}