/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.jobmanager.test; | |
import java.util.Collection; | |
import java.util.List; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.jobmanager.JobDefinition; | |
import org.eclipse.smila.jobmanager.WorkflowDefinition; | |
import org.eclipse.smila.jobmanager.internal.AccessAny; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
/** | |
* Test JobManager.finishTask() method. | |
*/ | |
public class TestJobManagerFinishTask extends JobManagerTestBase { | |
/** Dummy data to write into the store. */ | |
private static final byte[] DEAD_BEEF = new byte[] { (byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF }; | |
/** worker name for initial worker (startAction worker). */ | |
private static final String INPUT_WORKER = "inputWorker"; | |
/** workflow name. */ | |
private static final String WORKFLOW_NAME = "testWorkflow"; | |
/** job name. */ | |
private static final String JOB_NAME = "testjob"; | |
/** index name. */ | |
private static final String PARAM_VALUE = "test"; | |
/** store name. */ | |
private static final String STORE_NAME = "test"; | |
/** The object store service. */ | |
private ObjectStoreService _defStorage; | |
/** the jobId (to be able to finish it when test is finished. */ | |
private String _jobId; | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_defStorage = getService(ObjectStoreService.class); | |
for (final String storeName : _objectStoreService.getStoreNames()) { | |
if ("jobmanager".equals(storeName)) { | |
_objectStoreService.clearStore(storeName); | |
} else { | |
_objectStoreService.removeStore(storeName); | |
} | |
} | |
_defStorage.ensureStore(STORE_NAME); | |
// default job | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", JOB_NAME); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("workerParameter", PARAM_VALUE); | |
parametersAny.put("tempStore", STORE_NAME); | |
parametersAny.put("store", STORE_NAME); | |
jobAny.put("parameters", parametersAny); | |
jobAny.put("workflow", WORKFLOW_NAME); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
_jobId = _jobManager.startJob(JOB_NAME); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
_jobManager.finishJob(JOB_NAME, _jobId); | |
_defPersistence.removeJob(JOB_NAME); | |
} | |
/** | |
* Tests finishTask for a workflow that has no following tasks for this worker. | |
* | |
* @throws Exception | |
* an error occurred while creating definitions or during job management. | |
*/ | |
public void testFinishTaskWithoutFollowingTask() throws Exception { | |
final String workflowName = "testFinishTaskWithoutFollowingTaskWorkflow"; | |
// single action workflow | |
final String jobName = "singleTaskJob"; | |
final AnyMap workflowAny = AccessAny.FACTORY.createAnyMap(); | |
workflowAny.put("name", workflowName); | |
final AnyMap startActionAny = AccessAny.FACTORY.createAnyMap(); | |
startActionAny.put("worker", INPUT_WORKER); | |
final AnyMap outPutAny = AccessAny.FACTORY.createAnyMap(); | |
outPutAny.put("output", "docsBucket"); | |
startActionAny.put("output", outPutAny); | |
workflowAny.put("startAction", startActionAny); | |
_defPersistence.addWorkflow(new WorkflowDefinition(workflowAny)); | |
// job | |
final AnyMap jobAny = AccessAny.FACTORY.createAnyMap(); | |
jobAny.put("name", jobName); | |
final AnyMap parametersAny = AccessAny.FACTORY.createAnyMap(); | |
parametersAny.put("workerParameter", PARAM_VALUE); | |
parametersAny.put("tempStore", "tempstore"); | |
parametersAny.put("store", "tempstore"); | |
jobAny.put("parameters", parametersAny); | |
jobAny.put("workflow", workflowName); | |
_defPersistence.addJob(new JobDefinition(jobAny)); | |
// long lastTime = System.nanoTime(); | |
final String jobId = _jobManager.startJob(jobName); | |
// System.out.println("startJob duration (ns):" + (System.nanoTime() - lastTime)); | |
assertNotNull(jobId); | |
// lastTime = System.nanoTime(); | |
final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, jobName); | |
// System.out.println("getInitialTask duration (ns):" + (System.nanoTime() - lastTime)); | |
assertNotNull(currentTask); | |
final ResultDescription resultDescription = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
// lastTime = System.nanoTime(); | |
final Collection<Task> followUpTasks = getNextTasks(currentTask, resultDescription); | |
// System.out.println("finishTask duration (ns):" + (System.nanoTime() - lastTime)); | |
assertTrue("follow up tasks should be empty.", followUpTasks.isEmpty()); | |
_jobManager.finishJob(jobName, jobId); | |
_defPersistence.removeJob(jobName); | |
_defPersistence.removeWorkflow(workflowName); | |
} | |
/** | |
* Tests finishTask for a workflow that has a following task for this worker. | |
* | |
* @throws Exception | |
* an error occurred while creating definitions or during job management. | |
*/ | |
public void testFinishTaskWithFollowingTask() throws Exception { | |
final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, JOB_NAME); | |
assertNotNull(currentTask); | |
final ResultDescription resultDescription = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
createOutputObjects(currentTask); | |
final Task followUpTask = getSingleNextTask(currentTask, resultDescription, "intermediateWorker"); | |
createOutputObjects(followUpTask); | |
assertEquals("Not the correct bulk.", currentTask.getOutputBulks().get("output"), followUpTask.getInputBulks() | |
.get("input")); | |
assertEquals("Not the correct input bulk amount.", 1, followUpTask.getInputBulks().size()); | |
assertEquals("Not the correct output bulk amount.", 1, followUpTask.getOutputBulks().size()); | |
// we have to complete the run to be able to remove the job run afterwards | |
Collection<Task> tasks = getNextTasks(followUpTask, resultDescription); | |
while (!tasks.isEmpty()) { | |
final Task t = tasks.iterator().next(); | |
createOutputObjects(t); | |
tasks = getNextTasks(t, resultDescription); | |
} | |
} | |
/** | |
* Tests finishTask for a workflow that has no following tasks for this worker because an optional bulk has not been | |
* created. | |
* | |
* @throws Exception | |
* an error occurred while creating definitions or during job management. | |
*/ | |
public void testFinishTaskWithoutOptionalBucketCreatedFollowingTask() throws Exception { | |
final Task currentTask = _jobManager.getInitialTask(INPUT_WORKER, JOB_NAME); | |
assertNotNull(currentTask); | |
final ResultDescription resultDescription = | |
new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null); | |
// do not create output bulks | |
final Collection<Task> followUpTasks = getNextTasks(currentTask, resultDescription); | |
assertTrue("Follow up tasks should be empty.", followUpTasks.isEmpty()); | |
} | |
/** | |
* Writes an pseudo bulk. | |
* | |
* @param currentTask | |
* the current task, for which the output buckets are to be filled. | |
* @throws ObjectStoreException | |
* creation of dummy data failed. | |
*/ | |
private void createOutputObjects(final Task currentTask) throws ObjectStoreException { | |
for (final List<BulkInfo> bulkInfoList : currentTask.getOutputBulks().values()) { | |
for (final BulkInfo bulkInfo : bulkInfoList) { | |
_defStorage.putObject(bulkInfo.getStoreName(), bulkInfo.getObjectName(), DEAD_BEEF); | |
} | |
} | |
} | |
} |