blob: c6c81b470dd1018e55d72872af43711e2246bf59 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.jobmanager.JobDefinition;
import org.eclipse.smila.jobmanager.JobManager;
import org.eclipse.smila.jobmanager.JobManagerConstants;
import org.eclipse.smila.jobmanager.JobManagerException;
import org.eclipse.smila.jobmanager.JobRunInfo;
import org.eclipse.smila.jobmanager.JobRunMode;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.internal.AccessAny;
import org.eclipse.smila.jobmanager.persistence.DefinitionPersistence;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Base class for JobManager run tests: defines methods for managing jobs, workflows and simulating task processing.
*/
public abstract class JobManagerTestBase extends DeclarativeServiceTestCase {
/**
 * Callable that simulates one standard workflow run of a job: it fetches the initial task for {@link #WORKER_1},
 * processes it, continues with the follow-up tasks for {@link #WORKER_2} and {@link #WORKER_3} and finally checks
 * which of the produced bulks still exist.
 */
public class StandardJobClient implements Callable<Void> {
  /** Maximum time in milliseconds a thread waits to be notified while it is held in a worker. */
  private static final int WAIT_TIME_IN_WORKER = 5000;

  /** The job name. */
  private final String _jobName;

  /** The index name, checked as worker parameter of the second and third task. */
  private final String _indexName;

  /** The temporary store name. */
  private final String _tempStoreName;

  /** The worker after whose processing a wait() is triggered, or null if the run should never wait. */
  private final String _waitInWorker;

  /**
   * Constructor of StandardJobClient for a run that never waits in a worker.
   *
   * @param jobName
   *          the job name
   * @param indexName
   *          the index name
   * @param tempStoreName
   *          the temp storage name.
   */
  public StandardJobClient(final String jobName, final String indexName, final String tempStoreName) {
    this(jobName, indexName, tempStoreName, null);
  }

  /**
   * Constructor of StandardJobClient that calls wait() when a specific worker should "work".
   *
   * @param jobName
   *          the job name
   * @param indexName
   *          the index name
   * @param tempStoreName
   *          the temp storage name.
   * @param waitInWorker
   *          the worker where the workflow run should call wait()
   */
  public StandardJobClient(final String jobName, final String indexName, final String tempStoreName,
    final String waitInWorker) {
    super();
    _jobName = jobName;
    _indexName = indexName;
    _tempStoreName = tempStoreName;
    _waitInWorker = waitInWorker;
  }

  /**
   * Runs the three tasks of the workflow one after the other and verifies the bulks afterwards.
   *
   * {@inheritDoc}
   */
  @Override
  public Void call() throws Exception {
    final List<BulkInfo> transientBulks = new ArrayList<BulkInfo>();
    final List<BulkInfo> persistentBulks = new ArrayList<BulkInfo>();

    // first task: its output is expected to be deleted at the end of the run.
    final Task inputTask = getInitialTask(WORKER_1, _jobName);
    assertWorkflowRunData(inputTask, 1, 0);
    final ResultDescription inputResult = processInitialTask(inputTask, _tempStoreName);
    collectOutputBulks(inputTask, transientBulks);
    waitIfRequested(WORKER_1);

    // second task: its output is expected to survive the run.
    final Task intermediateTask = getSingleNextTask(inputTask, inputResult, WORKER_2);
    assertWorkflowRunData(intermediateTask, 1, 1);
    final ResultDescription intermediateResult = processWorker2Task(intermediateTask, _indexName, _tempStoreName);
    waitIfRequested(WORKER_2);
    collectOutputBulks(intermediateTask, persistentBulks);

    // third task: its output is expected to survive the run, too.
    final Task finalTask = getSingleNextTask(intermediateTask, intermediateResult, WORKER_3);
    assertWorkflowRunData(finalTask, 1, 1);
    final ResultDescription finalResult = processWorker3Task(finalTask, _indexName, _tempStoreName, false);
    collectOutputBulks(finalTask, persistentBulks);
    waitIfRequested(WORKER_3);

    finishFinalTask(finalTask, finalResult);
    assertObjectsDeleted(transientBulks);
    assertObjectsExist(persistentBulks);
    assertNoWorkflowRunData(finalTask);
    return null;
  }

  /**
   * Adds the output bulks of all slots of the task to the given list.
   *
   * @param task
   *          the task whose output bulks are collected
   * @param target
   *          the list the bulks are appended to
   */
  private void collectOutputBulks(final Task task, final List<BulkInfo> target) {
    for (final List<BulkInfo> slotBulks : task.getOutputBulks().values()) {
      target.addAll(slotBulks);
    }
  }

  /**
   * Calls wait() if currentWorker is equal to the waitWorker specified in the constructor.
   *
   * @param currentWorker
   *          the current "worker"
   */
  private void waitIfRequested(final String currentWorker) {
    if (!currentWorker.equals(_waitInWorker)) {
      return;
    }
    synchronized (this) {
      try {
        wait(WAIT_TIME_IN_WORKER);
      } catch (final InterruptedException ignored) {
        // deliberately ignored: an interrupt is treated the same as a notify.
      }
    }
  }
}
/**
 * Callable that starts a job run and returns the value delivered by {@link JobManager#startJob(String)}. If starting
 * fails, the exception is checked with assertJobManagerException and then rethrown.
 */
public final class StartJobCallable implements Callable<String> {
  /** The JobManager service used to start the job. */
  private final JobManager _jobManager;

  /** The name of the job to start. */
  private final String _jobName;

