blob: 68e96b98f323b81be42c53a903a9448c520f1ecf [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.Collection;
import java.util.List;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.jobmanager.JobDefinition;
import org.eclipse.smila.jobmanager.WorkflowDefinition;
import org.eclipse.smila.jobmanager.internal.AccessAny;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
/**
* Test JobManager.finishTask() method.
*/
public class TestJobManagerFinishTask extends JobManagerTestBase {

  /** Dummy payload written into the object store to simulate bulk content. */
  private static final byte[] DEAD_BEEF = new byte[] { (byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF };

  /** Worker name for the initial worker (startAction worker). */
  private static final String INPUT_WORKER = "inputWorker";

  /** Workflow name referenced by the default job created in {@link #setUp()}. */
  private static final String WORKFLOW_NAME = "testWorkflow";

  /** Name of the default job created in {@link #setUp()}. */
  private static final String JOB_NAME = "testjob";

  /** Value for the "workerParameter" parameter of the default job. */
  private static final String PARAM_VALUE = "test";

  /** Store name used as both "store" and "tempStore" of the default job. */
  private static final String STORE_NAME = "test";

  /** The object store service used to write dummy bulk objects. */
  private ObjectStoreService _defStorage;

  /** Id of the default job run, remembered so it can be finished in {@link #tearDown()}. */
  private String _jobId;

  /**
   * {@inheritDoc}
   *
   * Clears all object stores, creates the default job definition and starts a run of it.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    _defStorage = getService(ObjectStoreService.class);
    // reset persisted state: keep the jobmanager store (but empty it), drop everything else.
    for (final String storeName : _objectStoreService.getStoreNames()) {
      if ("jobmanager".equals(storeName)) {
        _objectStoreService.clearStore(storeName);
      } else {
        _objectStoreService.removeStore(storeName);
      }
    }
    _defStorage.ensureStore(STORE_NAME);
    // default job definition used by most tests of this class.
    final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
    jobAny.put("name", JOB_NAME);
    final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
    parametersAny.put("workerParameter", PARAM_VALUE);
    parametersAny.put("tempStore", STORE_NAME);
    parametersAny.put("store", STORE_NAME);
    jobAny.put("parameters", parametersAny);
    jobAny.put("workflow", WORKFLOW_NAME);
    _defPersistence.addJob(new JobDefinition(jobAny));
    _jobId = _jobManager.startJob(JOB_NAME);
  }

  /**
   * {@inheritDoc}
   *
   * Finishes the default job run and removes its definition. This must happen BEFORE
   * {@code super.tearDown()} runs, because the base class tears down the services
   * ({@code _jobManager}, {@code _defPersistence}) this cleanup depends on.
   */
  @Override
  protected void tearDown() throws Exception {
    try {
      _jobManager.finishJob(JOB_NAME, _jobId);
      _defPersistence.removeJob(JOB_NAME);
    } finally {
      // always tear down the base fixture, even if the job cleanup fails.
      super.tearDown();
    }
  }

  /**
   * Tests finishTask for a workflow that has no following tasks for this worker.
   *
   * @throws Exception
   *           an error occurred while creating definitions or during job management.
   */
  public void testFinishTaskWithoutFollowingTask() throws Exception {
    final String workflowName = "testFinishTaskWithoutFollowingTaskWorkflow";
    // single-action workflow: only a startAction, so no follow-up tasks can exist.
    final String jobName = "singleTaskJob";
    final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap();
    workflowAny.put("name", workflowName);
    final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap();
    startActionAny.put("worker", INPUT_WORKER);
    final AnyMap outPutAny = AccessAny.FACTORY.createAnyMap();
    outPutAny.put("output", "docsBucket");
    startActionAny.put("output", outPutAny);
    workflowAny.put("startAction", startActionAny);
    _defPersistence.addWorkflow(new WorkflowDefinition(workflowAny));
    // job definition referencing the single-action workflow.
    final AnyMap jobAny = AccessAny.FACTORY.createAnyMap();
    jobAny.put("name", jobName);
    final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap();
    parametersAny.put("workerParameter", PARAM_VALUE);
    parametersAny.put("tempStore", "tempstore");
    parametersAny.put("store", "tempstore");
    jobAny.put("parameters", parametersAny);
    jobAny.put("workflow", workflowName);
    _defPersistence.addJob(new JobDefinition(jobAny));
    final String jobId = _jobManager.startJob(jobName);
    assertNotNull(jobId);
    try {
      final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, jobName);
      assertNotNull(currentTask);
      final ResultDescription resultDescription =
        new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
      final Collection<Task> followUpTasks = getNextTasks(currentTask, resultDescription);
      assertTrue("follow up tasks should be empty.", followUpTasks.isEmpty());
    } finally {
      // remove the local definitions even if an assertion failed, so other tests are unaffected.
      _jobManager.finishJob(jobName, jobId);
      _defPersistence.removeJob(jobName);
      _defPersistence.removeWorkflow(workflowName);
    }
  }

  /**
   * Tests finishTask for a workflow that has a following task for this worker.
   *
   * @throws Exception
   *           an error occurred while creating definitions or during job management.
   */
  public void testFinishTaskWithFollowingTask() throws Exception {
    final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, JOB_NAME);
    assertNotNull(currentTask);
    final ResultDescription resultDescription =
      new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
    createOutputObjects(currentTask);
    final Task followUpTask = getSingleNextTask(currentTask, resultDescription, "intermediateWorker");
    createOutputObjects(followUpTask);
    // the follow-up task must consume exactly the bulk the first task produced.
    assertEquals("Not the correct bulk.", currentTask.getOutputBulks().get("output"), followUpTask.getInputBulks()
      .get("input"));
    assertEquals("Not the correct input bulk amount.", 1, followUpTask.getInputBulks().size());
    assertEquals("Not the correct output bulk amount.", 1, followUpTask.getOutputBulks().size());
    // we have to complete the run to be able to remove the job run afterwards
    Collection<Task> tasks = getNextTasks(followUpTask, resultDescription);
    while (!tasks.isEmpty()) {
      final Task t = tasks.iterator().next();
      createOutputObjects(t);
      tasks = getNextTasks(t, resultDescription);
    }
  }

  /**
   * Tests finishTask for a workflow that has no following tasks for this worker because an optional bulk has not been
   * created.
   *
   * @throws Exception
   *           an error occurred while creating definitions or during job management.
   */
  public void testFinishTaskWithoutOptionalBucketCreatedFollowingTask() throws Exception {
    final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, JOB_NAME);
    assertNotNull(currentTask);
    final ResultDescription resultDescription =
      new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null);
    // deliberately do NOT create output bulks: the optional output stays absent.
    final Collection<Task> followUpTasks = getNextTasks(currentTask, resultDescription);
    assertTrue("Follow up tasks should be empty.", followUpTasks.isEmpty());
  }

  /**
   * Writes a pseudo bulk (dummy bytes) for every output bulk of the given task.
   *
   * @param currentTask
   *          the current task, for which the output buckets are to be filled.
   * @throws ObjectStoreException
   *           creation of dummy data failed.
   */
  private void createOutputObjects(final Task currentTask) throws ObjectStoreException {
    for (final List<BulkInfo> bulkInfoList : currentTask.getOutputBulks().values()) {
      for (final BulkInfo bulkInfo : bulkInfoList) {
        _defStorage.putObject(bulkInfo.getStoreName(), bulkInfo.getObjectName(), DEAD_BEEF);
      }
    }
  }
}