/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial | |
* implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.Callable; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.jobmanager.JobRunEngine; | |
import org.eclipse.smila.jobmanager.JobTaskProcessor; | |
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.exceptions.JobManagerException; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
/** | |
* Extends JobManager basic tests: defines methods for simulating task processing. | |
*/ | |
public abstract class JobTaskProcessingTestBase extends JobManagerTestBase { | |
/** | |
* Test class to simulate standard workflow run. | |
* | |
*/ | |
public class StandardJobClient implements Callable<Void> { | |
/** Wait time in msa thread is waiting to be notified when being hold in a worker. */ | |
private static final int WAIT_TIME_IN_WORKER = 5000; | |
/** The job name. */ | |
private final String _jobName; | |
/** The index name. */ | |
private final String _indexName; | |
/** The temporary store name. */ | |
private final String _tempStoreName; | |
/** The worker that should trigger a wait(). */ | |
private final String _waitInWorker; | |
/** | |
* Constuctor of StandardJobClient. | |
* | |
* @param jobName | |
* the job name | |
* @param indexName | |
* the index name | |
* @param tempStoreName | |
* the temp storage name. | |
*/ | |
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName) { | |
super(); | |
_jobName = jobName; | |
_indexName = indexName; | |
_tempStoreName = tempStoreName; | |
_waitInWorker = null; | |
} | |
/** | |
* Constuctor of StandardJobClient that calls wait() whe a specific worker should "work". | |
* | |
* @param jobName | |
* the job name | |
* @param indexName | |
* the index name | |
* @param tempStoreName | |
* the temp storage name. | |
* @param waitInWorker | |
* the worker where the workflow run should call wait() | |
*/ | |
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName, | |
final String waitInWorker) { | |
super(); | |
_jobName = jobName; | |
_indexName = indexName; | |
_tempStoreName = tempStoreName; | |
_waitInWorker = waitInWorker; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public Void call() throws Exception { | |
final List<BulkInfo> transientObjects = new ArrayList<BulkInfo>(); | |
final List<BulkInfo> persistentObjects = new ArrayList<BulkInfo>(); | |
final Task task1 = getInitialTask(WORKER_1, _jobName); | |
assertWorkflowRunData(task1, 1, 0); | |
final ResultDescription inputResult = processInitialTask(task1, _tempStoreName); | |
// first result bucket is transient | |
for (final List<BulkInfo> bulkInfoList : task1.getOutputBulks().values()) { | |
transientObjects.addAll(bulkInfoList); | |
} | |
waitIfRequested(WORKER_1); | |
final Task task2 = getSingleNextTask(task1, inputResult, WORKER_2); | |
assertWorkflowRunData(task2, 1, 1); | |
final ResultDescription intermediateResult = processWorker2Task(task2, _indexName, _tempStoreName); | |
// this buckets is persistent... | |
waitIfRequested(WORKER_2); | |
for (final List<BulkInfo> bulkInfoList : task2.getOutputBulks().values()) { | |
persistentObjects.addAll(bulkInfoList); | |
} | |
final Task task3 = getSingleNextTask(task2, intermediateResult, WORKER_3); | |
assertWorkflowRunData(task3, 1, 1); | |
final ResultDescription finalResult = processWorker3Task(task3, _indexName, _tempStoreName, false); | |
for (final List<BulkInfo> bulkInfoList : task3.getOutputBulks().values()) { | |
persistentObjects.addAll(bulkInfoList); | |
} | |
waitIfRequested(WORKER_3); | |
finishFinalTask(task3, finalResult); | |
assertObjectsDeleted(transientObjects); | |
assertObjectsExist(persistentObjects); | |
assertNoWorkflowRunData(task3); | |
return null; | |
} | |
/** | |
* Calls wait() if currentWorker is equal to the waitWorker specified in the constructor. | |
* | |
* @param currentWorker | |
* the current "worker" | |
*/ | |
private void waitIfRequested(final String currentWorker) { | |
if (currentWorker.equals(_waitInWorker)) { | |
synchronized (this) { | |
try { | |
wait(WAIT_TIME_IN_WORKER); | |
} catch (final InterruptedException ex) { | |
; // interrupt is same as notify. | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Callable that starts a job run. | |
*/ | |
public final class StartJobCallable implements Callable<String> { | |
/** The JobManager service. */ | |
private final JobRunEngine _jobRunEngine; | |
/** The jobname. */ | |
private final String _jobName; | |
/** | |
* Constructor. | |
* | |
* @param jobManager | |
* the job manager. | |
* @param jobName | |
* the job name. | |
*/ | |
public StartJobCallable(final JobRunEngine jobManager, final String jobName) { | |
_jobRunEngine = jobManager; | |
_jobName = jobName; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public String call() throws Exception { | |
try { | |
return _jobRunEngine.startJob(_jobName); | |
} catch (final Exception e) { | |
assertJobManagerException(e, _jobName); | |
throw e; | |
} | |
} | |
} | |
/** first worker. */ | |
public static final String WORKER_1 = "inputWorker"; | |
/** second worker. */ | |
public static final String WORKER_2 = "intermediateWorker"; | |
/** third worker. */ | |
public static final String WORKER_3 = "finalWorker"; | |
/** triggered worker. */ | |
public static final String WORKER_4 = "triggeredWorker"; | |
/** worker with optional parameter. */ | |
public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker"; | |
/** worker counter name for input objects. */ | |
public static final String INPUT_OBJECT_COUNT = "inputObjectCount"; | |
/** worker counter name for output objects. */ | |
public static final String OUTPUT_OBJECT_COUNT = "outputObjectCount"; | |
/** The time a worker "works". */ | |
private static final int WORKER_SLEEP_TIME = 10; | |
/** JobTaskProcessor service under test. */ | |
protected JobTaskProcessor _jobTaskProcessor; | |
/** TaskStorage service under test. */ | |
protected TaskStorage _taskStorage; | |
/** | |
* fetch needed services. {@inheritDoc} | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_jobTaskProcessor = getService(JobTaskProcessor.class); | |
assertNotNull(_jobTaskProcessor); | |
_taskStorage = getService(TaskStorage.class); | |
assertNotNull(_taskStorage); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
_taskStorage.clear(); // clear all tasks | |
} | |
/** | |
* Get an initial task and checks some assertions: the task is not null, the worker name is correct, the input is | |
* null. | |
* | |
* @param workerName | |
* the worker for which this task should be created. | |
* @param jobName | |
* the job name. | |
* @return the created task. | |
* @throws JobManagerException | |
* error | |
*/ | |
public Task getInitialTask(final String workerName, final String jobName) throws JobManagerException { | |
final Task task = _jobTaskProcessor.getInitialTask(workerName, jobName); | |
assertNotNull(task); | |
assertEquals(workerName, task.getWorkerName()); | |
assertTrue(task.getInputBulks().isEmpty()); | |
return task; | |
} | |
/** | |
* returns a single task after finishing the doneTask. It is asserted, that exactly one resulting task is produced, | |
* the worker name is as expected and the input of the next task is not empty. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for finishing the task | |
* @param expectedWorkerName | |
* the worker name that is expected for the follow-up task | |
* @return the follow-up task of the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public Task getSingleNextTask(final Task doneTask, final ResultDescription result, final String expectedWorkerName) | |
throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertEquals(1, nextTasks.size()); | |
final Task nextTask = nextTasks.get(0); | |
assertEquals(expectedWorkerName, nextTask.getWorkerName()); | |
assertFalse(nextTask.getInputBulks().isEmpty()); | |
return nextTask; | |
} | |
/** | |
* Returns a list of tasks after finishing the done task. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for finishing the task | |
* @return the follow-up tasks of the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public List<Task> getNextTasks(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertNotNull(nextTasks); | |
return nextTasks; | |
} | |
/** | |
* Finishes a final task and checks if there are no resulting follow-up tasks. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public void finishFinalTask(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertNotNull(nextTasks); | |
if (!nextTasks.isEmpty()) { | |
fail("Expected empty list but contained task: " + nextTasks.get(0).toAny()); | |
} | |
} | |
/** finish a task in job manager. */ | |
private List<Task> finishTask(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final Task finishTask = doneTask.createFinishTask(result, getClass().getSimpleName()); | |
final List<Task> nextTasks = _jobTaskProcessor.finishTask(finishTask); | |
assertNotNull(nextTasks); | |
return nextTasks; | |
} | |
/** | |
* Processes a task for the bulk-builder worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedStore | |
* the store name. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processInitialTask(final Task task, final String expectedStore) throws Exception { | |
int outputCount = 0; | |
final BulkInfo addBulk = createBulk(task, "output", "Added records go here"); | |
checkRecordBulk(expectedStore, "insertBucket", addBulk); | |
outputCount++; | |
work(WORKER_SLEEP_TIME); | |
return successResult(0, outputCount); | |
} | |
/** | |
* processes a task for the worker2 worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedWorkerParam | |
* the worker param value. | |
* @param expectedStore | |
* the store name. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processWorker2Task(final Task task, final String expectedWorkerParam, | |
final String expectedStore) throws Exception { | |
assertWorkerParam(task, expectedWorkerParam); | |
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0); | |
checkRecordBulk(expectedStore, "insertBucket", docsBulk); | |
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName())); | |
final BulkInfo processedBulk = createBulk(task, "output", "manipulated records go here"); | |
checkRecordBulk(expectedStore, "processedBucket", processedBulk); | |
work(WORKER_SLEEP_TIME); | |
return successResult(1, 1); | |
} | |
/** | |
* processes a task for the index-builder worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedWorkerParam | |
* the worker param value. | |
* @param expectedStore | |
* the store name. | |
* @param alsoCreateDeleteBulk | |
* if 'true' an additional bulk for "deletedRecords" slot will be created. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processWorker3Task(final Task task, final String expectedWorkerParam, | |
final String expectedStore, final boolean alsoCreateDeleteBulk) throws Exception { | |
assertWorkerParam(task, expectedWorkerParam); | |
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0); | |
checkRecordBulk(expectedStore, "processedBucket", docsBulk); | |
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName())); | |
createBulk(task, "output", "This is a final bulk."); | |
int outputCount = 1; | |
if (alsoCreateDeleteBulk) { | |
createBulk(task, "deletedRecords", "This is a new delete bulk."); | |
outputCount++; | |
} | |
work(WORKER_SLEEP_TIME); | |
return successResult(1, outputCount); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status SUCCESS. | |
*/ | |
protected ResultDescription successResult() { | |
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status SUCCESS and two counters. | |
*/ | |
protected ResultDescription successResult(final int inputObjectCount, final int outputObjectCount) { | |
final Map<String, Number> counters = new HashMap<String, Number>(); | |
counters.put(INPUT_OBJECT_COUNT, inputObjectCount); | |
counters.put(OUTPUT_OBJECT_COUNT, outputObjectCount); | |
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", counters); | |
} | |
/** | |
* creates a bulk for the given task, slot name and writes the data into the bulk. | |
* | |
* @param task | |
* the task | |
* @param slotName | |
* the name of the worker slot | |
* @param data | |
* the data to write in the store | |
* @return the created bulk | |
* @throws Exception | |
* error | |
*/ | |
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception { | |
final BulkInfo bulk = task.getOutputBulks().get(slotName).get(0); | |
assertNotNull("Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.", bulk); | |
_objectStoreService.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8")); | |
return bulk; | |
} | |
/** | |
* Asserts the expectedWorkerParam is set as worker parameter in the task. | |
* | |
* @param task | |
* the task to check | |
* @param expectedWorkerParam | |
* the worker value to check | |
*/ | |
protected void assertWorkerParam(final Task task, final String expectedWorkerParam) { | |
assertEquals(expectedWorkerParam, task.getParameters().getStringValue("workerParameter")); | |
} | |
/** | |
* Checks a record bulk if it is not null, the store name and objectid is correct. | |
* | |
* @param expectedStore | |
* the expected store name | |
* @param expectedBucket | |
* the expected bucket name prefix | |
* @param recordBulk | |
* the bulk to check | |
*/ | |
protected void checkRecordBulk(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) { | |
assertNotNull(recordBulk); | |
assertEquals(expectedStore, recordBulk.getStoreName()); | |
assertTrue(recordBulk.getObjectName().startsWith(expectedBucket)); | |
} | |
/** | |
* asserts that a list of objects are deleted from (i.e. not existing in) the store. | |
* | |
* @param objects | |
* the list of objects to check | |
* @throws ObjectStoreException | |
* could not check if object exists. | |
*/ | |
protected void assertObjectsDeleted(final List<BulkInfo> objects) throws ObjectStoreException { | |
for (final BulkInfo object : objects) { | |
assertFalse("object '" + object.getObjectName() + "' still exists.", | |
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName())); | |
} | |
} | |
/** | |
* asserts that a list of objects exist in the store. | |
* | |
* @param objects | |
* the list of objects to check | |
* @throws ObjectStoreException | |
* check failed | |
*/ | |
protected void assertObjectsExist(final List<BulkInfo> objects) throws ObjectStoreException { | |
for (final BulkInfo object : objects) { | |
assertTrue("object '" + object.getObjectName() + "' does not exists.", | |
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName())); | |
} | |
} | |
/** | |
* pretend to work (i.e. sleep) for a while. | |
* | |
* @param milliseconds | |
* the number of milliseconds to "work". | |
*/ | |
protected void work(final long milliseconds) { | |
try { | |
Thread.sleep(milliseconds); | |
} catch (final InterruptedException ex) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
/** | |
* assert worker task counter, if value greater than zero expected. | |
*/ | |
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) { | |
if (expected > 0) { | |
assertTrue("missing counter for " + name, workerCounter.containsKey(name)); | |
assertEquals("wrong counter value for " + name, expected, workerCounter.getLongValue(name).intValue()); | |
} | |
} | |
/** | |
* Asserts the exception is a JobManagerException with the job name in the exception message. | |
* | |
* @param ex | |
* the exception to check | |
* @param jobName | |
* the job name to check in the text | |
*/ | |
protected void assertJobManagerException(final Exception ex, final String jobName) { | |
assertTrue("This should be a JobManagerException for job '" + jobName + "': " + ex, | |
ex instanceof JobManagerException); | |
assertTrue("message should contain quoted job name", ex.toString().contains("'" + jobName + "'")); | |
} | |
/** check that there is no workflow run data for this task. */ | |
protected void assertNoWorkflowRunData(final Task task) throws Exception { | |
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME); | |
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID); | |
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID); | |
try { | |
final AnyMap workflowData = _jobRunDataProvider.getWorkflowRunData(jobName, jobId, workflowId); | |
fail("there should be no workflow run data, but got " + workflowData); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
} | |
/** check if the workflow run data for the task have the correct task and bulk counters. */ | |
protected void assertWorkflowRunData(final Task task, final int activeTasks, final int transientBulks) | |
throws Exception { | |
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME); | |
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID); | |
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID); | |
final AnyMap workflowData = _jobRunDataProvider.getWorkflowRunData(jobName, jobId, workflowId); | |
assertNotNull(workflowData); | |
assertEquals(activeTasks, workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_ACTIVE_TASKS) | |
.intValue()); | |
assertEquals(transientBulks, | |
workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue()); | |
} | |
/** check task counters for worker. */ | |
protected void assertTaskManagerCounter(final String workerName, final int expectedTodo, | |
final int expectedInProgress) throws TaskmanagerException { | |
final Map<String, TaskCounter> counters = _taskStorage.getTaskCounters(); | |
final TaskCounter counter = counters.get(workerName); | |
assertEquals(expectedTodo, counter.getTasksTodo()); | |
assertEquals(expectedInProgress, counter.getTasksInProgress()); | |
} | |
} |