  /**
   * Constructor.
   *
   * @param jobManager
   *          the job manager.
   * @param jobName
   *          the job name.
   */
  public StartJobCallable(final JobManager jobManager, final String jobName) {
    _jobManager = jobManager;
    _jobName = jobName;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String call() throws Exception {
    final String startResult;
    try {
      startResult = _jobManager.startJob(_jobName);
    } catch (final Exception startFailure) {
      // verify the failure before handing it on to the caller.
      assertJobManagerException(startFailure, _jobName);
      throw startFailure;
    }
    return startResult;
  }
}
/** Name of the first worker; StandardJobClient requests the initial task for it. */
public static final String WORKER_1 = "inputWorker";
/** Name of the second worker; StandardJobClient expects it for the task following WORKER_1. */
public static final String WORKER_2 = "intermediateWorker";
/** Name of the third worker; StandardJobClient expects it for the task following WORKER_2. */
public static final String WORKER_3 = "finalWorker";
/** Name of the triggered worker. */
public static final String WORKER_4 = "triggeredWorker";
/** Name of the worker with an optional parameter. */
public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker";
/** Name of the store that is ensured to exist in setUp(). */
public static final String STORE_NAME = "test-store";
/** Worker counter name for input objects, used as key in the counters of successResult(int, int). */
public static final String INPUT_OBJECT_COUNT = "inputObjectCount";
/** Worker counter name for output objects, used as key in the counters of successResult(int, int). */
public static final String OUTPUT_OBJECT_COUNT = "outputObjectCount";
/** The time in milliseconds a worker "works", i.e. the value passed to work(long) by the process*Task methods. */
private static final int WORKER_SLEEP_TIME = 10;
/** Waiting time in milliseconds between invocations to retrieve updated job run data. */
private static final long WAITING_TIME_FOR_JOB_RUN_DATA = 1000L;
/** JobManager service under test, fetched in setUp(). */
protected JobManager _jobManager;
/** TaskStorage service under test, fetched in setUp() and cleared in tearDown(). */
protected TaskStorage _taskStorage;
/** Definition persistence to use, obtained from the JobManager in setUp(). */
protected DefinitionPersistence _defPersistence;
/** Object store service, fetched in setUp(); used to write and check the bulk objects. */
protected ObjectStoreService _objectStoreService;
/**
 * Prepares the object store and fetches the needed services: the store named "jobmanager" is cleared, every other
 * existing store is removed, and the store {@link #STORE_NAME} is ensured to exist. {@inheritDoc}
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  _objectStoreService = getService(ObjectStoreService.class);
  for (final String existingStore : _objectStoreService.getStoreNames()) {
    if (!"jobmanager".equals(existingStore)) {
      _objectStoreService.removeStore(existingStore);
    } else {
      // the "jobmanager" store itself is kept, only its content is dropped.
      _objectStoreService.clearStore(existingStore);
    }
  }
  _objectStoreService.ensureStore(STORE_NAME);

  _jobManager = getService(JobManager.class);
  assertNotNull(_jobManager);

  _defPersistence = _jobManager.getDefinitionPersistence();
  assertNotNull(_defPersistence);

  _taskStorage = getService(TaskStorage.class);
  assertNotNull(_taskStorage);
}
/**
* Calls the tear down of the super class first and then clears the task storage. {@inheritDoc}
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
_taskStorage.clear(); // remove all tasks from the task storage
}
/**
 * Utility method to add a job: builds a job definition from name, workflow and parameters, stores it in the
 * definition persistence and asserts that it can be read back.
 *
 * @param jobName
 *          The name of the job.
 * @param workflow
 *          The name of the workflow.
 * @param parameters
 *          Parameters for the job.
 * @throws Exception
 *           error occurred while adding the job.
 */
protected void addJob(final String jobName, final String workflow, final AnyMap parameters) throws Exception {
  final AnyMap definition = DataFactory.DEFAULT.createAnyMap();
  definition.put("name", jobName);
  definition.put("workflow", workflow);
  definition.put("parameters", parameters);
  _defPersistence.addJob(new JobDefinition(definition));
  // the job must be retrievable after it has been added.
  assertNotNull(_defPersistence.getJob(jobName));
}
/**
 * Utility method to start a job. Asserts that the job run id is not null and that all task counters checked by
 * assertTaskCounters are zero in the job run data.
 *
 * @param jobName
 *          The name of the job to start.
 * @return The id of the started job run.
 * @throws Exception
 *           error occurred while starting the job.
 */
protected String startJob(final String jobName) throws Exception {
  final String jobRunId = _jobManager.startJob(jobName);
  assertNotNull(jobRunId);
  final AnyMap freshRunData = _jobManager.getJobRunData(jobName, jobRunId);
  assertTaskCounters(0, 0, 0, 0, 0, 0, 0, freshRunData);
  return jobRunId;
}
/**
 * Gets an initial task and checks some assertions: the task is not null, the worker name is correct and the task has
 * no input bulks.
 *
 * @param workerName
 *          the worker for which this task should be created.
 * @param jobName
 *          the job name.
 * @return the created task.
 * @throws JobManagerException
 *           error
 */
public Task getInitialTask(final String workerName, final String jobName) throws JobManagerException {
  final Task initialTask = _jobManager.getInitialTask(workerName, jobName);
  assertNotNull(initialTask);
  assertEquals(workerName, initialTask.getWorkerName());
  final boolean withoutInput = initialTask.getInputBulks().isEmpty();
  assertTrue(withoutInput);
  return initialTask;
}
/**
 * Finishes the doneTask and returns its single follow-up task. It is asserted that exactly one follow-up task is
 * produced, that its worker name is as expected and that its input is not empty.
 *
 * @param doneTask
 *          the task to finish
 * @param result
 *          the result description for finishing the task
 * @param expectedWorkerName
 *          the worker name that is expected for the follow-up task
 * @return the follow-up task of the doneTask
 * @throws JobManagerException
 *           error
 */
public Task getSingleNextTask(final Task doneTask, final ResultDescription result, final String expectedWorkerName)
  throws JobManagerException {
  final List<Task> followUps = finishTask(doneTask, result);
  final int followUpCount = followUps.size();
  assertEquals(1, followUpCount);
  final Task followUp = followUps.get(0);
  assertEquals(expectedWorkerName, followUp.getWorkerName());
  final boolean withoutInput = followUp.getInputBulks().isEmpty();
  assertFalse(withoutInput);
  return followUp;
}
/**
 * Finishes the done task and returns the list of follow-up tasks, which is asserted to be not null.
 *
 * @param doneTask
 *          the task to finish
 * @param result
 *          the result description for finishing the task
 * @return the follow-up tasks of the doneTask
 * @throws JobManagerException
 *           error
 */
public List<Task> getNextTasks(final Task doneTask, final ResultDescription result) throws JobManagerException {
  final List<Task> followUps = finishTask(doneTask, result);
  assertNotNull(followUps);
  return followUps;
}
/**
 * Finishes a final task and fails if any follow-up task is produced.
 *
 * @param doneTask
 *          the task to finish
 * @param result
 *          the result description for the doneTask
 * @throws JobManagerException
 *           error
 */
public void finishFinalTask(final Task doneTask, final ResultDescription result) throws JobManagerException {
  final List<Task> followUps = finishTask(doneTask, result);
  assertNotNull(followUps);
  if (followUps.isEmpty()) {
    return;
  }
  fail("Expected empty list but contained task: " + followUps.get(0).toAny());
}
/**
 * Finishes a task in the job manager: creates a finish task from the done task and the result, using the simple
 * class name of this test as second argument, and asserts that the job manager returns a non-null task list.
 *
 * @param doneTask
 *          the task to finish
 * @param result
 *          the result description for finishing the task
 * @return the follow-up tasks returned by the job manager
 * @throws JobManagerException
 *           error
 */
private List<Task> finishTask(final Task doneTask, final ResultDescription result) throws JobManagerException {
  final List<Task> followUps =
    _jobManager.finishTask(doneTask.createFinishTask(result, getClass().getSimpleName()));
  assertNotNull(followUps);
  return followUps;
}
/**
 * Processes an initial task: writes one bulk into the "output" slot, checks its store and object name prefix
 * ("insertBucket"), "works" for a while and reports success with 0 input and 1 output objects.
 *
 * @param task
 *          the task to be processed.
 * @param expectedStore
 *          the store name.
 * @return a ResultDescription for a successful completed task.
 * @throws Exception
 *           error
 */
public ResultDescription processInitialTask(final Task task, final String expectedStore) throws Exception {
  final BulkInfo insertedBulk = createBulk(task, "output", "Added records go here");
  checkRecordBulk(expectedStore, "insertBucket", insertedBulk);
  // exactly one output bulk has been written.
  final int writtenBulks = 1;
  work(WORKER_SLEEP_TIME);
  return successResult(0, writtenBulks);
}
/**
 * Processes a task for the second worker: checks the worker parameter, checks that the first bulk of the "input"
 * slot has the expected store and prefix ("insertBucket") and exists in the object store, writes a bulk into the
 * "output" slot (prefix "processedBucket"), "works" for a while and reports success with 1 input and 1 output
 * object.
 *
 * @param task
 *          the task to be processed.
 * @param expectedWorkerParam
 *          the worker param value.
 * @param expectedStore
 *          the store name.
 * @return a ResultDescription for a successful completed task.
 * @throws Exception
 *           error
 */
public ResultDescription processWorker2Task(final Task task, final String expectedWorkerParam,
  final String expectedStore) throws Exception {
  assertWorkerParam(task, expectedWorkerParam);

  // verify the input produced by the previous worker.
  final BulkInfo inputBulk = task.getInputBulks().get("input").get(0);
  checkRecordBulk(expectedStore, "insertBucket", inputBulk);
  final boolean inputExists =
    _objectStoreService.existsObject(inputBulk.getStoreName(), inputBulk.getObjectName());
  assertTrue(inputExists);

  // produce the output for the next worker.
  final BulkInfo outputBulk = createBulk(task, "output", "manipulated records go here");
  checkRecordBulk(expectedStore, "processedBucket", outputBulk);

  work(WORKER_SLEEP_TIME);
  return successResult(1, 1);
}
/**
 * Processes a task for the third worker: checks the worker parameter, checks that the first bulk of the "input" slot
 * has the expected store and prefix ("processedBucket") and exists in the object store, writes a bulk into the
 * "output" slot and optionally one into the "deletedRecords" slot, "works" for a while and reports success with 1
 * input object and as many output objects as bulks were written.
 *
 * @param task
 *          the task to be processed.
 * @param expectedWorkerParam
 *          the worker param value.
 * @param expectedStore
 *          the store name.
 * @param alsoCreateDeleteBulk
 *          if 'true' an additional bulk for "deletedRecords" slot will be created.
 * @return a ResultDescription for a successful completed task.
 * @throws Exception
 *           error
 */
public ResultDescription processWorker3Task(final Task task, final String expectedWorkerParam,
  final String expectedStore, final boolean alsoCreateDeleteBulk) throws Exception {
  assertWorkerParam(task, expectedWorkerParam);

  // verify the input produced by the previous worker.
  final BulkInfo inputBulk = task.getInputBulks().get("input").get(0);
  checkRecordBulk(expectedStore, "processedBucket", inputBulk);
  final boolean inputExists =
    _objectStoreService.existsObject(inputBulk.getStoreName(), inputBulk.getObjectName());
  assertTrue(inputExists);

  createBulk(task, "output", "This is a final bulk.");
  if (alsoCreateDeleteBulk) {
    createBulk(task, "deletedRecords", "This is a new delete bulk.");
  }
  final int writtenBulks = alsoCreateDeleteBulk ? 2 : 1;

  work(WORKER_SLEEP_TIME);
  return successResult(1, writtenBulks);
}
/**
 * @return a ResultDescription with completion status SUCCESSFUL, empty strings for the two text arguments and an
 *         empty counter map.
 */
protected ResultDescription successResult() {
  final Map<String, Number> noCounters = new HashMap<String, Number>();
  return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", noCounters);
}
/**
 * @param inputObjectCount
 *          value for the counter {@link #INPUT_OBJECT_COUNT}
 * @param outputObjectCount
 *          value for the counter {@link #OUTPUT_OBJECT_COUNT}
 * @return a ResultDescription with completion status SUCCESSFUL and the two given counters.
 */
protected ResultDescription successResult(final int inputObjectCount, final int outputObjectCount) {
  final Map<String, Number> objectCounters = new HashMap<String, Number>();
  objectCounters.put(INPUT_OBJECT_COUNT, Integer.valueOf(inputObjectCount));
  objectCounters.put(OUTPUT_OBJECT_COUNT, Integer.valueOf(outputObjectCount));
  return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", objectCounters);
}
/**
 * @return a ResultDescription with completion status RECOVERABLE_ERROR, empty strings for the two text arguments and
 *         an empty counter map.
 */
protected ResultDescription recoverableError() {
  final Map<String, Number> noCounters = new HashMap<String, Number>();
  return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, "", "", noCounters);
}
/**
 * @return a ResultDescription with completion status FATAL_ERROR, empty strings for the two text arguments and an
 *         empty counter map.
 */
protected ResultDescription fatalError() {
  final Map<String, Number> noCounters = new HashMap<String, Number>();
  return new ResultDescription(TaskCompletionStatus.FATAL_ERROR, "", "", noCounters);
}
/**
 * Takes the first output bulk of the given slot of the task and writes the data (encoded as UTF-8) into the object
 * store under the store and object name of that bulk.
 *
 * If the task has no bulk list for the slot, or the list is empty or starts with a null entry, the method fails with
 * an assertion message naming the slot and the worker instead of running into a NullPointerException or
 * IndexOutOfBoundsException.
 *
 * @param task
 *          the task
 * @param slotName
 *          the name of the worker slot
 * @param data
 *          the data to write in the store
 * @return the bulk the data was written to
 * @throws Exception
 *           error
 */
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception {
  final String missingBulkMessage =
    "Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.";
  final List<BulkInfo> slotBulks = task.getOutputBulks().get(slotName);
  // check the list first so that a missing or empty slot is reported with the descriptive message.
  assertNotNull(missingBulkMessage, slotBulks);
  assertFalse(missingBulkMessage, slotBulks.isEmpty());
  final BulkInfo bulk = slotBulks.get(0);
  assertNotNull(missingBulkMessage, bulk);
  _objectStoreService.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8"));
  return bulk;
}
/**
* Asserts that the task parameter with the key "workerParameter" is equal to the expectedWorkerParam.
*
* @param task
* the task to check
* @param expectedWorkerParam
* the expected value of the task parameter "workerParameter"
*/
protected void assertWorkerParam(final Task task, final String expectedWorkerParam) {
assertEquals(expectedWorkerParam, task.getParameters().get("workerParameter"));
}
/**
 * Checks a record bulk: it must not be null, its store name must be equal to the expected store and its object name
 * must start with the expected bucket name.
 *
 * @param expectedStore
 *          the expected store name
 * @param expectedBucket
 *          the expected bucket name prefix
 * @param recordBulk
 *          the bulk to check
 */
protected void checkRecordBulk(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) {
  assertNotNull(recordBulk);
  assertEquals(expectedStore, recordBulk.getStoreName());
  final boolean hasBucketPrefix = recordBulk.getObjectName().startsWith(expectedBucket);
  assertTrue(hasBucketPrefix);
}
/**
 * Checks a bulk located in an index store: it must not be null, its store name must be equal to "index-" followed by
 * the expected store and its object name must start with the expected bucket name.
 *
 * @param expectedStore
 *          the expected store name without the "index-" prefix
 * @param expectedBucket
 *          the expected bucket name prefix
 * @param recordBulk
 *          the bulk to check
 */
protected void checkIndexStore(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) {
  assertNotNull(recordBulk);
  final String expectedIndexStore = "index-" + expectedStore;
  assertEquals(expectedIndexStore, recordBulk.getStoreName());
  final boolean hasBucketPrefix = recordBulk.getObjectName().startsWith(expectedBucket);
  assertTrue(hasBucketPrefix);
}
/**
 * Asserts that none of the given objects exists in its store (any more).
 *
 * @param objects
 *          the list of objects to check
 * @throws ObjectStoreException
 *           could not check if object exists.
 */
public void assertObjectsDeleted(final List<BulkInfo> objects) throws ObjectStoreException {
  for (final BulkInfo bulk : objects) {
    final String failureMessage = "object '" + bulk.getObjectName() + "' still exists.";
    final boolean stillExists = _objectStoreService.existsObject(bulk.getStoreName(), bulk.getObjectName());
    assertFalse(failureMessage, stillExists);
  }
}
/**
 * Asserts that each of the given objects exists in its store.
 *
 * @param objects
 *          the list of objects to check
 * @throws ObjectStoreException
 *           check failed
 */
public void assertObjectsExist(final List<BulkInfo> objects) throws ObjectStoreException {
  for (final BulkInfo bulk : objects) {
    final String failureMessage = "object '" + bulk.getObjectName() + "' does not exists.";
    final boolean exists = _objectStoreService.existsObject(bulk.getStoreName(), bulk.getObjectName());
    assertTrue(failureMessage, exists);
  }
}
/**
 * Counts the objects with the given prefix in the given store. Only objects with a size greater than zero are
 * counted.
 *
 * @param storeName
 *          The store name.
 * @param objectPrefix
 *          The prefix of the objects to be counted.
 * @return the number of non-empty objects with the prefix objectPrefix in the store storeName.
 * @throws ObjectStoreException
 *           accessing object store service failed.
 */
public int getNumberOfObjects(final String storeName, final String objectPrefix) throws ObjectStoreException {
  int nonEmptyObjects = 0;
  for (final StoreObject objectInfo : _objectStoreService.getStoreObjectInfos(storeName, objectPrefix)) {
    if (objectInfo.getSize() <= 0) {
      // empty objects are not counted.
      continue;
    }
    nonEmptyObjects++;
  }
  return nonEmptyObjects;
}
/**
 * Pretends to work by letting the current thread sleep. If the sleep is interrupted, the method returns early and
 * sets the interrupt flag of the current thread again.
 *
 * @param milliseconds
 *          the number of milliseconds to "work".
 */
protected void work(final long milliseconds) {
  try {
    Thread.sleep(milliseconds);
  } catch (final InterruptedException interrupted) {
    // keep the interrupt visible for the caller.
    Thread.currentThread().interrupt();
  }
}
/**
* Asserts that a job is running. Delegates to the variant with a run mode argument, passing JobRunMode.STANDARD.
*
* @param jobName
* the job name
* @param jobId
* the job run id.
* @throws JobManagerException
* error
*/
protected void assertJobRunning(final String jobName, final String jobId) throws JobManagerException {
assertJobRunning(jobName, jobId, JobRunMode.STANDARD);
}
/**
 * Asserts that a job is running in the given run mode: the job run data must exist, contain the given job run id,
 * the state "RUNNING" and the name of the expected run mode as "runMode".
 *
 * @param jobName
 *          the job name
 * @param jobId
 *          the job run id.
 * @param mode
 *          the expected run mode; null is treated as JobRunMode.STANDARD.
 * @throws JobManagerException
 *           error
 */
protected void assertJobRunning(final String jobName, final String jobId, final JobRunMode mode)
  throws JobManagerException {
  // compare against the requested mode; previously the parameter was ignored and STANDARD was always expected.
  final JobRunMode expectedMode = mode == null ? JobRunMode.STANDARD : mode;
  final AnyMap data = _jobManager.getJobRunData(jobName, jobId);
  assertNotNull(data);
  assertEquals(jobId, AccessAny.getStringRequired(data, "jobId"));
  assertEquals("RUNNING", AccessAny.getStringRequired(data, "state"));
  assertEquals(expectedMode.name(), data.getStringValue("runMode"));
}
/**
 * Gets the job run data and asserts the job run id. The data is regarded as complete when it contains the job id
 * and the workflow run counter with the number of successful workflow runs. Incomplete data is fetched again after
 * a delay of WAITING_TIME_FOR_JOB_RUN_DATA milliseconds.
 *
 * The number of unsuccessful tries is limited for both incomplete data and exceptions raised while inspecting the
 * data, so the method cannot loop forever. An interrupt while waiting restores the interrupt flag and fails.
 *
 * @param jobName
 *          the job name
 * @param jobId
 *          the job run id
 * @return the job run data
 * @throws JobManagerException
 *           error while reading the job run data
 */
protected AnyMap assertJobRunData(final String jobName, final String jobId) throws JobManagerException {
  final int maxCount = 3;
  int count = 0;
  boolean done = false;
  AnyMap data;
  do {
    data = _jobManager.getJobRunData(jobName, jobId);
    try {
      if (data != null
        && data.get(JobManagerConstants.DATA_JOB_ID) != null
        && data.get(JobManagerConstants.WORKFLOW_RUN_COUNTER) != null
        && data.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).get(
          JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS) != null) {
        done = true;
      } else {
        // data seems to be incomplete at the moment (see original note: it may just be moving from
        // zoo keeper into the store), so try again after a small delay.
        Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA);
      }
    } catch (final InterruptedException ex) {
      Thread.currentThread().interrupt();
      fail("Interrupted while waiting for job run data.");
    } catch (final RuntimeException ex) {
      // keep the diagnostic output of the original implementation; the try still counts as failed.
      ex.printStackTrace();
    }
    // the limit applies to every unsuccessful try, including those that ended with an exception.
    if (!done && ++count > maxCount) {
      fail("Too many tries to get job run data.");
    }
  } while (!done);
  assertNotNull(data);
  assertEquals(jobId, AccessAny.getStringRequired(data, "jobId"));
  return data;
}
/**
 * Asserts that a job run has the state "SUCCEEDED". The task counters are not checked here.
 *
 * @param jobName
 *          the job name
 * @param jobId
 *          the job run id
 * @return an Any describing the JobRun data
 * @throws JobManagerException
 *           error
 */
