Bug 570871
added multi-threaded service execution
Change-Id: I41cd1ed746cdab40feb60d8b374ad008d046ff32
Signed-off-by: Mahmoud Bazzal <mahmood1994ha@gmail.com>
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
index 4c25288..f5574c2 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
@@ -9,27 +9,18 @@
*
* Contributors:
* Robert Bosch GmbH - initial API and implementation
+ * Dortmund University of Applied Sciences and Arts - Bug 570871
********************************************************************************
*/
package org.eclipse.app4mc.cloud.manager;
-import java.io.File;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.http.HttpStatus;
import org.eclipse.app4mc.cloud.manager.ProcessLog.Action;
-import org.eclipse.app4mc.cloud.manager.administration.CloudServiceDefinition;
import org.eclipse.app4mc.cloud.manager.storage.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
-import org.springframework.util.StringUtils;
-
-import kong.unirest.HttpResponse;
-import kong.unirest.MultipartBody;
-import kong.unirest.Unirest;
public class CloudServiceExecutionRunnable implements Runnable {
@@ -39,6 +30,8 @@
private String uuid;
private String originalFilename;
private WorkflowStatus workflowStatus;
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
public CloudServiceExecutionRunnable(
StorageService storageService,
@@ -60,7 +53,15 @@
try {
Path inputFile = this.storageService.load(this.uuid, this.originalFilename);
- processServiceNode(this.workflowStatus.getServiceRootNode(), inputFile);
+ ServiceNodeProcessingTask rootTask = new ServiceNodeProcessingTask(this.workflowStatus.getServiceRootNode(),
+ inputFile, storageService, messagingTemplate, uuid, workflowStatus);
+
+ logger.info("started root task");
+ rootTask.fork();
+ logger.info("waiting for root task to finish");
+ // join the task to indicate the entire workflow is done
+ rootTask.join();
+ logger.info("finished root task");
} finally {
this.workflowStatus.done();
WorkflowStatusHelper.saveWorkflowStatus(
@@ -96,310 +97,4 @@
}
}
- private Path processServiceNode(ServiceNode serviceNode, Path inputFile) {
- Path nodeResult = inputFile;
-
- if (!serviceNode.isStructuralNode()) {
- nodeResult = executeCloudService(serviceNode, inputFile);
- }
-
- Path childInput = nodeResult;
- for (ServiceNode node : serviceNode.getChildren()) {
- // if an error occurred stop the workflow
- if (!this.workflowStatus.getErrors().isEmpty()) {
- break;
- }
-
- try {
- if (serviceNode.isStructuralNode()) {
- // inside a structural node the services are executed in sequential order,
- // passing the result of the previous service to the next one
-
- // TODO if the structural node is not the root node, open a new thread for the execution
-
- childInput = processServiceNode(node, childInput);
- } else {
- // if the node is a service node, all child services should get the result of
- // the parent service
-
- // TODO if serviceNode.getChildren().size() > 1 open new thread for each child
-
- processServiceNode(node, childInput);
- }
- } catch (ProcessingFailedException e) {
- this.workflowStatus.addError(e.getMessage());
- }
-
- if (this.workflowStatus.isCancelled()) {
- addMessage(this.workflowStatus, "Workflow cancelled by user");
- break;
- }
- }
-
- return nodeResult;
- }
-
- private Path executeCloudService(ServiceNode node, Path inputFile) {
- try {
- CloudServiceDefinition csd = node.getService();
- ServiceConfiguration config = node.getServiceConfiguration();
- String serviceName = csd.getName();
- String baseUrl = csd.getBaseUrl();
-
- // upload to service
- MultipartBody multipartBody = Unirest.post(baseUrl)
- .field("file", Files.newInputStream(inputFile), inputFile.getFileName().toString());
-
- if (config != null) {
- config.getParameterList().forEach(param -> {
- if (!StringUtils.isEmpty(param.getValue()) && !param.isManagerParameter()) {
- if ("multiple".equals(param.getCardinality())) {
- multipartBody.field(param.getKey(), Arrays.asList(param.getValue().split(",")));
- } else {
- multipartBody.field(param.getKey(), param.getValue());
- }
- }
- });
- }
-
- HttpResponse<?> uploadResponse = multipartBody.asEmpty();
-
- // extract status link from result
- String statusUrl = null;
- if (uploadResponse.getStatus() == 201) {
- statusUrl = uploadResponse.getHeaders().getFirst("Location");
- if (StringUtils.isEmpty(statusUrl)) {
- // fallback check for Link header if Location header is not set
- statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
- } else if (!HeaderHelper.isValid(statusUrl, baseUrl)) {
- throw new ProcessingFailedException(
- "The status URL from the Location header does not match with the service base URL '" + baseUrl + "': " + statusUrl);
- }
- } else if (uploadResponse.getStatus() == 200) {
- // fallback if return code is 200, then follow up needs to be placed in Link header
- statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
- } else {
- // error
- Object body = uploadResponse.getBody();
- if (body != null && !body.toString().isEmpty()) {
- workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - " + body +" - Workflow stopped!");
- } else {
- workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - Workflow stopped!");
- }
- return null;
- }
-
- addMessage(workflowStatus, "Upload to " + serviceName + " service succeeded");
-
- Path result = inputFile;
-
- if (workflowStatus.isCancelled()) {
- return null;
- }
-
- // trigger status URL until process is finished or error occured
- if (statusUrl != null) {
- HttpResponse<?> statusResponse = Unirest.get(statusUrl).asEmpty();
- List<String> linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
-
- long start = System.currentTimeMillis();
- long end = System.currentTimeMillis();
-
- long timeout = 60_000l;
- if (config != null) {
- ServiceConfigurationParameter timeoutParam = config.getParameter("timeout");
- if (timeoutParam != null) {
- try {
- timeout = Long.valueOf(timeoutParam.getValue());
- } catch (Exception e) {
- addMessage(workflowStatus, serviceName + " timeout value " + timeoutParam.getValue() + " invalid. Using default of 60_000");
- timeout = 60_000l;
- }
- }
- }
-
- while (linkHeaders.size() <= 1) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Restore interrupted state...
- Thread.currentThread().interrupt();
- }
-
- statusResponse = Unirest.get(statusUrl).asEmpty();
- linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
-
- end = System.currentTimeMillis();
-
- // don't request for more than configured timeout
- if (timeout > 0 && (end - start) > timeout) {
- addMessage(workflowStatus, serviceName + " status requested for " + (timeout/1000) + " seconds");
- break;
- }
-
- if (workflowStatus.isCancelled()) {
- // TODO trigger cancel at service?
- return null;
- }
-
- // send a processing message every 5 seconds to the client to indicate that the
- // service is still progressing
- if ((((end - start) / 1000) % 5) == 0) {
- this.messagingTemplate.convertAndSend(
- "/topic/process-updates/" + this.uuid,
- new ProcessLog(Action.PROCESSING, serviceName + " is processing ..."));
-
- }
- }
-
- // create the service result directory relative to the input file
- Path serviceSubDir = getServiceDir(node);
- Files.createDirectories(serviceSubDir);
-
- String servicePath = storageService.load(workflowStatus.getUuid()).relativize(serviceSubDir).toString();
- // replace backslashes with forward slashes in case the service is executed on Windows
- servicePath = servicePath.replaceAll("\\\\", "/");
- if (!servicePath.isEmpty()) {
- servicePath += "/";
- }
-
- // first check if there is a result link
- List<String> resultUrls = HeaderHelper.getUrlsFromLink(linkHeaders, "result", baseUrl);
-
- String deleteUrl = null;
- if (!resultUrls.isEmpty()) {
-
- addMessage(workflowStatus, serviceName + " processing finished");
-
- boolean error = false;
- for (String resultUrl : resultUrls) {
-
- // download file
- HttpResponse<File> resultResponse = Unirest.get(resultUrl)
- .asFile(serviceSubDir.resolve("result").toString());
-
- Path migrationResult = resultResponse.getBody().toPath();
- String filename = HeaderHelper.getFilenameFromHeader(resultResponse.getHeaders().getFirst("Content-Disposition"));
- if (filename != null) {
- migrationResult = Files.move(migrationResult, serviceSubDir.resolve(filename));
- }
-
- String resultName = resultResponse.getHeaders().getFirst("x-app4mc-result-name");
- if (StringUtils.isEmpty(resultName)) {
- resultName = serviceName;
- }
-
- addMessage(workflowStatus, resultName + " result downloaded");
-
- workflowStatus.addResult(
- resultName + " Result",
- servicePath + migrationResult.getFileName().toString());
-
- // extract header information if result should be used in workflow
- String useResult = resultResponse.getHeaders().getFirst("x-app4mc-use-result");
- if (useResult == null || !useResult.toLowerCase().equals("false")) {
- // the result should be used in the workflow
- result = migrationResult;
- }
-
- if (resultResponse.getStatus() != HttpStatus.SC_OK) {
- error = true;
- }
-
- if (deleteUrl == null) {
- // extract delete
- deleteUrl = HeaderHelper.getUrlFromLink(resultResponse.getHeaders().get("Link"), "delete", baseUrl);
- }
- }
-
- if (error) {
- workflowStatus.addError(serviceName + " failed with errors");
- } else {
- addMessage(workflowStatus, serviceName + " successfull");
- }
- } else {
- String errorUrl = HeaderHelper.getUrlFromLink(linkHeaders, "error", baseUrl);
- if (errorUrl != null) {
- workflowStatus.addError(serviceName + " processing finished with error");
-
- // download error file
- HttpResponse<File> errorResponse = Unirest.get(errorUrl)
- .asFile(serviceSubDir.resolve("error").toString());
-
- Path migrationError = errorResponse.getBody().toPath();
- String filename = HeaderHelper.getFilenameFromHeader(errorResponse.getHeaders().getFirst("Content-Disposition"));
- if (filename != null) {
- migrationError = Files.move(migrationError, serviceSubDir.resolve(filename));
- }
-
- addMessage(workflowStatus, serviceName + " error result downloaded");
-
- workflowStatus.addResult(
- serviceName + " Error",
- servicePath + migrationError.getFileName().toString());
-
- // extract delete
- deleteUrl = HeaderHelper.getUrlFromLink(errorResponse.getHeaders().get("Link"), "delete", baseUrl);
- } else {
- workflowStatus.addError(serviceName + " has no result and no error");
- }
- }
-
- boolean deleteResult = true;
- if (config != null) {
- ServiceConfigurationParameter deleteParam = config.getParameter("deleteResult");
- if (deleteParam != null) {
- deleteResult = Boolean.valueOf(deleteParam.getValue());
- }
- }
-
- if (deleteUrl != null && deleteResult) {
- // delete upload again
- Unirest.delete(deleteUrl).asEmpty();
- addMessage(workflowStatus, serviceName + " cleaned up");
- }
-
- } else {
- // no status URL in the upload response, stop further processing
- workflowStatus.addMessage("No status URL found for service " + serviceName);
- return inputFile;
- }
-
- return result;
-
- } catch (Exception e) {
- throw new ProcessingFailedException("Error in " + node.getService().getName() + " workflow: " + e.getMessage(), e);
- }
- }
-
- private Path getServiceDir(ServiceNode node) {
- Path path = storageService.load(workflowStatus.getUuid());
- ServiceNode sn = node;
- ArrayList<String> segments = new ArrayList<>();
- segments.add(getServiceNodeDirName(sn));
- while (sn.getParentNode() != null && !sn.getParentNode().getId().equals(WorkflowStatus.ROOT_NODE_ID)) {
- sn = sn.getParentNode();
- segments.add(0, getServiceNodeDirName(sn));
- }
-
- for (String segment : segments) {
- path = path.resolve(segment);
- }
-
- return path;
- }
-
- private String getServiceNodeDirName(ServiceNode node) {
- if (node.getService() != null) {
- return "_" + node.getService().getKey().toLowerCase();
- }
- return "_" + node.getId().toLowerCase();
- }
-
- private void addMessage(WorkflowStatus workflowStatus, String message) {
- workflowStatus.addMessage(message);
- this.messagingTemplate.convertAndSend(
- "/topic/process-updates/" + this.uuid,
- new ProcessLog(Action.UPDATE, message));
- }
}
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
index 3f25fb6..36e6dc8 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
@@ -30,6 +30,7 @@
private ServiceNode parentNode;
+ private boolean failed = false;
/**
* Create a structural node like for example the root node.
*/
@@ -193,4 +194,12 @@
public String toString() {
return this.id;
}
+
+ public void markFailed() {
+ this.failed = true;
+ }
+
+ public boolean isFailed() {
+ return this.failed;
+ }
}
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java
new file mode 100644
index 0000000..92b9b4b
--- /dev/null
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java
@@ -0,0 +1,429 @@
+/*********************************************************************************
+ * Copyright (c) 2020, 2021 Robert Bosch GmbH and others.
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Robert Bosch GmbH - initial API and implementation
+ * Dortmund University of Applied Sciences and Arts - Bug 570871
+ ********************************************************************************
+ */
+package org.eclipse.app4mc.cloud.manager;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.http.HttpStatus;
+import org.eclipse.app4mc.cloud.manager.ProcessLog.Action;
+import org.eclipse.app4mc.cloud.manager.administration.CloudServiceDefinition;
+import org.eclipse.app4mc.cloud.manager.storage.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.simp.SimpMessageSendingOperations;
+import org.springframework.util.StringUtils;
+
+import kong.unirest.HttpResponse;
+import kong.unirest.MultipartBody;
+import kong.unirest.Unirest;
+
+public class ServiceNodeProcessingTask extends RecursiveTask<Path> {
+
+ private static final long serialVersionUID = 8286285942428911612L;
+
+ private final String uuid;
+ private final ServiceNode serviceNode;
+ private final Path inputFile;
+
+ private final StorageService storageService;
+ private final SimpMessageSendingOperations messagingTemplate;
+ private WorkflowStatus workflowStatus;
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ public ServiceNodeProcessingTask(ServiceNode n, Path inputFile, StorageService storageService,
+ SimpMessageSendingOperations messagingTemplate, String uuid, WorkflowStatus ws) {
+ this.storageService = storageService;
+ this.messagingTemplate = messagingTemplate;
+ this.workflowStatus = ws;
+ this.serviceNode = n;
+ this.inputFile = inputFile;
+ this.uuid = uuid;
+ }
+
+ @Override
+ protected Path compute() {
+ try {
+ if (this.serviceNode.isStructuralNode()) {
+ logger.info("created structural node thread: {}", this.serviceNode.getId());
+ Path retval = processStructuralNode(this.serviceNode, this.inputFile);
+ return retval;
+ } else {
+ logger.info("created branch node thread: {}", this.serviceNode.getId());
+ Path retval = processNonStructuralNode(this.serviceNode, this.inputFile);
+ if (this.workflowStatus.isCancelled()) {
+ addMessage(this.workflowStatus, "Workflow cancelled by user");
+ }
+ return retval;
+ }
+ } catch (ProcessingFailedException e) {
+ this.workflowStatus.addError(e.getMessage());
+ return null;
+ }
+
+ }
+
+ private Path processStructuralNode(ServiceNode node, Path inputFile) {
+ Path nextInput = inputFile;
+ for (ServiceNode n : node.getChildren()) {
+ try {
+ nextInput = processNonStructuralNode(n, nextInput);
+ // if an error occurs without throwing an exception, successor tasks do not get executed
+ if (n.isFailed()) {
+ break;
+ }
+ if (this.workflowStatus.isCancelled()) {
+ addMessage(this.workflowStatus, "Workflow cancelled by user");
+ break;
+ }
+ } catch (ProcessingFailedException e) {
+ this.workflowStatus.addError(e.getMessage());
+ break;
+ }
+ }
+ return nextInput;
+ }
+
+ private Path processNonStructuralNode(ServiceNode node, Path inputFile) {
+ if (node.getChildren().isEmpty()) {
+ return processLeafNode(node, inputFile);
+ } else if (node.getChildren().size() > 1) {
+ try {
+ Path currOutput = executeCloudService(node, inputFile);
+ // if an error occurs without throwing an exception, successor tasks do not get executed
+ if (node.isFailed()) {
+ return null;
+ }
+ List<ServiceNodeProcessingTask> lActiveProcessingTask = new ArrayList<>();
+ for (ServiceNode n : node.getChildren()) {
+ lActiveProcessingTask.add(new ServiceNodeProcessingTask(n, currOutput, storageService,
+ messagingTemplate, uuid, workflowStatus));
+ }
+ logger.info("launching new branches");
+ ForkJoinTask.invokeAll(lActiveProcessingTask);
+ return currOutput;
+ } catch (ProcessingFailedException e) {
+ this.workflowStatus.addError(e.getMessage());
+ return null;
+ }
+ } else {
+ try {
+ Path currOutput = executeCloudService(node, inputFile);
+ // if an error occurs without throwing an exception, successor tasks do not get executed
+ if (node.isFailed()) {
+ return null;
+ }
+ ServiceNode nextNode = node.getChildren().get(0);
+ if (nextNode.isStructuralNode()) {
+ ServiceNodeProcessingTask nextNodeTask = new ServiceNodeProcessingTask(nextNode, currOutput,
+ storageService, messagingTemplate, uuid, workflowStatus);
+ ForkJoinPool.commonPool().invoke(nextNodeTask);
+ } else {
+ processNonStructuralNode(nextNode, currOutput);
+ }
+ return currOutput;
+ } catch (ProcessingFailedException e) {
+ this.workflowStatus.addError(e.getMessage());
+ return null;
+ }
+ }
+ }
+
+ private Path processLeafNode(ServiceNode node, Path inputFile) {
+ return executeCloudService(node, inputFile);
+ }
+
+ private Path executeCloudService(ServiceNode node, Path inputFile) {
+ try {
+ CloudServiceDefinition csd = node.getService();
+ ServiceConfiguration config = node.getServiceConfiguration();
+ String serviceName = csd.getName();
+ String baseUrl = csd.getBaseUrl();
+
+ // upload to service
+ MultipartBody multipartBody = Unirest.post(baseUrl)
+ .field("file", Files.newInputStream(inputFile), inputFile.getFileName().toString());
+
+ if (config != null) {
+ config.getParameterList().forEach(param -> {
+ if (!StringUtils.isEmpty(param.getValue()) && !param.isManagerParameter()) {
+ if ("multiple".equals(param.getCardinality())) {
+ multipartBody.field(param.getKey(), Arrays.asList(param.getValue().split(",")));
+ } else {
+ multipartBody.field(param.getKey(), param.getValue());
+ }
+ }
+ });
+ }
+
+ HttpResponse<?> uploadResponse = multipartBody.asEmpty();
+
+ // extract status link from result
+ String statusUrl = null;
+ if (uploadResponse.getStatus() == 201) {
+ statusUrl = uploadResponse.getHeaders().getFirst("Location");
+ if (StringUtils.isEmpty(statusUrl)) {
+ // fallback check for Link header if Location header is not set
+ statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
+ } else if (!HeaderHelper.isValid(statusUrl, baseUrl)) {
+ throw new ProcessingFailedException(
+ "The status URL from the Location header does not match with the service base URL '" + baseUrl + "': " + statusUrl);
+ }
+ } else if (uploadResponse.getStatus() == 200) {
+ // fallback if return code is 200, then follow up needs to be placed in Link header
+ statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
+ } else {
+ // error
+ Object body = uploadResponse.getBody();
+ if (body != null && !body.toString().isEmpty()) {
+ workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - " + body +" - Workflow stopped!");
+ node.markFailed();
+ } else {
+ workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - Workflow stopped!");
+ node.markFailed();
+ }
+ return null;
+ }
+
+ addMessage(workflowStatus, "Upload to " + serviceName + " service succeeded");
+
+ Path result = inputFile;
+
+ if (workflowStatus.isCancelled()) {
+ return null;
+ }
+
+ // trigger status URL until process is finished or error occured
+ if (statusUrl != null) {
+ HttpResponse<?> statusResponse = Unirest.get(statusUrl).asEmpty();
+ List<String> linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
+
+ long start = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
+
+ long timeout = 60_000l;
+ if (config != null) {
+ ServiceConfigurationParameter timeoutParam = config.getParameter("timeout");
+ if (timeoutParam != null) {
+ try {
+ timeout = Long.valueOf(timeoutParam.getValue());
+ } catch (Exception e) {
+ addMessage(workflowStatus, serviceName + " timeout value " + timeoutParam.getValue() + " invalid. Using default of 60_000");
+ timeout = 60_000l;
+ }
+ }
+ }
+
+ while (linkHeaders.size() <= 1) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ }
+
+ statusResponse = Unirest.get(statusUrl).asEmpty();
+ linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
+
+ end = System.currentTimeMillis();
+
+ // don't request for more than configured timeout
+ if (timeout > 0 && (end - start) > timeout) {
+ addMessage(workflowStatus, serviceName + " status requested for " + (timeout/1000) + " seconds");
+ break;
+ }
+
+ if (workflowStatus.isCancelled()) {
+ // TODO trigger cancel at service?
+ return null;
+ }
+
+ // send a processing message every 5 seconds to the client to indicate that the
+ // service is still progressing
+ if ((((end - start) / 1000) % 5) == 0) {
+ synchronized (this.messagingTemplate) {
+ this.messagingTemplate.convertAndSend(
+ "/topic/process-updates/" + this.uuid,
+ new ProcessLog(Action.PROCESSING, serviceName + " is processing ..."));
+
+ }
+ }
+ }
+
+ // create the service result directory relative to the input file
+ Path serviceSubDir = getServiceDir(node);
+ Files.createDirectories(serviceSubDir);
+
+ String servicePath = storageService.load(workflowStatus.getUuid()).relativize(serviceSubDir).toString();
+ // replace backslashes with forward slashes in case the service is executed on Windows
+ servicePath = servicePath.replaceAll("\\\\", "/");
+ if (!servicePath.isEmpty()) {
+ servicePath += "/";
+ }
+
+ // first check if there is a result link
+ List<String> resultUrls = HeaderHelper.getUrlsFromLink(linkHeaders, "result", baseUrl);
+
+ String deleteUrl = null;
+ if (!resultUrls.isEmpty()) {
+
+ addMessage(workflowStatus, serviceName + " processing finished");
+
+ boolean error = false;
+ for (String resultUrl : resultUrls) {
+
+ // download file
+ HttpResponse<File> resultResponse = Unirest.get(resultUrl)
+ .asFile(serviceSubDir.resolve("result").toString());
+
+ Path migrationResult = resultResponse.getBody().toPath();
+ String filename = HeaderHelper.getFilenameFromHeader(resultResponse.getHeaders().getFirst("Content-Disposition"));
+ if (filename != null) {
+ migrationResult = Files.move(migrationResult, serviceSubDir.resolve(filename));
+ }
+
+ String resultName = resultResponse.getHeaders().getFirst("x-app4mc-result-name");
+ if (StringUtils.isEmpty(resultName)) {
+ resultName = serviceName;
+ }
+
+ addMessage(workflowStatus, resultName + " result downloaded");
+
+ workflowStatus.addResult(
+ resultName + " Result",
+ servicePath + migrationResult.getFileName().toString());
+
+ // extract header information if result should be used in workflow
+ String useResult = resultResponse.getHeaders().getFirst("x-app4mc-use-result");
+ if (useResult == null || !useResult.toLowerCase().equals("false")) {
+ // the result should be used in the workflow
+ result = migrationResult;
+ }
+
+ if (resultResponse.getStatus() != HttpStatus.SC_OK) {
+ error = true;
+ }
+
+ if (deleteUrl == null) {
+ // extract delete
+ deleteUrl = HeaderHelper.getUrlFromLink(resultResponse.getHeaders().get("Link"), "delete", baseUrl);
+ }
+ }
+
+ if (error) {
+ workflowStatus.addError(serviceName + " failed with errors");
+ node.markFailed();
+ } else {
+ addMessage(workflowStatus, serviceName + " successfull");
+ }
+ } else {
+ String errorUrl = HeaderHelper.getUrlFromLink(linkHeaders, "error", baseUrl);
+ if (errorUrl != null) {
+ workflowStatus.addError(serviceName + " processing finished with error");
+ node.markFailed();
+
+ // download error file
+ HttpResponse<File> errorResponse = Unirest.get(errorUrl)
+ .asFile(serviceSubDir.resolve("error").toString());
+
+ Path migrationError = errorResponse.getBody().toPath();
+ String filename = HeaderHelper.getFilenameFromHeader(errorResponse.getHeaders().getFirst("Content-Disposition"));
+ if (filename != null) {
+ migrationError = Files.move(migrationError, serviceSubDir.resolve(filename));
+ }
+
+ addMessage(workflowStatus, serviceName + " error result downloaded");
+
+ workflowStatus.addResult(
+ serviceName + " Error",
+ servicePath + migrationError.getFileName().toString());
+
+ // extract delete
+ deleteUrl = HeaderHelper.getUrlFromLink(errorResponse.getHeaders().get("Link"), "delete", baseUrl);
+ } else {
+ workflowStatus.addError(serviceName + " has no result and no error");
+ node.markFailed();
+ }
+ }
+
+ boolean deleteResult = true;
+ if (config != null) {
+ ServiceConfigurationParameter deleteParam = config.getParameter("deleteResult");
+ if (deleteParam != null) {
+ deleteResult = Boolean.valueOf(deleteParam.getValue());
+ }
+ }
+
+ if (deleteUrl != null && deleteResult) {
+ // delete upload again
+ Unirest.delete(deleteUrl).asEmpty();
+ addMessage(workflowStatus, serviceName + " cleaned up");
+ }
+
+ } else {
+ // no status URL in the upload response, stop further processing
+ workflowStatus.addMessage("No status URL found for service " + serviceName);
+ return inputFile;
+ }
+
+ return result;
+
+ } catch (Exception e) {
+ throw new ProcessingFailedException("Error in " + node.getService().getName() + " workflow: " + e.getMessage(), e);
+ }
+ }
+
+ private Path getServiceDir(ServiceNode node) {
+ Path path = storageService.load(workflowStatus.getUuid());
+ ServiceNode sn = node;
+ ArrayList<String> segments = new ArrayList<>();
+ segments.add(getServiceNodeDirName(sn));
+ while (sn.getParentNode() != null && !sn.getParentNode().getId().equals(WorkflowStatus.ROOT_NODE_ID)) {
+ sn = sn.getParentNode();
+ segments.add(0, getServiceNodeDirName(sn));
+ }
+
+ for (String segment : segments) {
+ path = path.resolve(segment);
+ }
+
+ return path;
+ }
+
+ private String getServiceNodeDirName(ServiceNode node) {
+ if (node.getService() != null) {
+ return "_" + node.getService().getKey().toLowerCase();
+ }
+ return "_" + node.getId().toLowerCase();
+ }
+
+ private void addMessage(WorkflowStatus workflowStatus, String message) {
+ synchronized(this.messagingTemplate){
+ workflowStatus.addMessage(message);
+ this.messagingTemplate.convertAndSend(
+ "/topic/process-updates/" + this.uuid,
+ new ProcessLog(Action.UPDATE, message));
+ }
+ }
+
+}
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
index d42e003..47d17fb 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
@@ -9,6 +9,7 @@
*
* Contributors:
* Robert Bosch GmbH - initial API and implementation
+ * Dortmund University of Applied Sciences and Arts - Bug 570871
********************************************************************************
*/
package org.eclipse.app4mc.cloud.manager;
@@ -248,16 +249,24 @@
return node;
}
- public ArrayList<String> getMessages() {
- return messages;
+ public List<String> getMessages() {
+ return new ArrayList<>(this.messages);
+ }
+
+ public boolean hasMessages() {
+ return !this.messages.isEmpty();
}
public void addMessage(String message) {
this.messages.add(message);
}
- public ArrayList<String> getErrors() {
- return errors;
+ public List<String> getErrors() {
+ return new ArrayList<>(this.errors);
+ }
+
+ public boolean hasErrors() {
+ return !this.errors.isEmpty();
}
public void addError(String error) {
@@ -265,7 +274,11 @@
}
public HashMap<String, String> getResults() {
- return this.results;
+ return new LinkedHashMap<>(this.results);
+ }
+
+ public boolean hasResults() {
+ return !this.results.isEmpty();
}
public void addResult(String key, String resultFile) {
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
index 2f487bc..553cc36 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
@@ -65,16 +65,15 @@
gen.writeBooleanField("done", value.isDone());
}
- if (!value.getMessages().isEmpty()) {
+ if (value.hasMessages()) {
gen.writeObjectField("messages", value.getMessages());
}
- if (!value.getErrors().isEmpty()) {
+ if (value.hasErrors()) {
gen.writeObjectField("errors", value.getErrors());
}
- if (!value.getResults().isEmpty()) {
+ if (value.hasResults()) {
gen.writeObjectField("results", value.getResults());
}
-
gen.writeEndObject();
}
diff --git a/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java b/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
index c9994d4..19dd243 100644
--- a/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
+++ b/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
@@ -9,6 +9,7 @@
*
* Contributors:
* Robert Bosch GmbH - initial API and implementation
+ * Dortmund University of Applied Sciences and Arts - Bug 570871
********************************************************************************
*/
package org.eclipse.app4mc.cloud.manager;
@@ -215,14 +216,14 @@
assertFalse(ws.isCancelled());
assertTrue(ws.isDone());
- ArrayList<String> messages = ws.getMessages();
+ List<String> messages = ws.getMessages();
assertNotNull(messages);
assertEquals(3, messages.size());
assertEquals("Message One", messages.get(0));
assertEquals("Message Two", messages.get(1));
assertEquals("Message Three", messages.get(2));
- ArrayList<String> errors = ws.getErrors();
+ List<String> errors = ws.getErrors();
assertNotNull(errors);
assertEquals(0, errors.size());