blob: 2defb487f98dd1118eabf947a314abe2b3f1781d [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2015 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.smila.common.definitions.AccessAny;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.jobmanager.definitions.JobDefinition;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
/**
* Tests for deletion of temp objects for non-forking workflows.
*/
public class TestJobManagerNonForkingWorkflow extends JobTaskProcessingTestBase {
/** Dummy data to write into the store. */
private static final byte[] DEAD_BEEF = new byte[] { (byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF };
/** The initial worker's name (startAction worker). */
private static final String INPUT_WORKER = "inputWorker";
/** The workflow name. */
private static final String WORKFLOW_NAME = "testWorkflow";
/** The job name. */
private static final String JOB_NAME = "testjob";
/** The index name. */
private static final String PARAM_VALUE = "test";
/** The store name. */
private static final String STORE_NAME = "test";
/** The object store service. */
private ObjectStoreService _defStorage;
/** The jobId (to be able to finish it when test is finished. */
private String _jobId;
/**
* {@inheritDoc}
*/
@Override
protected void setUp() throws Exception {
super.setUp();
_defStorage = getService(ObjectStoreService.class);
for (final String storeName : _objectStoreService.getStoreNames()) {
if ("jobmanager".equals(storeName)) {
_objectStoreService.clearStore(storeName);
} else {
_objectStoreService.removeStore(storeName);
}
}
_defStorage.ensureStore(STORE_NAME);
// default job
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
jobAny.put("name", JOB_NAME);
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
parametersAny.put("workerParameter", PARAM_VALUE);
parametersAny.put("tempStore", STORE_NAME);
parametersAny.put("store", STORE_NAME);
jobAny.put("parameters", parametersAny);
jobAny.put("workflow", WORKFLOW_NAME);
_defPersistence.addJob(new JobDefinition(jobAny));
_jobId = _jobRunEngine.startJob(JOB_NAME);
}
/**
* {@inheritDoc}
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
_jobRunEngine.finishJob(JOB_NAME, _jobId);
_defPersistence.removeJob(JOB_NAME);
}
public void testDeleteTransientBulksOfSuccessfulTask() throws Exception {
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME);
// create the 1st worker's output
final List<String> objIds1 = createOutputObjects(task);
assertEquals(1, objIds1.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
final ResultDescription resultDescription =
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
// finish the 1st worker's task and get the 2nd worker's task
task = getSingleNextTask(task, resultDescription, "intermediateWorker");
// create the 2nd worker's output
final List<String> objIds2 = createOutputObjects(task);
assertEquals(1, objIds2.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0)));
// finish the 2nd worker's task and get the 3rd worker's task
task = getSingleNextTask(task, resultDescription, "finalWorker");
// create the 3rd worker's output
final List<String> objIds3 = createOutputObjects(task);
assertEquals(1, objIds3.size());
// the 1st worker's output does not exist because its bucket is transient
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
// the 2nd worker's output exists because its input bucket is persistent
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0)));
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds3.get(0)));
assertTrue(getNextTasks(task, resultDescription).isEmpty());
}
public void testDoNotDeleteTransientBulksOfTaskOnFatalError() throws Exception {
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME);
// create the 1st worker's output
final List<String> objIds1 = createOutputObjects(task);
assertEquals(1, objIds1.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
final ResultDescription resultDescSuccessful =
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
// finish the 1st worker's task and get the 2nd worker's task
task = getSingleNextTask(task, resultDescSuccessful, "intermediateWorker");
// create the 2nd worker's output
final List<String> objIds2 = createOutputObjects(task);
assertEquals(1, objIds2.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0)));
final ResultDescription resultDescriptionFatalError =
new ResultDescription(TaskCompletionStatus.FATAL_ERROR, null, null, null);
// finish the 2nd worker's task and get the 3rd worker's task
final Task finishTask = task.createFinishTask(resultDescriptionFatalError, getClass().getSimpleName());
final List<Task> nextTasks = _jobTaskProcessor.finishTask(finishTask);
assertEquals(0, nextTasks.size());
// the 1st worker's output exists even if its bucket is transient
// because the task completion status is other than SUCCESSFUL
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
}
public void testDoNotDeleteTransientBulksOfRecoverableTask() throws Exception {
Task task = _jobTaskProcessor.getInitialTask(INPUT_WORKER, JOB_NAME);
// create the 1st worker's output
final List<String> objIds1 = createOutputObjects(task);
assertEquals(1, objIds1.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
final ResultDescription resultDescSuccessful =
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
// finish the 1st worker's task and get the 2nd worker's task
task = getSingleNextTask(task, resultDescSuccessful, "intermediateWorker");
// create the 2nd worker's output
final List<String> objIds2 = createOutputObjects(task);
assertEquals(1, objIds2.size());
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0)));
final ResultDescription resultDescRecoverableError =
new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, null, null, null);
// finish the 2nd worker's task as recoverable and get a retry task
task = getSingleNextTask(task, resultDescRecoverableError, "intermediateWorker");
// the 1st worker's output is not deleted because the retry task needs it
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
// finish the 2nd worker's task as successful and get the 3rd worker's task
task = getSingleNextTask(task, resultDescSuccessful, "finalWorker");
// create the 3rd worker's output
final List<String> objIds3 = createOutputObjects(task);
assertEquals(1, objIds3.size());
// the 1st worker's output does not exist because its bucket is transient
assertFalse(_objectStoreService.existsObject(STORE_NAME, objIds1.get(0)));
// the 2nd worker's output exists because its input bucket is persistent
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds2.get(0)));
assertTrue(_objectStoreService.existsObject(STORE_NAME, objIds3.get(0)));
assertTrue(getNextTasks(task, resultDescSuccessful).isEmpty());
}
private List<String> createOutputObjects(final Task task) throws Exception {
final List<String> objectNames = new ArrayList<String>();
for (final List<BulkInfo> bulkInfoList : task.getOutputBulks().values()) {
for (final BulkInfo bulkInfo : bulkInfoList) {
final String objectName = bulkInfo.getObjectName();
_defStorage.putObject(bulkInfo.getStoreName(), objectName, DEAD_BEEF);
_objectStoreService.putObject(bulkInfo.getStoreName(), objectName, DEAD_BEEF);
objectNames.add(objectName);
}
}
return objectNames;
}
}