protected AnyMap assertJobRunSucceeded(final String jobName, final String jobId) throws JobManagerException {
  final AnyMap jobRunData = assertJobRunData(jobName, jobId);
  final String actualState = AccessAny.getStringRequired(jobRunData, "state");
  assertEquals("SUCCEEDED", actualState);
  return jobRunData;
}
/**
 * Asserts that a job run has the state "FAILED" and that the sum of its task counters is consistent.
 *
 * @param jobName
 *          the job name
 * @param jobId
 *          the job run id
 * @return an Any describing the JobRun data
 * @throws JobManagerException
 *           error
 */
protected AnyMap assertJobRunFailed(final String jobName, final String jobId) throws JobManagerException {
  final AnyMap jobRunData = assertJobRunData(jobName, jobId);
  final String actualState = AccessAny.getStringRequired(jobRunData, "state");
  assertEquals("FAILED", actualState);
  assertTaskCounters(jobRunData);
  return jobRunData;
}
/**
 * Asserts that a job run has the state "SUCCEEDED" and the given number of successful workflow runs.
 *
 * @param jobName
 *          the job name
 * @param jobId
 *          the job run id
 * @param noOfWorkflowRuns
 *          the expected number of successful workflow runs
 * @return job run data
 * @throws JobManagerException
 *           error
 */
