Bug 570871

added multi-threaded service execution

Change-Id: I41cd1ed746cdab40feb60d8b374ad008d046ff32
Signed-off-by: Mahmoud Bazzal <mahmood1994ha@gmail.com>
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
index 4c25288..f5574c2 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/CloudServiceExecutionRunnable.java
@@ -9,27 +9,18 @@
  *
  * Contributors:
  *     Robert Bosch GmbH - initial API and implementation
+ *     Dortmund University of Applied Sciences and Arts - Bug 570871
  ********************************************************************************
  */
 package org.eclipse.app4mc.cloud.manager;
 
-import java.io.File;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
-import org.apache.http.HttpStatus;
 import org.eclipse.app4mc.cloud.manager.ProcessLog.Action;
-import org.eclipse.app4mc.cloud.manager.administration.CloudServiceDefinition;
 import org.eclipse.app4mc.cloud.manager.storage.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.messaging.simp.SimpMessageSendingOperations;
-import org.springframework.util.StringUtils;
-
-import kong.unirest.HttpResponse;
-import kong.unirest.MultipartBody;
-import kong.unirest.Unirest;
 
 public class CloudServiceExecutionRunnable implements Runnable {
 
@@ -39,6 +30,8 @@
 	private String uuid;
 	private String originalFilename;
 	private WorkflowStatus workflowStatus;
+
+	private final Logger logger = LoggerFactory.getLogger(this.getClass());
 		
 	public CloudServiceExecutionRunnable(
 			StorageService storageService, 
@@ -60,7 +53,15 @@
 		try {
 			Path inputFile = this.storageService.load(this.uuid, this.originalFilename);
 			
-			processServiceNode(this.workflowStatus.getServiceRootNode(), inputFile);
+			ServiceNodeProcessingTask rootTask = new ServiceNodeProcessingTask(this.workflowStatus.getServiceRootNode(),
+					inputFile, storageService, messagingTemplate, uuid, workflowStatus);
+
+			logger.info("started root task");
+			rootTask.fork();
+			logger.info("waiting for root task to finish");
+			// join the task to indicate the entire workflow is done
+			rootTask.join();
+			logger.info("finished root task");
 		} finally {
 			this.workflowStatus.done();
 			WorkflowStatusHelper.saveWorkflowStatus(
@@ -96,310 +97,4 @@
 		}
 	}
 	
-	private Path processServiceNode(ServiceNode serviceNode, Path inputFile) {
-		Path nodeResult = inputFile;
-		
-		if (!serviceNode.isStructuralNode()) {
-			nodeResult = executeCloudService(serviceNode, inputFile);
-		}
-
-		Path childInput = nodeResult;
-		for (ServiceNode node : serviceNode.getChildren()) {
-			// if an error occurred stop the workflow
-			if (!this.workflowStatus.getErrors().isEmpty()) {
-				break;
-			}
-			
-			try {
-				if (serviceNode.isStructuralNode()) {
-					// inside a structural node the services are executed in sequential order,
-					// passing the result of the previous service to the next one
-					
-					// TODO if the structural node is not the root node, open a new thread for the execution
-					
-					childInput = processServiceNode(node, childInput);
-				} else {
-					// if the node is a service node, all child services should get the result of
-					// the parent service
-					
-					// TODO if serviceNode.getChildren().size() > 1 open new thread for each child
-					
-					processServiceNode(node, childInput);
-				}
-			} catch (ProcessingFailedException e) {
-				this.workflowStatus.addError(e.getMessage());
-			}
-			
-			if (this.workflowStatus.isCancelled()) {
-				addMessage(this.workflowStatus, "Workflow cancelled by user");
-				break;
-			}
-		}
-		
-		return nodeResult;
-	}
-	
-	private Path executeCloudService(ServiceNode node, Path inputFile) {
-		try {
-			CloudServiceDefinition csd = node.getService();
-			ServiceConfiguration config = node.getServiceConfiguration(); 
-			String serviceName = csd.getName();
-			String baseUrl = csd.getBaseUrl();
-			
-			// upload to service
-			MultipartBody multipartBody = Unirest.post(baseUrl)
-					.field("file", Files.newInputStream(inputFile), inputFile.getFileName().toString());
-			
-			if (config != null) {
-				config.getParameterList().forEach(param -> {
-					if (!StringUtils.isEmpty(param.getValue()) && !param.isManagerParameter()) {
-						if ("multiple".equals(param.getCardinality())) {
-							multipartBody.field(param.getKey(), Arrays.asList(param.getValue().split(",")));
-						} else {
-							multipartBody.field(param.getKey(), param.getValue());
-						}
-					}
-				});
-			}
-			
-			HttpResponse<?> uploadResponse = multipartBody.asEmpty();
-			
-			// extract status link from result
-			String statusUrl = null;
-			if (uploadResponse.getStatus() == 201) {
-				statusUrl = uploadResponse.getHeaders().getFirst("Location");
-				if (StringUtils.isEmpty(statusUrl)) {
-					// fallback check for Link header if Location header is not set
-					statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
-				} else if (!HeaderHelper.isValid(statusUrl, baseUrl)) {
-					throw new ProcessingFailedException(
-							"The status URL from the Location header does not match with the service base URL '" + baseUrl + "': " + statusUrl);
-				}
-			} else if (uploadResponse.getStatus() == 200) {
-				// fallback if return code is 200, then follow up needs to be placed in Link header
-				statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
-			} else {
-				// error
-				Object body = uploadResponse.getBody();
-				if (body != null && !body.toString().isEmpty()) {
-					workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - " + body +" - Workflow stopped!");
-				} else {
-					workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - Workflow stopped!");
-				}
-				return null;
-			}
-			
-			addMessage(workflowStatus, "Upload to " + serviceName + " service succeeded");
-
-			Path result = inputFile;
-			
-			if (workflowStatus.isCancelled()) {
-				return null;
-			}
-
-			// trigger status URL until process is finished or error occured
-			if (statusUrl != null) {
-				HttpResponse<?> statusResponse = Unirest.get(statusUrl).asEmpty();
-				List<String> linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
-				
-				long start = System.currentTimeMillis();
-				long end = System.currentTimeMillis();
-				
-				long timeout = 60_000l;
-				if (config != null) {
-					ServiceConfigurationParameter timeoutParam = config.getParameter("timeout");
-					if (timeoutParam != null) {
-						try {
-							timeout = Long.valueOf(timeoutParam.getValue());
-						} catch (Exception e) {
-							addMessage(workflowStatus, serviceName + " timeout value " + timeoutParam.getValue() + " invalid. Using default of 60_000");
-							timeout = 60_000l;
-						}
-					}
-				}
-				
-				while (linkHeaders.size() <= 1) {
-					try {
-						Thread.sleep(1000);
-					} catch (InterruptedException e) {
-						// Restore interrupted state...
-					    Thread.currentThread().interrupt();
-					}
-
-					statusResponse = Unirest.get(statusUrl).asEmpty();
-					linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
-					
-					end = System.currentTimeMillis();
-					
-					// don't request for more than configured timeout
-					if (timeout > 0 && (end - start) > timeout) {
-						addMessage(workflowStatus, serviceName + " status requested for " + (timeout/1000) + " seconds");
-						break;
-					}
-					
-					if (workflowStatus.isCancelled()) {
-						// TODO trigger cancel at service?
-						return null;
-					}
-					
-					// send a processing message every 5 seconds to the client to indicate that the
-					// service is still progressing
-					if ((((end - start) / 1000) % 5) == 0) {
-						this.messagingTemplate.convertAndSend(
-								"/topic/process-updates/" + this.uuid, 
-								new ProcessLog(Action.PROCESSING, serviceName + " is processing ..."));
-
-					}
-				}
-
-				// create the service result directory relative to the input file
-				Path serviceSubDir = getServiceDir(node);
-				Files.createDirectories(serviceSubDir);
-				
-				String servicePath = storageService.load(workflowStatus.getUuid()).relativize(serviceSubDir).toString();
-				// replace backslashes with forward slashes in case the service is executed on Windows
-				servicePath = servicePath.replaceAll("\\\\", "/");
-				if (!servicePath.isEmpty()) {
-					servicePath += "/";
-				}
-				
-				// first check if there is a result link
-				List<String> resultUrls = HeaderHelper.getUrlsFromLink(linkHeaders, "result", baseUrl);
-				
-				String deleteUrl = null;
-				if (!resultUrls.isEmpty()) {
-
-					addMessage(workflowStatus, serviceName + " processing finished");
-					
-					boolean error = false;
-					for (String resultUrl : resultUrls) {
-						
-						// download file
-						HttpResponse<File> resultResponse = Unirest.get(resultUrl)
-								.asFile(serviceSubDir.resolve("result").toString());
-						
-						Path migrationResult = resultResponse.getBody().toPath();
-						String filename = HeaderHelper.getFilenameFromHeader(resultResponse.getHeaders().getFirst("Content-Disposition"));
-						if (filename != null) {
-							migrationResult = Files.move(migrationResult, serviceSubDir.resolve(filename));
-						}
-						
-						String resultName = resultResponse.getHeaders().getFirst("x-app4mc-result-name");
-						if (StringUtils.isEmpty(resultName)) {
-							resultName = serviceName;
-						}
-
-						addMessage(workflowStatus, resultName + " result downloaded");
-
-						workflowStatus.addResult(
-								resultName + " Result",
-								servicePath + migrationResult.getFileName().toString());
-						
-						// extract header information if result should be used in workflow
-						String useResult = resultResponse.getHeaders().getFirst("x-app4mc-use-result");
-						if (useResult == null || !useResult.toLowerCase().equals("false")) {
-							// the result should be used in the workflow
-							result = migrationResult;
-						}
-
-						if (resultResponse.getStatus() != HttpStatus.SC_OK) {
-							error = true;
-						}
-						
-						if (deleteUrl == null) {
-							// extract delete
-							deleteUrl = HeaderHelper.getUrlFromLink(resultResponse.getHeaders().get("Link"), "delete", baseUrl);
-						}
-					}
-					
-					if (error) {
-						workflowStatus.addError(serviceName + " failed with errors");						
-					} else {
-						addMessage(workflowStatus, serviceName + " successfull");
-					}
-				} else {
-					String errorUrl = HeaderHelper.getUrlFromLink(linkHeaders, "error", baseUrl);
-					if (errorUrl != null) {
-						workflowStatus.addError(serviceName + " processing finished with error");
-
-						// download error file
-						HttpResponse<File> errorResponse = Unirest.get(errorUrl)
-								.asFile(serviceSubDir.resolve("error").toString());
-
-						Path migrationError = errorResponse.getBody().toPath();
-						String filename = HeaderHelper.getFilenameFromHeader(errorResponse.getHeaders().getFirst("Content-Disposition"));
-						if (filename != null) {
-							migrationError = Files.move(migrationError, serviceSubDir.resolve(filename));
-						}
-
-						addMessage(workflowStatus, serviceName + " error result downloaded");
-						
-						workflowStatus.addResult(
-								serviceName + " Error",
-								servicePath + migrationError.getFileName().toString());
-						
-						// extract delete
-						deleteUrl = HeaderHelper.getUrlFromLink(errorResponse.getHeaders().get("Link"), "delete", baseUrl);
-					} else {
-						workflowStatus.addError(serviceName + " has no result and no error");
-					}
-				}
-
-				boolean deleteResult = true;
-				if (config != null) {
-					ServiceConfigurationParameter deleteParam = config.getParameter("deleteResult");
-					if (deleteParam != null) {
-						deleteResult = Boolean.valueOf(deleteParam.getValue());
-					}
-				}
-
-				if (deleteUrl != null && deleteResult) {
-					// delete upload again
-					Unirest.delete(deleteUrl).asEmpty();
-					addMessage(workflowStatus, serviceName + " cleaned up");
-				}
-				
-			} else {
-				// no status URL in the upload response, stop further processing
-				workflowStatus.addMessage("No status URL found for service " + serviceName);
-				return inputFile;
-			}
-			
-			return result;
-
-		} catch (Exception e) {
-			throw new ProcessingFailedException("Error in " + node.getService().getName() + " workflow: " + e.getMessage(), e);
-		}
-	}
-	
-	private Path getServiceDir(ServiceNode node) {
-		Path path = storageService.load(workflowStatus.getUuid());
-		ServiceNode sn = node;
-		ArrayList<String> segments = new ArrayList<>();
-		segments.add(getServiceNodeDirName(sn));
-		while (sn.getParentNode() != null && !sn.getParentNode().getId().equals(WorkflowStatus.ROOT_NODE_ID)) {
-			sn = sn.getParentNode();
-			segments.add(0, getServiceNodeDirName(sn));
-		}
-		
-		for (String segment : segments) {
-			path = path.resolve(segment);
-		}
-		
-		return path;
-	}
-	
-	private String getServiceNodeDirName(ServiceNode node) {
-		if (node.getService() != null) {
-			return "_" + node.getService().getKey().toLowerCase();
-		}
-		return "_" + node.getId().toLowerCase();
-	}
-	
-	private void addMessage(WorkflowStatus workflowStatus, String message) {
-		workflowStatus.addMessage(message);
-		this.messagingTemplate.convertAndSend(
-				"/topic/process-updates/" + this.uuid, 
-				new ProcessLog(Action.UPDATE, message));
-	}
 }
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
index 3f25fb6..36e6dc8 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNode.java
@@ -30,6 +30,7 @@
 	
 	private ServiceNode parentNode;
 
+	private boolean failed = false;
 	/**
 	 * Create a structural node like for example the root node.
 	 */
@@ -193,4 +194,12 @@
 	public String toString() {
 		return this.id;
 	}
+	
+	public void markFailed() {
+		this.failed = true;
+	}
+	
+	public boolean isFailed() {
+		return this.failed;
+	}
 }
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java
new file mode 100644
index 0000000..92b9b4b
--- /dev/null
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/ServiceNodeProcessingTask.java
@@ -0,0 +1,429 @@
+/*********************************************************************************
+ * Copyright (c) 2020, 2021 Robert Bosch GmbH and others.
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ *     Robert Bosch GmbH - initial API and implementation
+ *     Dortmund University of Applied Sciences and Arts - Bug 570871
+ ********************************************************************************
+ */
+package org.eclipse.app4mc.cloud.manager;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.http.HttpStatus;
+import org.eclipse.app4mc.cloud.manager.ProcessLog.Action;
+import org.eclipse.app4mc.cloud.manager.administration.CloudServiceDefinition;
+import org.eclipse.app4mc.cloud.manager.storage.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.simp.SimpMessageSendingOperations;
+import org.springframework.util.StringUtils;
+
+import kong.unirest.HttpResponse;
+import kong.unirest.MultipartBody;
+import kong.unirest.Unirest;
+
+public class ServiceNodeProcessingTask extends RecursiveTask<Path> {
+
+	private static final long serialVersionUID = 8286285942428911612L;
+
+	private final String uuid;
+	private final ServiceNode serviceNode;
+	private final Path inputFile;
+
+	private final StorageService storageService;
+	private final SimpMessageSendingOperations messagingTemplate;
+	private WorkflowStatus workflowStatus;
+
+	private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+	public ServiceNodeProcessingTask(ServiceNode n, Path inputFile, StorageService storageService,
+			SimpMessageSendingOperations messagingTemplate, String uuid, WorkflowStatus ws) {
+		this.storageService = storageService;
+		this.messagingTemplate = messagingTemplate;
+		this.workflowStatus = ws;
+		this.serviceNode = n;
+		this.inputFile = inputFile;
+		this.uuid = uuid;
+	}
+
+	@Override
+	protected Path compute() {
+		try {
+			if (this.serviceNode.isStructuralNode()) {
+				logger.info("created structural node thread: {}", this.serviceNode.getId());
+				Path retval = processStructuralNode(this.serviceNode, this.inputFile);
+				return retval;
+			} else {
+				logger.info("created branch node thread: {}", this.serviceNode.getId());
+				Path retval = processNonStructuralNode(this.serviceNode, this.inputFile);
+				if (this.workflowStatus.isCancelled()) {
+					addMessage(this.workflowStatus, "Workflow cancelled by user");
+				}
+				return retval;
+			}
+		} catch (ProcessingFailedException e) {
+			this.workflowStatus.addError(e.getMessage());
+			return null;
+		}
+
+	}
+
+	private Path processStructuralNode(ServiceNode node, Path inputFile) {
+		Path nextInput = inputFile;
+		for (ServiceNode n : node.getChildren()) {
+			try {
+				nextInput = processNonStructuralNode(n, nextInput);
+				// if an error occurs without throwing an exception, successor tasks do not get executed
+				if (n.isFailed()) {
+					break;
+				}
+				if (this.workflowStatus.isCancelled()) {
+					addMessage(this.workflowStatus, "Workflow cancelled by user");
+					break;
+				}
+			} catch (ProcessingFailedException e) {
+				this.workflowStatus.addError(e.getMessage());
+				break;
+			}
+		}
+		return nextInput;
+	}
+
+	private Path processNonStructuralNode(ServiceNode node, Path inputFile) {
+		if (node.getChildren().isEmpty()) {
+			return processLeafNode(node, inputFile);
+		} else if (node.getChildren().size() > 1) {
+			try {
+				Path currOutput = executeCloudService(node, inputFile);
+				// if an error occurs without throwing an exception, successor tasks do not get executed
+				if (node.isFailed()) {
+					return null;
+				}
+				List<ServiceNodeProcessingTask> lActiveProcessingTask = new ArrayList<>();
+				for (ServiceNode n : node.getChildren()) {
+					lActiveProcessingTask.add(new ServiceNodeProcessingTask(n, currOutput, storageService,
+							messagingTemplate, uuid, workflowStatus));
+				}
+				logger.info("launching new branches");
+				ForkJoinTask.invokeAll(lActiveProcessingTask);
+				return currOutput;
+			} catch (ProcessingFailedException e) {
+				this.workflowStatus.addError(e.getMessage());
+				return null;
+			}
+		} else {
+			try {
+				Path currOutput = executeCloudService(node, inputFile);
+				// if an error occurs without throwing an exception, successor tasks do not get executed
+				if (node.isFailed()) {
+					return null;
+				}
+				ServiceNode nextNode = node.getChildren().get(0);
+				if (nextNode.isStructuralNode()) {
+					ServiceNodeProcessingTask nextNodeTask = new ServiceNodeProcessingTask(nextNode, currOutput,
+							storageService, messagingTemplate, uuid, workflowStatus);
+					ForkJoinPool.commonPool().invoke(nextNodeTask);
+				} else {
+					processNonStructuralNode(nextNode, currOutput);
+				}
+				return currOutput;
+			} catch (ProcessingFailedException e) {
+				this.workflowStatus.addError(e.getMessage());
+				return null;
+			}
+		}
+	}
+
+	private Path processLeafNode(ServiceNode node, Path inputFile) {
+		return executeCloudService(node, inputFile);
+	}
+
+	private Path executeCloudService(ServiceNode node, Path inputFile) {
+		try {
+			CloudServiceDefinition csd = node.getService();
+			ServiceConfiguration config = node.getServiceConfiguration(); 
+			String serviceName = csd.getName();
+			String baseUrl = csd.getBaseUrl();
+			
+			// upload to service
+			MultipartBody multipartBody = Unirest.post(baseUrl)
+					.field("file", Files.newInputStream(inputFile), inputFile.getFileName().toString());
+			
+			if (config != null) {
+				config.getParameterList().forEach(param -> {
+					if (!StringUtils.isEmpty(param.getValue()) && !param.isManagerParameter()) {
+						if ("multiple".equals(param.getCardinality())) {
+							multipartBody.field(param.getKey(), Arrays.asList(param.getValue().split(",")));
+						} else {
+							multipartBody.field(param.getKey(), param.getValue());
+						}
+					}
+				});
+			}
+			
+			HttpResponse<?> uploadResponse = multipartBody.asEmpty();
+			
+			// extract status link from result
+			String statusUrl = null;
+			if (uploadResponse.getStatus() == 201) {
+				statusUrl = uploadResponse.getHeaders().getFirst("Location");
+				if (StringUtils.isEmpty(statusUrl)) {
+					// fallback check for Link header if Location header is not set
+					statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
+				} else if (!HeaderHelper.isValid(statusUrl, baseUrl)) {
+					throw new ProcessingFailedException(
+							"The status URL from the Location header does not match with the service base URL '" + baseUrl + "': " + statusUrl);
+				}
+			} else if (uploadResponse.getStatus() == 200) {
+				// fallback if return code is 200, then follow up needs to be placed in Link header
+				statusUrl = HeaderHelper.getUrlFromLink(uploadResponse.getHeaders().get("Link"), "status", baseUrl);
+			} else {
+				// error
+				Object body = uploadResponse.getBody();
+				if (body != null && !body.toString().isEmpty()) {
+					workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - " + body +" - Workflow stopped!");
+					node.markFailed();
+				} else {
+					workflowStatus.addError("Upload to " + serviceName + " failed! Error code: " + uploadResponse.getStatus() + " - Workflow stopped!");
+					node.markFailed();
+				}
+				return null;
+			}
+			
+			addMessage(workflowStatus, "Upload to " + serviceName + " service succeeded");
+
+			Path result = inputFile;
+			
+			if (workflowStatus.isCancelled()) {
+				return null;
+			}
+
+			// trigger status URL until process is finished or error occured
+			if (statusUrl != null) {
+				HttpResponse<?> statusResponse = Unirest.get(statusUrl).asEmpty();
+				List<String> linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
+				
+				long start = System.currentTimeMillis();
+				long end = System.currentTimeMillis();
+				
+				long timeout = 60_000l;
+				if (config != null) {
+					ServiceConfigurationParameter timeoutParam = config.getParameter("timeout");
+					if (timeoutParam != null) {
+						try {
+							timeout = Long.valueOf(timeoutParam.getValue());
+						} catch (Exception e) {
+							addMessage(workflowStatus, serviceName + " timeout value " + timeoutParam.getValue() + " invalid. Using default of 60_000");
+							timeout = 60_000l;
+						}
+					}
+				}
+				
+				while (linkHeaders.size() <= 1) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						// Restore interrupted state...
+					    Thread.currentThread().interrupt();
+					}
+
+					statusResponse = Unirest.get(statusUrl).asEmpty();
+					linkHeaders = HeaderHelper.getLinkHeaders(statusResponse);
+					
+					end = System.currentTimeMillis();
+					
+					// don't request for more than configured timeout
+					if (timeout > 0 && (end - start) > timeout) {
+						addMessage(workflowStatus, serviceName + " status requested for " + (timeout/1000) + " seconds");
+						break;
+					}
+					
+					if (workflowStatus.isCancelled()) {
+						// TODO trigger cancel at service?
+						return null;
+					}
+					
+					// send a processing message every 5 seconds to the client to indicate that the
+					// service is still progressing
+					if ((((end - start) / 1000) % 5) == 0) {
+						synchronized (this.messagingTemplate) {
+								this.messagingTemplate.convertAndSend(
+								"/topic/process-updates/" + this.uuid, 
+								new ProcessLog(Action.PROCESSING, serviceName + " is processing ..."));
+
+						}
+					}
+				}
+
+				// create the service result directory relative to the input file
+				Path serviceSubDir = getServiceDir(node);
+				Files.createDirectories(serviceSubDir);
+				
+				String servicePath = storageService.load(workflowStatus.getUuid()).relativize(serviceSubDir).toString();
+				// replace backslashes with forward slashes in case the service is executed on Windows
+				servicePath = servicePath.replaceAll("\\\\", "/");
+				if (!servicePath.isEmpty()) {
+					servicePath += "/";
+				}
+				
+				// first check if there is a result link
+				List<String> resultUrls = HeaderHelper.getUrlsFromLink(linkHeaders, "result", baseUrl);
+				
+				String deleteUrl = null;
+				if (!resultUrls.isEmpty()) {
+
+					addMessage(workflowStatus, serviceName + " processing finished");
+					
+					boolean error = false;
+					for (String resultUrl : resultUrls) {
+						
+						// download file
+						HttpResponse<File> resultResponse = Unirest.get(resultUrl)
+								.asFile(serviceSubDir.resolve("result").toString());
+						
+						Path migrationResult = resultResponse.getBody().toPath();
+						String filename = HeaderHelper.getFilenameFromHeader(resultResponse.getHeaders().getFirst("Content-Disposition"));
+						if (filename != null) {
+							migrationResult = Files.move(migrationResult, serviceSubDir.resolve(filename));
+						}
+						
+						String resultName = resultResponse.getHeaders().getFirst("x-app4mc-result-name");
+						if (StringUtils.isEmpty(resultName)) {
+							resultName = serviceName;
+						}
+
+						addMessage(workflowStatus, resultName + " result downloaded");
+
+						workflowStatus.addResult(
+								resultName + " Result",
+								servicePath + migrationResult.getFileName().toString());
+						
+						// extract header information if result should be used in workflow
+						String useResult = resultResponse.getHeaders().getFirst("x-app4mc-use-result");
+						if (useResult == null || !useResult.toLowerCase().equals("false")) {
+							// the result should be used in the workflow
+							result = migrationResult;
+						}
+
+						if (resultResponse.getStatus() != HttpStatus.SC_OK) {
+							error = true;
+						}
+						
+						if (deleteUrl == null) {
+							// extract delete
+							deleteUrl = HeaderHelper.getUrlFromLink(resultResponse.getHeaders().get("Link"), "delete", baseUrl);
+						}
+					}
+					
+					if (error) {
+						workflowStatus.addError(serviceName + " failed with errors");						
+						node.markFailed();
+					} else {
+						addMessage(workflowStatus, serviceName + " successfull");
+					}
+				} else {
+					String errorUrl = HeaderHelper.getUrlFromLink(linkHeaders, "error", baseUrl);
+					if (errorUrl != null) {
+						workflowStatus.addError(serviceName + " processing finished with error");
+						node.markFailed();
+
+						// download error file
+						HttpResponse<File> errorResponse = Unirest.get(errorUrl)
+								.asFile(serviceSubDir.resolve("error").toString());
+
+						Path migrationError = errorResponse.getBody().toPath();
+						String filename = HeaderHelper.getFilenameFromHeader(errorResponse.getHeaders().getFirst("Content-Disposition"));
+						if (filename != null) {
+							migrationError = Files.move(migrationError, serviceSubDir.resolve(filename));
+						}
+
+						addMessage(workflowStatus, serviceName + " error result downloaded");
+						
+						workflowStatus.addResult(
+								serviceName + " Error",
+								servicePath + migrationError.getFileName().toString());
+						
+						// extract delete
+						deleteUrl = HeaderHelper.getUrlFromLink(errorResponse.getHeaders().get("Link"), "delete", baseUrl);
+					} else {
+						workflowStatus.addError(serviceName + " has no result and no error");
+						node.markFailed();
+					}
+				}
+
+				boolean deleteResult = true;
+				if (config != null) {
+					ServiceConfigurationParameter deleteParam = config.getParameter("deleteResult");
+					if (deleteParam != null) {
+						deleteResult = Boolean.valueOf(deleteParam.getValue());
+					}
+				}
+
+				if (deleteUrl != null && deleteResult) {
+					// delete upload again
+					Unirest.delete(deleteUrl).asEmpty();
+					addMessage(workflowStatus, serviceName + " cleaned up");
+				}
+				
+			} else {
+				// no status URL in the upload response, stop further processing
+				workflowStatus.addMessage("No status URL found for service " + serviceName);
+				return inputFile;
+			}
+			
+			return result;
+
+		} catch (Exception e) {
+			throw new ProcessingFailedException("Error in " + node.getService().getName() + " workflow: " + e.getMessage(), e);
+		}
+	}
+	
+	private Path getServiceDir(ServiceNode node) {
+		Path path = storageService.load(workflowStatus.getUuid());
+		ServiceNode sn = node;
+		ArrayList<String> segments = new ArrayList<>();
+		segments.add(getServiceNodeDirName(sn));
+		while (sn.getParentNode() != null && !sn.getParentNode().getId().equals(WorkflowStatus.ROOT_NODE_ID)) {
+			sn = sn.getParentNode();
+			segments.add(0, getServiceNodeDirName(sn));
+		}
+		
+		for (String segment : segments) {
+			path = path.resolve(segment);
+		}
+		
+		return path;
+	}
+	
+	private String getServiceNodeDirName(ServiceNode node) {
+		if (node.getService() != null) {
+			return "_" + node.getService().getKey().toLowerCase();
+		}
+		return "_" + node.getId().toLowerCase();
+	}
+	
+	private void addMessage(WorkflowStatus workflowStatus, String message) {
+		synchronized(this.messagingTemplate){
+			workflowStatus.addMessage(message);
+			this.messagingTemplate.convertAndSend(
+				"/topic/process-updates/" + this.uuid, 
+				new ProcessLog(Action.UPDATE, message));
+		}
+	}
+
+}
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
index d42e003..47d17fb 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatus.java
@@ -9,6 +9,7 @@
  *
  * Contributors:
  *     Robert Bosch GmbH - initial API and implementation
