blob: 950ddb7dce3e8ba112f0b1a6ebea0415a63e01ea [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.jobmanager.JobRunEngine;
import org.eclipse.smila.jobmanager.JobTaskProcessor;
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants;
import org.eclipse.smila.jobmanager.exceptions.JobManagerException;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
/**
* Extends JobManager basic tests: defines methods for simulating task processing.
*/
public abstract class JobTaskProcessingTestBase extends JobManagerTestBase {
/**
* Test class to simulate standard workflow run.
*
*/
public class StandardJobClient implements Callable<Void> {
/** Wait time in msa thread is waiting to be notified when being hold in a worker. */
private static final int WAIT_TIME_IN_WORKER = 5000;
/** The job name. */
private final String _jobName;
/** The index name. */
private final String _indexName;
/** The temporary store name. */
private final String _tempStoreName;
/** The worker that should trigger a wait(). */
private final String _waitInWorker;
/**
* Constuctor of StandardJobClient.
*
* @param jobName
* the job name
* @param indexName
* the index name
* @param tempStoreName
* the temp storage name.
*/
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName) {
super();
_jobName = jobName;
_indexName = indexName;
_tempStoreName = tempStoreName;
_waitInWorker = null;
}
/**
* Constuctor of StandardJobClient that calls wait() whe a specific worker should "work".
*
* @param jobName
* the job name
* @param indexName
* the index name
* @param tempStoreName
* the temp storage name.
* @param waitInWorker
* the worker where the workflow run should call wait()
*/
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName,
final String waitInWorker) {
super();
_jobName = jobName;
_indexName = indexName;
_tempStoreName = tempStoreName;
_waitInWorker = waitInWorker;
}
/**
* {@inheritDoc}
*/
@Override
public Void call() throws Exception {
final List<BulkInfo> transientObjects = new ArrayList<BulkInfo>();
final List<BulkInfo> persistentObjects = new ArrayList<BulkInfo>();
final Task task1 = getInitialTask(WORKER_1, _jobName);
assertWorkflowRunData(task1, 1, 0);
final ResultDescription inputResult = processInitialTask(task1, _tempStoreName);
// first result bucket is transient
for (final List<BulkInfo> bulkInfoList : task1.getOutputBulks().values()) {
transientObjects.addAll(bulkInfoList);
}
waitIfRequested(WORKER_1);
final Task task2 = getSingleNextTask(task1, inputResult, WORKER_2);
assertWorkflowRunData(task2, 1, 1);
final ResultDescription intermediateResult = processWorker2Task(task2, _indexName, _tempStoreName);
// this buckets is persistent...
waitIfRequested(WORKER_2);
for (final List<BulkInfo> bulkInfoList : task2.getOutputBulks().values()) {
persistentObjects.addAll(bulkInfoList);
}
final Task task3 = getSingleNextTask(task2, intermediateResult, WORKER_3);
assertWorkflowRunData(task3, 1, 1);
final ResultDescription finalResult = processWorker3Task(task3, _indexName, _tempStoreName, false);
for (final List<BulkInfo> bulkInfoList : task3.getOutputBulks().values()) {
persistentObjects.addAll(bulkInfoList);
}
waitIfRequested(WORKER_3);
finishFinalTask(task3, finalResult);
assertObjectsDeleted(transientObjects);
assertObjectsExist(persistentObjects);
assertNoWorkflowRunData(task3);
return null;
}
/**
* Calls wait() if currentWorker is equal to the waitWorker specified in the constructor.
*
* @param currentWorker
* the current "worker"
*/
private void waitIfRequested(final String currentWorker) {
if (currentWorker.equals(_waitInWorker)) {
synchronized (this) {
try {
wait(WAIT_TIME_IN_WORKER);
} catch (final InterruptedException ex) {
; // interrupt is same as notify.
}
}
}
}
}
/**
* Callable that starts a job run.
*/
public final class StartJobCallable implements Callable<String> {
/** The JobManager service. */
private final JobRunEngine _jobRunEngine;
/** The jobname. */
private final String _jobName;
/**
* Constructor.
*
* @param jobManager
* the job manager.
* @param jobName
* the job name.
*/
public StartJobCallable(final JobRunEngine jobManager, final String jobName) {
_jobRunEngine = jobManager;
_jobName = jobName;
}
/**
* {@inheritDoc}
*/
@Override
public String call() throws Exception {
try {
return _jobRunEngine.startJob(_jobName);
} catch (final Exception e) {
assertJobManagerException(e, _jobName);
throw e;
}
}
}
/** first worker. */
public static final String WORKER_1 = "inputWorker";
/** second worker. */
public static final String WORKER_2 = "intermediateWorker";
/** third worker. */
public static final String WORKER_3 = "finalWorker";
/** triggered worker. */
public static final String WORKER_4 = "triggeredWorker";
/** worker with optional parameter. */
public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker";
/** worker counter name for input objects. */
public static final String INPUT_OBJECT_COUNT = "inputObjectCount";
/** worker counter name for output objects. */
public static final String OUTPUT_OBJECT_COUNT = "outputObjectCount";
/** The time a worker "works". */
private static final int WORKER_SLEEP_TIME = 10;
/** JobTaskProcessor service under test. */
protected JobTaskProcessor _jobTaskProcessor;
/** TaskStorage service under test. */
protected TaskStorage _taskStorage;
/**
* fetch needed services. {@inheritDoc}
*/
@Override
protected void setUp() throws Exception {
super.setUp();
_jobTaskProcessor = getService(JobTaskProcessor.class);
assertNotNull(_jobTaskProcessor);
_taskStorage = getService(TaskStorage.class);
assertNotNull(_taskStorage);
}
/**
* {@inheritDoc}
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
_taskStorage.clear(); // clear all tasks
}
/**
* Get an initial task and checks some assertions: the task is not null, the worker name is correct, the input is
* null.
*
* @param workerName
* the worker for which this task should be created.
* @param jobName
* the job name.
* @return the created task.
* @throws JobManagerException
* error
*/
public Task getInitialTask(final String workerName, final String jobName) throws JobManagerException {
final Task task = _jobTaskProcessor.getInitialTask(workerName, jobName);
assertNotNull(task);
assertEquals(workerName, task.getWorkerName());
assertTrue(task.getInputBulks().isEmpty());
return task;
}
/**
* returns a single task after finishing the doneTask. It is asserted, that exactly one resulting task is produced,
* the worker name is as expected and the input of the next task is not empty.
*
* @param doneTask
* the task to finish
* @param result
* the result description for finishing the task
* @param expectedWorkerName
* the worker name that is expected for the follow-up task
* @return the follow-up task of the doneTask
* @throws JobManagerException
* error
*/
public Task getSingleNextTask(final Task doneTask, final ResultDescription result, final String expectedWorkerName)
throws JobManagerException {
final List<Task> nextTasks = finishTask(doneTask, result);
assertEquals(1, nextTasks.size());
final Task nextTask = nextTasks.get(0);
assertEquals(expectedWorkerName, nextTask.getWorkerName());
assertFalse(nextTask.getInputBulks().isEmpty());
return nextTask;
}
/**
* Returns a list of tasks after finishing the done task.
*
* @param doneTask
* the task to finish
* @param result
* the result description for finishing the task
* @return the follow-up tasks of the doneTask
* @throws JobManagerException
* error
*/
public List<Task> getNextTasks(final Task doneTask, final ResultDescription result) throws JobManagerException {
final List<Task> nextTasks = finishTask(doneTask, result);
assertNotNull(nextTasks);
return nextTasks;
}
/**
* Finishes a final task and checks if there are no resulting follow-up tasks.
*
* @param doneTask
* the task to finish
* @param result
* the result description for the doneTask
* @throws JobManagerException
* error
*/
public void finishFinalTask(final Task doneTask, final ResultDescription result) throws JobManagerException {
final List<Task> nextTasks = finishTask(doneTask, result);
assertNotNull(nextTasks);
if (!nextTasks.isEmpty()) {
fail("Expected empty list but contained task: " + nextTasks.get(0).toAny());
}
}
/** finish a task in job manager. */
private List<Task> finishTask(final Task doneTask, final ResultDescription result) throws JobManagerException {
final Task finishTask = doneTask.createFinishTask(result, getClass().getSimpleName());
final List<Task> nextTasks = _jobTaskProcessor.finishTask(finishTask);
assertNotNull(nextTasks);
return nextTasks;
}
/**
* Processes a task for the bulk-builder worker.
*
* @param task
* the task to be processed.
* @param expectedStore
* the store name.
* @return a ResultDescription for a successful completed task.
* @throws Exception
* error
*/
public ResultDescription processInitialTask(final Task task, final String expectedStore) throws Exception {
int outputCount = 0;
final BulkInfo addBulk = createBulk(task, "output", "Added records go here");
checkRecordBulk(expectedStore, "insertBucket", addBulk);
outputCount++;
work(WORKER_SLEEP_TIME);
return successResult(0, outputCount);
}
/**
* processes a task for the worker2 worker.
*
* @param task
* the task to be processed.
* @param expectedWorkerParam
* the worker param value.
* @param expectedStore
* the store name.
* @return a ResultDescription for a successful completed task.
* @throws Exception
* error
*/
public ResultDescription processWorker2Task(final Task task, final String expectedWorkerParam,
final String expectedStore) throws Exception {
assertWorkerParam(task, expectedWorkerParam);
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0);
checkRecordBulk(expectedStore, "insertBucket", docsBulk);
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName()));
final BulkInfo processedBulk = createBulk(task, "output", "manipulated records go here");
checkRecordBulk(expectedStore, "processedBucket", processedBulk);
work(WORKER_SLEEP_TIME);
return successResult(1, 1);
}
/**
* processes a task for the index-builder worker.
*
* @param task
* the task to be processed.
* @param expectedWorkerParam
* the worker param value.
* @param expectedStore
* the store name.
* @param alsoCreateDeleteBulk
* if 'true' an additional bulk for "deletedRecords" slot will be created.
* @return a ResultDescription for a successful completed task.
* @throws Exception
* error
*/
public ResultDescription processWorker3Task(final Task task, final String expectedWorkerParam,
final String expectedStore, final boolean alsoCreateDeleteBulk) throws Exception {
assertWorkerParam(task, expectedWorkerParam);
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0);
checkRecordBulk(expectedStore, "processedBucket", docsBulk);
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName()));
createBulk(task, "output", "This is a final bulk.");
int outputCount = 1;
if (alsoCreateDeleteBulk) {
createBulk(task, "deletedRecords", "This is a new delete bulk.");
outputCount++;
}
work(WORKER_SLEEP_TIME);
return successResult(1, outputCount);
}
/**
* @return a ResultDescription for a task with completion status SUCCESS.
*/
protected ResultDescription successResult() {
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", new HashMap<String, Number>());
}
/**
* @return a ResultDescription for a task with completion status SUCCESS and two counters.
*/
protected ResultDescription successResult(final int inputObjectCount, final int outputObjectCount) {
final Map<String, Number> counters = new HashMap<String, Number>();
counters.put(INPUT_OBJECT_COUNT, inputObjectCount);
counters.put(OUTPUT_OBJECT_COUNT, outputObjectCount);
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", counters);
}
/**
* creates a bulk for the given task, slot name and writes the data into the bulk.
*
* @param task
* the task
* @param slotName
* the name of the worker slot
* @param data
* the data to write in the store
* @return the created bulk
* @throws Exception
* error
*/
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception {
final BulkInfo bulk = task.getOutputBulks().get(slotName).get(0);
assertNotNull("Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.", bulk);
_objectStoreService.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8"));
return bulk;
}
/**
* Asserts the expectedWorkerParam is set as worker parameter in the task.
*
* @param task
* the task to check
* @param expectedWorkerParam
* the worker value to check
*/
protected void assertWorkerParam(final Task task, final String expectedWorkerParam) {
assertEquals(expectedWorkerParam, task.getParameters().getStringValue("workerParameter"));
}
/**
* Checks a record bulk if it is not null, the store name and objectid is correct.
*
* @param expectedStore
* the expected store name
* @param expectedBucket
* the expected bucket name prefix
* @param recordBulk
* the bulk to check
*/
protected void checkRecordBulk(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) {
assertNotNull(recordBulk);
assertEquals(expectedStore, recordBulk.getStoreName());
assertTrue(recordBulk.getObjectName().startsWith(expectedBucket));
}
/**
* asserts that a list of objects are deleted from (i.e. not existing in) the store.
*
* @param objects
* the list of objects to check
* @throws ObjectStoreException
* could not check if object exists.
*/
protected void assertObjectsDeleted(final List<BulkInfo> objects) throws ObjectStoreException {
for (final BulkInfo object : objects) {
assertFalse("object '" + object.getObjectName() + "' still exists.",
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName()));
}
}
/**
* asserts that a list of objects exist in the store.
*
* @param objects
* the list of objects to check
* @throws ObjectStoreException
* check failed
*/
protected void assertObjectsExist(final List<BulkInfo> objects) throws ObjectStoreException {
for (final BulkInfo object : objects) {
assertTrue("object '" + object.getObjectName() + "' does not exists.",
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName()));
}
}
/**
* pretend to work (i.e. sleep) for a while.
*
* @param milliseconds
* the number of milliseconds to "work".
*/
protected void work(final long milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
/**
* assert worker task counter, if value greater than zero expected.
*/
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) {
if (expected > 0) {
assertTrue("missing counter for " + name, workerCounter.containsKey(name));
assertEquals("wrong counter value for " + name, expected, workerCounter.getLongValue(name).intValue());
}
}
/**
* Asserts the exception is a JobManagerException with the job name in the exception message.
*
* @param ex
* the exception to check
* @param jobName
* the job name to check in the text
*/
protected void assertJobManagerException(final Exception ex, final String jobName) {
assertTrue("This should be a JobManagerException for job '" + jobName + "': " + ex,
ex instanceof JobManagerException);
assertTrue("message should contain quoted job name", ex.toString().contains("'" + jobName + "'"));
}
/** check that there is no workflow run data for this task. */
protected void assertNoWorkflowRunData(final Task task) throws Exception {
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME);
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID);
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID);
try {
final AnyMap workflowData = _jobRunDataProvider.getWorkflowRunData(jobName, jobId, workflowId);
fail("there should be no workflow run data, but got " + workflowData);
} catch (final JobManagerException ex) {
; // OK
}
}
/** check if the workflow run data for the task have the correct task and bulk counters. */
protected void assertWorkflowRunData(final Task task, final int activeTasks, final int transientBulks)
throws Exception {
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME);
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID);
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID);
final AnyMap workflowData = _jobRunDataProvider.getWorkflowRunData(jobName, jobId, workflowId);
assertNotNull(workflowData);
assertEquals(activeTasks, workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_ACTIVE_TASKS)
.intValue());
assertEquals(transientBulks,
workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue());
}
/** check task counters for worker. */
protected void assertTaskManagerCounter(final String workerName, final int expectedTodo,
final int expectedInProgress) throws TaskmanagerException {
final Map<String, TaskCounter> counters = _taskStorage.getTaskCounters();
final TaskCounter counter = counters.get(workerName);
assertEquals(expectedTodo, counter.getTasksTodo());
assertEquals(expectedInProgress, counter.getTasksInProgress());
}
}