protected AnyMap assertJobRunSucceeded(final String jobName, final String jobId, final int noOfWorkflowRuns)
  throws JobManagerException {
  final AnyMap jobRunData = assertJobRunSucceeded(jobName, jobId);
  final AnyMap workflowCounters = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
  final String failureMessage = "jobrun - wrong number of successful workflow runs: " + jobRunData.toString();
  final int successfulRuns =
    workflowCounters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS).intValue();
  assertEquals(failureMessage, noOfWorkflowRuns, successfulRuns);
  return jobRunData;
}
/**
 * Asserts that a job run has the state given by the name of {@link JobState#CANCELED}.
 *
 * @param jobName
 *          the job name
 * @param jobRunId
 *          the job run id
 * @return job run data
 * @throws JobManagerException
 *           error
 */
protected AnyMap assertJobRunCanceled(final String jobName, final String jobRunId) throws JobManagerException {
  final AnyMap jobRunData = assertJobRunData(jobName, jobRunId);
  final String expectedState = JobState.CANCELED.name();
  final String actualState = AccessAny.getStringRequired(jobRunData, JobManagerConstants.DATA_JOB_STATE);
  assertEquals(expectedState, actualState);
  return jobRunData;
}
/**
 * Asserts the workflow run counters in the job run data: started, active, successful, failed and canceled workflow
 * runs are compared, in this order, with the expected values.
 *
 * @param startedWorkflows
 *          expected number of started workflow runs
 * @param activeWorkflows
 *          expected number of active workflow runs
 * @param successfulWorkflows
 *          expected number of successful workflow runs
 * @param failedWorkflows
 *          expected number of failed workflow runs
 * @param canceledWorkflows
 *          expected number of canceled workflow runs
 * @param jobRunData
 *          the job run data containing the workflow run counters
 */
