/*********************************************************************************************************************
 * Copyright (c) 2008, 2015 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
 * This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 **********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.eclipse.smila.common.definitions.AccessAny; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.jobmanager.definitions.JobDefinition; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
/** | |
* Tests for deletion of temp objects for non-forking workflows. | |
*/ | |
public class TestJobManagerNonForkingWorkflow extends JobTaskProcessingTestBase { | |
/** Dummy data to write into the store. */ | |
private static final byte[] DEAD_BEEF = new byte[] { (byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF }; | |
/** The initial worker's name (startAction worker). */ | |
private static final String INPUT_WORKER = "inputWorker"; | |
/** The workflow name. */ | |
private static final String WORKFLOW_NAME = "testWorkflow"; | |
/** The job name. */ | |
private static final String JOB_NAME = "testjob"; | |
/** The index name. */ | |
private static final String PARAM_VALUE = "test"; | |
/** The store name. */ | |
private static final String STORE_NAME = "test"; | |
/** The object store service. */ | |
private ObjectStoreService _defStorage; | |
/** The jobId (to be able to finish it when test is finished. */ | |
private String _jobId; | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_defStorage = getService(ObjectStoreService.class); | |
for (final String storeName : _objectStoreService.getStoreNames()) { | |
if ("jobmanager".equals(storeName)) { | |
_objectStoreService.clearStore(storeName); | |
} else { | |
_objectStoreService.removeStore(storeName); | |
} | |
} | |
_defStorage.ensureStore(STORE_NAME); | |
// default job | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", JOB_NAME); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("workerParameter", PARAM_VALUE); | |
parametersAny.put("tempStore", STORE_NAME); | |
parametersAny.put("store", STORE_NAME); | |
jobAny.put("parameters", parametersAny); | |
jobAny.put("workflow", WORKFLOW_NAME); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
_jobId = _jobRunEngine.startJob(JOB_NAME); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
_jobRunEngine.finishJob(JOB_NAME, _jobId); | |
_defPersistence.removeJob(JOB_NAME); | |
} | |
public void testDeleteTransientBulksOfSuccessfulTask() throws Exception { | |
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME); | |
// create the 1st worker's output | |
final List<String> objIds1 = createOutputObjects(task); | |
assertEquals(1, objIds1.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
final ResultDescription resultDescription = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
// finish the 1st worker's task and get the 2nd worker's task | |
task = getSingleNextTask(task, resultDescription, "intermediateWorker"); | |
// create the 2nd worker's output | |
final List<String> objIds2 = createOutputObjects(task); | |
assertEquals(1, objIds2.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0))); | |
// finish the 2nd worker's task and get the 3rd worker's task | |
task = getSingleNextTask(task, resultDescription, "finalWorker"); | |
// create the 3rd worker's output | |
final List<String> objIds3 = createOutputObjects(task); | |
assertEquals(1, objIds3.size()); | |
// the 1st worker's output does not exist because its bucket is transient | |
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
// the 2nd worker's output exists because its input bucket is persistent | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0))); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds3.get(0))); | |
assertTrue(getNextTasks(task, resultDescription).isEmpty()); | |
} | |
public void testDoNotDeleteTransientBulksOfTaskOnFatalError() throws Exception { | |
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME); | |
// create the 1st worker's output | |
final List<String> objIds1 = createOutputObjects(task); | |
assertEquals(1, objIds1.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
final ResultDescription resultDescSuccessful = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
// finish the 1st worker's task and get the 2nd worker's task | |
task = getSingleNextTask(task, resultDescSuccessful, "intermediateWorker"); | |
// create the 2nd worker's output | |
final List<String> objIds2 = createOutputObjects(task); | |
assertEquals(1, objIds2.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0))); | |
final ResultDescription resultDescriptionFatalError = | |
new ResultDescription(TaskCompletionStatus.FATAL_ERROR, null, null, null); | |
// finish the 2nd worker's task and get the 3rd worker's task | |
final Task finishTask = task.createFinishTask(resultDescriptionFatalError, getClass().getSimpleName()); | |
final List<Task> nextTasks = _jobTaskProcessor.finishTask(finishTask); | |
assertEquals(0, nextTasks.size()); | |
// the 1st worker's output exists even if its bucket is transient | |
// because the task completion status is other than SUCCESSFUL | |
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
} | |
public void testDoNotDeleteTransientBulksOfRecoverableTask() throws Exception { | |
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME); | |
// create the 1st worker's output | |
final List<String> objIds1 = createOutputObjects(task); | |
assertEquals(1, objIds1.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
final ResultDescription resultDescSuccessful = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
// finish the 1st worker's task and get the 2nd worker's task | |
task = getSingleNextTask(task, resultDescSuccessful, "intermediateWorker"); | |
// create the 2nd worker's output | |
final List<String> objIds2 = createOutputObjects(task); | |
assertEquals(1, objIds2.size()); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0))); | |
final ResultDescription resultDescRecoverableError = | |
new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, null, null, null); | |
// finish the 2nd worker's task as recoverable and get a retry task | |
task = getSingleNextTask(task, resultDescRecoverableError, "intermediateWorker"); | |
// the 1st worker's output is not deleted because the retry task needs it | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
// finish the 2nd worker's task as successful and get the 3rd worker's task | |
task = getSingleNextTask(task, resultDescSuccessful, "finalWorker"); | |
// create the 3rd worker's output | |
final List<String> objIds3 = createOutputObjects(task); | |
assertEquals(1, objIds3.size()); | |
// the 1st worker's output does not exist because its bucket is transient | |
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0))); | |
// the 2nd worker's output exists because its input bucket is persistent | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0))); | |
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds3.get(0))); | |
assertTrue(getNextTasks(task, resultDescSuccessful).isEmpty()); | |
} | |
private List<String> createOutputObjects(final Task task) throws Exception { | |
final List<String> objectNames = new ArrayList<String>(); | |
for (final List<BulkInfo> bulkInfoList : task.getOutputBulks().values()) { | |
for (final BulkInfo bulkInfo : bulkInfoList) { | |
final String objectName = bulkInfo.getObjectName(); | |
_defStorage.putObject(bulkInfo.getStoreName(), objectName, DEAD_BEEF); | |
_objectStoreService.putObject(bulkInfo.getStoreName(), objectName, DEAD_BEEF); | |
objectNames.add(objectName); | |
} | |
} | |
return objectNames; | |
} | |
} |