+ *     Dortmund University of Applied Sciences and Arts - Bug 570871
  ********************************************************************************
  */
 package org.eclipse.app4mc.cloud.manager;
@@ -248,16 +249,24 @@
 		return node;		
 	}
 	
-	public ArrayList<String> getMessages() {
-		return messages;
+	public List<String> getMessages() {
+		return new ArrayList<>(this.messages);
+	}
+	
+	public boolean hasMessages() {
+		return !this.messages.isEmpty();
 	}
 	
 	public void addMessage(String message) {
 		this.messages.add(message);
 	}
 	
-	public ArrayList<String> getErrors() {
-		return errors;
+	public List<String> getErrors() {
+		return new ArrayList<>(this.errors);
+	}
+	
+	public boolean hasErrors() {
+		return !this.errors.isEmpty();
 	}
 	
 	public void addError(String error) {
@@ -265,7 +274,11 @@
 	}
 	
 	public HashMap<String, String> getResults() {
-		return this.results;
+		return new LinkedHashMap<>(this.results);
+	}
+	
+	public boolean hasResults() {
+		return !this.results.isEmpty();
 	}
 	
 	public void addResult(String key, String resultFile) {
diff --git a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
index 2f487bc..553cc36 100644
--- a/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
+++ b/manager/src/main/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusSerializer.java
@@ -65,16 +65,15 @@
 			gen.writeBooleanField("done", value.isDone());
 		}
 		
-		if (!value.getMessages().isEmpty()) {
+		if (value.hasMessages()) {
 			gen.writeObjectField("messages", value.getMessages());
 		}
-		if (!value.getErrors().isEmpty()) {
+		if (value.hasErrors()) {
 			gen.writeObjectField("errors", value.getErrors());
 		}
-		if (!value.getResults().isEmpty()) {
+		if (value.hasResults()) {
 			gen.writeObjectField("results", value.getResults());
 		}
-		
 		gen.writeEndObject();
 	}
 	
diff --git a/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java b/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
index c9994d4..19dd243 100644
--- a/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
+++ b/manager/src/test/java/org/eclipse/app4mc/cloud/manager/WorkflowStatusHelperTest.java
@@ -9,6 +9,7 @@
  *
  * Contributors:
  *     Robert Bosch GmbH - initial API and implementation
+ *     Dortmund University of Applied Sciences and Arts - Bug 570871
  ********************************************************************************
  */
 package org.eclipse.app4mc.cloud.manager;
@@ -215,14 +216,14 @@
 		assertFalse(ws.isCancelled());
 		assertTrue(ws.isDone());
 		
-		ArrayList<String> messages = ws.getMessages();
+		List<String> messages = ws.getMessages();
 		assertNotNull(messages);
 		assertEquals(3, messages.size());
 		assertEquals("Message One", messages.get(0));
 		assertEquals("Message Two", messages.get(1));
 		assertEquals("Message Three", messages.get(2));
 		
-		ArrayList<String> errors = ws.getErrors();
+		List<String> errors = ws.getErrors();
 		assertNotNull(errors);
 		assertEquals(0, errors.size());