protected void assertWorkflowCounters(final int startedWorkflows, final int activeWorkflows,
  final int successfulWorkflows, final int failedWorkflows, final int canceledWorkflows, final AnyMap jobRunData) {
  final AnyMap counters = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);

  final int actualStarted =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS).intValue();
  assertEquals(startedWorkflows, actualStarted);

  final int actualActive =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS).intValue();
  assertEquals(activeWorkflows, actualActive);

  final int actualSuccessful =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS).intValue();
  assertEquals(successfulWorkflows, actualSuccessful);

  final int actualFailed =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_WORKFLOW_RUNS).intValue();
  assertEquals(failedWorkflows, actualFailed);

  final int actualCanceled =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELED_WORKFLOW_RUNS).intValue();
  assertEquals(canceledWorkflows, actualCanceled);
}
/**
 * Asserts the individual task counters in the job run data against the expected values.
 *
 * @param createdTasks
 *          expected number of created tasks
 * @param succeededTasks
 *          expected number of successful tasks
 * @param retriedAfterError
 *          expected value of the "retried by worker" counter
 * @param retriedAfterTimeout
 *          expected value of the "retried by TTL" counter
 * @param failedWithoutRetry
 *          expected value of the "failed, not retried" counter
 * @param failedAfterRetry
 *          expected value of the "failed, retried" counter
 * @param cancelledTasks
 *          expected number of cancelled tasks
 * @param jobRunData
 *          the job run data containing the task counters
 */
protected void assertTaskCounters(final int createdTasks, final int succeededTasks, final int retriedAfterError,
  final int retriedAfterTimeout, final int failedWithoutRetry, final int failedAfterRetry,
  final int cancelledTasks, final AnyMap jobRunData) {
  final AnyMap counters = jobRunData.getMap(JobManagerConstants.TASK_COUNTER);

  final int actualCreated = counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue();
  assertEquals(createdTasks, actualCreated);

  final int actualSucceeded =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue();
  assertEquals(succeededTasks, actualSucceeded);

  final int actualRetriedByWorker =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue();
  assertEquals(retriedAfterError, actualRetriedByWorker);

  final int actualRetriedByTtl =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue();
  assertEquals(retriedAfterTimeout, actualRetriedByTtl);

  final int actualFailedAfterRetry =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue();
  assertEquals(failedAfterRetry, actualFailedAfterRetry);

  final int actualFailedWithoutRetry =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue();
  assertEquals(failedWithoutRetry, actualFailedWithoutRetry);

  final int actualCancelled =
    counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue();
  assertEquals(cancelledTasks, actualCancelled);
}
/**
 * Asserts that the task counters in the job run data add up: the number of created tasks must be equal to the sum of
 * the successful, retried (by worker and by TTL), failed (retried and not retried), cancelled and obsolete tasks.
 *
 * @param jobRunData
 *          the job run data containing the task counters
 */
protected void assertTaskCounters(final AnyMap jobRunData) {
  final AnyMap counters = jobRunData.getMap(JobManagerConstants.TASK_COUNTER);
  final int createdTasks = counters.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue();
  // all counters that describe what happened to a created task.
  final String[] outcomeCounterNames =
    { JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER,
      JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED,
      JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED,
      JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS, JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS };
  int accountedTasks = 0;
  for (final String counterName : outcomeCounterNames) {
    accountedTasks += counters.getLongValue(counterName).intValue();
  }
  assertEquals(createdTasks, accountedTasks);
}
/**
 * Asserts that the first of the completed job runs returned for the job jobName has the state "FAILED".
 *
 * @param jobName
 *          the name of the job to check
 * @throws JobManagerException
 *           error
 */
protected void assertFailedJobRunExists(final String jobName) throws JobManagerException {
  final Collection<String> completedRunIds = _jobManager.getCompletedJobRunIds(jobName);
  assertNotNull(completedRunIds);
  final boolean noCompletedRuns = completedRunIds.isEmpty();
  assertFalse(noCompletedRuns);

  final String firstRunId = completedRunIds.iterator().next();
  final AnyMap jobRunData = _jobManager.getJobRunData(jobName, firstRunId);
  assertNotNull(jobRunData);
  assertEquals(firstRunId, AccessAny.getStringRequired(jobRunData, "jobId"));
  assertEquals("FAILED", AccessAny.getStringRequired(jobRunData, "state"));
}
/**
 * Asserts that the exception is a JobManagerException and that its string representation contains the job name in
 * single quotes.
 *
 * @param ex
 *          the exception to check
 * @param jobName
 *          the job name to check in the text
 */
protected void assertJobManagerException(final Exception ex, final String jobName) {
  final String wrongTypeMessage = "This should be a JobManagerException for job '" + jobName + "': " + ex;
  assertTrue(wrongTypeMessage, ex instanceof JobManagerException);

  final String quotedJobName = "'" + jobName + "'";
  assertTrue("message should contain quoted job name", ex.toString().contains(quotedJobName));
}
/**
 * Waits for a job to start its workflow runs. The job run data is polled every WAITING_TIME_FOR_JOB_RUN_DATA
 * milliseconds until the counter of started workflow runs reaches the requested minimum.
 *
 * The wait time limit is checked after every unsuccessful poll, also when no job run data or no workflow run
 * counter is available yet, so the method never spins without delay or without time limit.
 *
 * @param jobName
 *          the name of the job
 * @param jobId
 *          the id of the job run
 * @param minimumNumberOfWorkflowRuns
 *          minimum number of started workflow runs
 * @param maxWaitTime
 *          max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail.
 * @throws Exception
 *           an exception occurred while trying to get job run data
 */
protected void waitForJobRunStarted(final String jobName, final String jobId,
  final int minimumNumberOfWorkflowRuns, final long maxWaitTime) throws Exception {
  final long millisStarted = System.currentTimeMillis();
  while (true) {
    final AnyMap jobRunData = _jobManager.getJobRunData(jobName, jobId);
    final AnyMap workflowRunData =
      jobRunData == null ? null : jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
    if (workflowRunData != null
      && workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS) != null) {
      final int numberOfStartedWorkflowRuns =
        workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS).intValue();
      if (numberOfStartedWorkflowRuns >= minimumNumberOfWorkflowRuns) {
        return;
      }
    }
    // not enough workflow runs (or no data at all) yet: check the time limit, then wait before the next poll.
    assertTrue("Waited too long for job to start.", System.currentTimeMillis() - millisStarted <= maxWaitTime);
    Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA);
  }
}
/**
 * Waits for a job run to be completed: polls the completed job run ids of the job every 500 milliseconds until they
 * contain the given job run id.
 *
 * @param jobName
 *          the name of the job
 * @param jobId
 *          the id of the job run
 * @param maxWaitTime
 *          max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail.
 * @throws Exception
 *           an exception occurred while reading the completed job run ids or while sleeping
 */
protected void waitForJobRunCompleted(final String jobName, final String jobId, final long maxWaitTime)
  throws Exception {
  final long pollInterval = 500L;
  final long startedAt = System.currentTimeMillis();
  Collection<String> completedRunIds = _jobManager.getCompletedJobRunIds(jobName);
  while (!completedRunIds.contains(jobId)) {
    final boolean withinTimeLimit = System.currentTimeMillis() - startedAt <= maxWaitTime;
    assertTrue("Waited too long for job to complete", withinTimeLimit);
    Thread.sleep(pollInterval);
    completedRunIds = _jobManager.getCompletedJobRunIds(jobName);
  }
}
/**
 * Waits until the job manager returns no job run info for the given job any more. The info is polled every
 * WAITING_TIME_FOR_JOB_RUN_DATA milliseconds.
 *
 * @param jobName
 *          the name of the job
 * @param maxWaitTime
 *          max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail.
 * @throws Exception
 *           an exception occurred while reading the job run info or while sleeping
 */
protected void waitForNoActiveJobRun(final String jobName, final long maxWaitTime) throws Exception {
  final long startedAt = System.currentTimeMillis();
  while (_jobManager.getJobRunInfo(jobName) != null) {
    final boolean withinTimeLimit = System.currentTimeMillis() - startedAt <= maxWaitTime;
    assertTrue("Waited too long", withinTimeLimit);
    Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA);
  }
}
/**
 * Checks the task counters of one worker: the job run data must contain a "worker" map with an entry for the worker
 * name; the counters of this entry are then checked one by one with assertWorkerTaskCounter.
 *
 * @param jobRunData
 *          the job run data
 * @param workerName
 *          the name of the worker whose counters are checked
 * @param expectedSuccessful
 *          expected number of successful tasks
 * @param expectedRetriedTimeout
 *          expected value of the "retried by TTL" counter
 * @param expectedRetriedWorker
 *          expected value of the "retried by worker" counter
 * @param expectedFailedRetried
 *          expected value of the "failed, retried" counter
 * @param expectedFailedNotRetried
 *          expected value of the "failed, not retried" counter
 * @param expectedObsolete
 *          expected number of obsolete tasks
 * @throws Exception
 *           error
 */
protected void assertWorkerCounters(final AnyMap jobRunData, final String workerName,
  final int expectedSuccessful, final int expectedRetriedTimeout, final int expectedRetriedWorker,
  final int expectedFailedRetried, final int expectedFailedNotRetried, final int expectedObsolete)
  throws Exception {
  assertTrue(jobRunData.containsKey("worker"));
  final AnyMap allWorkers = jobRunData.getMap("worker");
  assertTrue(allWorkers.containsKey(workerName));
  final AnyMap counters = allWorkers.getMap(workerName);

  // counter names and expected values, index by index, in the order in which they are checked.
  final String[] counterNames =
    { JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL,
      JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER,
      JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED,
      JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED,
      JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS };
  final int[] expectedValues =
    { expectedSuccessful, expectedRetriedTimeout, expectedRetriedWorker, expectedFailedRetried,
      expectedFailedNotRetried, expectedObsolete };
  for (int i = 0; i < counterNames.length; i++) {
    assertWorkerTaskCounter(counters, counterNames[i], expectedValues[i]);
  }
}
/**
 * Checks the successful-task counter and the input/output object counters stored for one worker in the "worker"
 * section of the job run data.
 *
 * The presence of the worker entry is always asserted. The counters themselves are only checked if more than zero
 * successful tasks are expected. A missing input or output object counter is reported as an assertion failure
 * naming the counter, so that the value is never unboxed without a presence check.
 *
 * @param jobRunData
 *          the job run data, must contain a "worker" map with an entry for the worker
 * @param workerName
 *          the name of the worker whose counters are checked
 * @param expectedSuccessful
 *          expected number of successful tasks, no counter is checked if this is not greater than zero
 * @param expectedInputCount
 *          expected value of the input object counter
 * @param expectedOutputCount
 *          expected value of the output object counter
 * @throws Exception
 *           error accessing the job run data.
 */
protected void assertWorkerCounters(final AnyMap jobRunData, final String workerName,
  final int expectedSuccessful, final int expectedInputCount, final int expectedOutputCount) throws Exception {
  assertTrue("missing worker section in job run data", jobRunData.containsKey("worker"));
  final AnyMap workerData = jobRunData.getMap("worker");
  assertTrue("missing counters for worker " + workerName, workerData.containsKey(workerName));
  final AnyMap workerCounter = workerData.getMap(workerName);
  if (expectedSuccessful > 0) {
    assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS,
      expectedSuccessful);
    // check presence first: otherwise a missing counter would presumably only show up as an exception from
    // intValue() instead of a readable assertion message.
    assertTrue("missing counter for " + INPUT_OBJECT_COUNT, workerCounter.containsKey(INPUT_OBJECT_COUNT));
    assertEquals("wrong counter value for " + INPUT_OBJECT_COUNT, expectedInputCount,
      workerCounter.getLongValue(INPUT_OBJECT_COUNT).intValue());
    assertTrue("missing counter for " + OUTPUT_OBJECT_COUNT, workerCounter.containsKey(OUTPUT_OBJECT_COUNT));
    assertEquals("wrong counter value for " + OUTPUT_OBJECT_COUNT, expectedOutputCount,
      workerCounter.getLongValue(OUTPUT_OBJECT_COUNT).intValue());
  }
}
/**
 * Asserts that a single worker counter exists and has the expected value. Nothing is checked if the expected value
 * is not greater than zero.
 *
 * @param workerCounter
 *          the counter map of a worker
 * @param name
 *          the name of the counter
 * @param expected
 *          the expected counter value
 */
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) {
  if (expected <= 0) {
    // nothing is checked in this case.
    return;
  }
  final boolean counterPresent = workerCounter.containsKey(name);
  assertTrue("missing counter for " + name, counterPresent);
  final int actual = workerCounter.getLongValue(name).intValue();
  assertEquals("wrong counter value for " + name, expected, actual);
}
/**
 * Builds the parameter map of a job from the given values. The map contains the entries "store", "tempStore" and
 * "workerParameter".
 *
 * @param workerParameterValue
 *          value to set for "workerParameter"
 * @param storeName
 *          value to set for "store"
 * @param tempStoreName
 *          value to set for "tempStore"
 * @return a new map containing the three parameters
 * @throws Exception
 *           any exception during creation of the map.
 */
protected AnyMap createJobParameters(final String workerParameterValue, final String storeName,
  final String tempStoreName) throws Exception {
  final AnyMap jobParameters = DataFactory.DEFAULT.createAnyMap();
  // store names first, worker parameter last.
  jobParameters.put("store", storeName);
  jobParameters.put("tempStore", tempStoreName);
  jobParameters.put("workerParameter", workerParameterValue);
  return jobParameters;
}
/**
 * Checks that the workflow run data for the task exist and have the expected active task and transient bulk
 * counters. Job name, job run id and workflow run id are read from the task properties.
 *
 * @param task
 *          the task identifying the workflow run
 * @param activeTasks
 *          expected number of active tasks
 * @param transientBulks
 *          expected number of transient bulks
 * @throws Exception
 *           error reading the workflow run data.
 */
protected void assertWorkflowRunData(final Task task, final int activeTasks, final int transientBulks)
  throws Exception {
  final String nameOfJob = task.getProperties().get(Task.PROPERTY_JOB_NAME);
  final String jobRunId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID);
  final String workflowRunId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID);
  final AnyMap runData = _jobManager.getWorkflowRunData(nameOfJob, jobRunId, workflowRunId);
  assertNotNull(runData);

  final int actualActiveTasks =
    runData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_ACTIVE_TASKS).intValue();
  assertEquals(activeTasks, actualActiveTasks);

  final int actualTransientBulks =
    runData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue();
  assertEquals(transientBulks, actualTransientBulks);
}
/**
 * Checks that no workflow run data can be read for the workflow run of the task: asking the job manager for them
 * is expected to end with a {@link JobManagerException}. Job name, job run id and workflow run id are read from the
 * task properties.
 *
 * @param task
 *          the task identifying the workflow run
 * @throws Exception
 *           unexpected error, a {@link JobManagerException} from reading the data is the expected outcome and is
 *           not propagated.
 */
protected void assertNoWorkflowRunData(final Task task) throws Exception {
  final String nameOfJob = task.getProperties().get(Task.PROPERTY_JOB_NAME);
  final String jobRunId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID);
  final String workflowRunId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID);
  final AnyMap unexpectedData;
  try {
    unexpectedData = _jobManager.getWorkflowRunData(nameOfJob, jobRunId, workflowRunId);
  } catch (final JobManagerException expected) {
    // OK: this is the expected outcome.
    return;
  }
  fail("there should be no workflow run data, but got " + unexpectedData);
}
/**
 * Asserts that the collection contains exactly the expected values, regardless of their order.
 *
 * Each expected value consumes one matching element from a copy of the actual values. Because of this, a value
 * that is expected several times must also occur that many times in the collection; a mere size check plus
 * containment test would accept e.g. the expected values (a, a) for the collection [a, b].
 *
 * @param <T>
 *          the element type
 * @param actualValues
 *          actual values, the collection itself is not modified
 * @param expectedValues
 *          expected values
 */
protected <T> void assertValues(final Collection<T> actualValues, final T... expectedValues) {
  assertEquals(expectedValues.length, actualValues.size());
  // work on a copy so that matched elements can be removed without touching the caller's collection.
  final List<T> unmatchedValues = new ArrayList<T>(actualValues);
  for (final T expectedValue : expectedValues) {
    assertTrue("collection does not contain value " + expectedValue, unmatchedValues.remove(expectedValue));
  }
}
}