/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial | |
* implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.Callable; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.jobmanager.JobDefinition; | |
import org.eclipse.smila.jobmanager.JobManager; | |
import org.eclipse.smila.jobmanager.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.JobManagerException; | |
import org.eclipse.smila.jobmanager.JobRunInfo; | |
import org.eclipse.smila.jobmanager.JobRunMode; | |
import org.eclipse.smila.jobmanager.JobState; | |
import org.eclipse.smila.jobmanager.internal.AccessAny; | |
import org.eclipse.smila.jobmanager.persistence.DefinitionPersistence; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.objectstore.StoreObject; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
/** | |
* Base class for JobManager run tests: defines methods for managing jobs, workflows and simulating task processing. | |
*/ | |
public abstract class JobManagerTestBase extends DeclarativeServiceTestCase { | |
/** | |
* Test class to simulate standard workflow run. | |
* | |
*/ | |
public class StandardJobClient implements Callable<Void> { | |
/** Wait time in msa thread is waiting to be notified when being hold in a worker. */ | |
private static final int WAIT_TIME_IN_WORKER = 5000; | |
/** The job name. */ | |
private final String _jobName; | |
/** The index name. */ | |
private final String _indexName; | |
/** The temporary store name. */ | |
private final String _tempStoreName; | |
/** The worker that should trigger a wait(). */ | |
private final String _waitInWorker; | |
/** | |
* Constuctor of StandardJobClient. | |
* | |
* @param jobName | |
* the job name | |
* @param indexName | |
* the index name | |
* @param tempStoreName | |
* the temp storage name. | |
*/ | |
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName) { | |
super(); | |
_jobName = jobName; | |
_indexName = indexName; | |
_tempStoreName = tempStoreName; | |
_waitInWorker = null; | |
} | |
/** | |
* Constuctor of StandardJobClient that calls wait() whe a specific worker should "work". | |
* | |
* @param jobName | |
* the job name | |
* @param indexName | |
* the index name | |
* @param tempStoreName | |
* the temp storage name. | |
* @param waitInWorker | |
* the worker where the workflow run should call wait() | |
*/ | |
public StandardJobClient(final String jobName, final String indexName, final String tempStoreName, | |
final String waitInWorker) { | |
super(); | |
_jobName = jobName; | |
_indexName = indexName; | |
_tempStoreName = tempStoreName; | |
_waitInWorker = waitInWorker; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public Void call() throws Exception { | |
final List<BulkInfo> transientObjects = new ArrayList<BulkInfo>(); | |
final List<BulkInfo> persistentObjects = new ArrayList<BulkInfo>(); | |
final Task task1 = getInitialTask(WORKER_1, _jobName); | |
assertWorkflowRunData(task1, 1, 0); | |
final ResultDescription inputResult = processInitialTask(task1, _tempStoreName); | |
// first result bucket is transient | |
for (final List<BulkInfo> bulkInfoList : task1.getOutputBulks().values()) { | |
transientObjects.addAll(bulkInfoList); | |
} | |
waitIfRequested(WORKER_1); | |
final Task task2 = getSingleNextTask(task1, inputResult, WORKER_2); | |
assertWorkflowRunData(task2, 1, 1); | |
final ResultDescription intermediateResult = processWorker2Task(task2, _indexName, _tempStoreName); | |
// this buckets is persistent... | |
waitIfRequested(WORKER_2); | |
for (final List<BulkInfo> bulkInfoList : task2.getOutputBulks().values()) { | |
persistentObjects.addAll(bulkInfoList); | |
} | |
final Task task3 = getSingleNextTask(task2, intermediateResult, WORKER_3); | |
assertWorkflowRunData(task3, 1, 1); | |
final ResultDescription finalResult = processWorker3Task(task3, _indexName, _tempStoreName, false); | |
for (final List<BulkInfo> bulkInfoList : task3.getOutputBulks().values()) { | |
persistentObjects.addAll(bulkInfoList); | |
} | |
waitIfRequested(WORKER_3); | |
finishFinalTask(task3, finalResult); | |
assertObjectsDeleted(transientObjects); | |
assertObjectsExist(persistentObjects); | |
assertNoWorkflowRunData(task3); | |
return null; | |
} | |
/** | |
* Calls wait() if currentWorker is equal to the waitWorker specified in the constructor. | |
* | |
* @param currentWorker | |
* the current "worker" | |
*/ | |
private void waitIfRequested(final String currentWorker) { | |
if (currentWorker.equals(_waitInWorker)) { | |
synchronized (this) { | |
try { | |
wait(WAIT_TIME_IN_WORKER); | |
} catch (final InterruptedException ex) { | |
; // interrupt is same as notify. | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Callable that starts a job run. | |
*/ | |
public final class StartJobCallable implements Callable<String> { | |
/** The JobManager service. */ | |
private final JobManager _jobManager; | |
/** The jobname. */ | |
private final String _jobName; | |
/** | |
* Constructor. | |
* | |
* @param jobManager | |
* the job manager. | |
* @param jobName | |
* the job name. | |
*/ | |
public StartJobCallable(final JobManager jobManager, final String jobName) { | |
_jobManager = jobManager; | |
_jobName = jobName; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public String call() throws Exception { | |
try { | |
return _jobManager.startJob(_jobName); | |
} catch (final Exception e) { | |
assertJobManagerException(e, _jobName); | |
throw e; | |
} | |
} | |
} | |
/** first worker. */ | |
public static final String WORKER_1 = "inputWorker"; | |
/** second worker. */ | |
public static final String WORKER_2 = "intermediateWorker"; | |
/** third worker. */ | |
public static final String WORKER_3 = "finalWorker"; | |
/** triggered worker. */ | |
public static final String WORKER_4 = "triggeredWorker"; | |
/** worker with optional parameter. */ | |
public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker"; | |
/** store name. */ | |
public static final String STORE_NAME = "test-store"; | |
/** worker counter name for input objects. */ | |
public static final String INPUT_OBJECT_COUNT = "inputObjectCount"; | |
/** worker counter name for output objects. */ | |
public static final String OUTPUT_OBJECT_COUNT = "outputObjectCount"; | |
/** The time a worker "works". */ | |
private static final int WORKER_SLEEP_TIME = 10; | |
/** waiting time between invocations to retrieve updated job run data. */ | |
private static final long WAITING_TIME_FOR_JOB_RUN_DATA = 1000L; | |
/** JobManager service under test. */ | |
protected JobManager _jobManager; | |
/** TaskStorage service under test. */ | |
protected TaskStorage _taskStorage; | |
/** definition persistence to use. */ | |
protected DefinitionPersistence _defPersistence; | |
/** Store service giving access to object store service. */ | |
protected ObjectStoreService _objectStoreService; | |
/** | |
* fetch needed services. {@inheritDoc} | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_objectStoreService = getService(ObjectStoreService.class); | |
for (final String storeName : _objectStoreService.getStoreNames()) { | |
if ("jobmanager".equals(storeName)) { | |
_objectStoreService.clearStore(storeName); | |
} else { | |
_objectStoreService.removeStore(storeName); | |
} | |
} | |
_objectStoreService.ensureStore(STORE_NAME); | |
_jobManager = getService(JobManager.class); | |
assertNotNull(_jobManager); | |
_defPersistence = _jobManager.getDefinitionPersistence(); | |
assertNotNull(_defPersistence); | |
_taskStorage = getService(TaskStorage.class); | |
assertNotNull(_taskStorage); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
_taskStorage.clear(); // clear all tasks | |
} | |
/** | |
* utility method to add jobs. | |
* | |
* @param jobName | |
* The name of the job. | |
* @param workflow | |
* The name of the workflow. | |
* @param parameters | |
* Parameters for the job. | |
* @throws Exception | |
* error occurred while adding jobs. | |
*/ | |
protected void addJob(final String jobName, final String workflow, final AnyMap parameters) throws Exception { | |
final AnyMap jobAny = DataFactory.DEFAULT.createAnyMap(); | |
jobAny.put("name", jobName); | |
jobAny.put("workflow", workflow); | |
jobAny.put("parameters", parameters); | |
final JobDefinition jobDef = new JobDefinition(jobAny); | |
_defPersistence.addJob(jobDef); | |
final JobDefinition checkDef = _defPersistence.getJob(jobName); | |
assertNotNull(checkDef); | |
} | |
/** | |
* Utility method to start a job. Asserts the jobId is not null. | |
* | |
* @param jobName | |
* The name of the job to start. | |
* @return The id of the started job run. | |
* @throws Exception | |
* error occurred while starting the job. | |
*/ | |
protected String startJob(final String jobName) throws Exception { | |
final String jobId = _jobManager.startJob(jobName); | |
assertNotNull(jobId); | |
final AnyMap jobRunData = _jobManager.getJobRunData(jobName, jobId); | |
assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData); | |
return jobId; | |
} | |
/** | |
* Get an initial task and checks some assertions: the task is not null, the worker name is correct, the input is | |
* null. | |
* | |
* @param workerName | |
* the worker for which this task should be created. | |
* @param jobName | |
* the job name. | |
* @return the created task. | |
* @throws JobManagerException | |
* error | |
*/ | |
public Task getInitialTask(final String workerName, final String jobName) throws JobManagerException { | |
final Task task = _jobManager.getInitialTask(workerName, jobName); | |
assertNotNull(task); | |
assertEquals(workerName, task.getWorkerName()); | |
assertTrue(task.getInputBulks().isEmpty()); | |
return task; | |
} | |
/** | |
* returns a single task after finishing the doneTask. It is asserted, that exactly one resulting task is produced, | |
* the worker name is as expected and the input of the next task is not empty. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for finishing the task | |
* @param expectedWorkerName | |
* the worker name that is expected for the follow-up task | |
* @return the follow-up task of the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public Task getSingleNextTask(final Task doneTask, final ResultDescription result, final String expectedWorkerName) | |
throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertEquals(1, nextTasks.size()); | |
final Task nextTask = nextTasks.get(0); | |
assertEquals(expectedWorkerName, nextTask.getWorkerName()); | |
assertFalse(nextTask.getInputBulks().isEmpty()); | |
return nextTask; | |
} | |
/** | |
* Returns a list of tasks after finishing the done task. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for finishing the task | |
* @return the follow-up tasks of the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public List<Task> getNextTasks(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertNotNull(nextTasks); | |
return nextTasks; | |
} | |
/** | |
* Finishes a final task and checks if there are no resulting follow-up tasks. | |
* | |
* @param doneTask | |
* the task to finish | |
* @param result | |
* the result description for the doneTask | |
* @throws JobManagerException | |
* error | |
*/ | |
public void finishFinalTask(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final List<Task> nextTasks = finishTask(doneTask, result); | |
assertNotNull(nextTasks); | |
if (!nextTasks.isEmpty()) { | |
fail("Expected empty list but contained task: " + nextTasks.get(0).toAny()); | |
} | |
} | |
/** finish a task in job manager. */ | |
private List<Task> finishTask(final Task doneTask, final ResultDescription result) throws JobManagerException { | |
final Task finishTask = doneTask.createFinishTask(result, getClass().getSimpleName()); | |
final List<Task> nextTasks = _jobManager.finishTask(finishTask); | |
assertNotNull(nextTasks); | |
return nextTasks; | |
} | |
/** | |
* Processes a task for the bulk-builder worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedStore | |
* the store name. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processInitialTask(final Task task, final String expectedStore) throws Exception { | |
int outputCount = 0; | |
final BulkInfo addBulk = createBulk(task, "output", "Added records go here"); | |
checkRecordBulk(expectedStore, "insertBucket", addBulk); | |
outputCount++; | |
work(WORKER_SLEEP_TIME); | |
return successResult(0, outputCount); | |
} | |
/** | |
* processes a task for the worker2 worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedWorkerParam | |
* the worker param value. | |
* @param expectedStore | |
* the store name. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processWorker2Task(final Task task, final String expectedWorkerParam, | |
final String expectedStore) throws Exception { | |
assertWorkerParam(task, expectedWorkerParam); | |
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0); | |
checkRecordBulk(expectedStore, "insertBucket", docsBulk); | |
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName())); | |
final BulkInfo processedBulk = createBulk(task, "output", "manipulated records go here"); | |
checkRecordBulk(expectedStore, "processedBucket", processedBulk); | |
work(WORKER_SLEEP_TIME); | |
return successResult(1, 1); | |
} | |
/** | |
* processes a task for the index-builder worker. | |
* | |
* @param task | |
* the task to be processed. | |
* @param expectedWorkerParam | |
* the worker param value. | |
* @param expectedStore | |
* the store name. | |
* @param alsoCreateDeleteBulk | |
* if 'true' an additional bulk for "deletedRecords" slot will be created. | |
* @return a ResultDescription for a successful completed task. | |
* @throws Exception | |
* error | |
*/ | |
public ResultDescription processWorker3Task(final Task task, final String expectedWorkerParam, | |
final String expectedStore, final boolean alsoCreateDeleteBulk) throws Exception { | |
assertWorkerParam(task, expectedWorkerParam); | |
final BulkInfo docsBulk = task.getInputBulks().get("input").get(0); | |
checkRecordBulk(expectedStore, "processedBucket", docsBulk); | |
assertTrue(_objectStoreService.existsObject(docsBulk.getStoreName(), docsBulk.getObjectName())); | |
createBulk(task, "output", "This is a final bulk."); | |
int outputCount = 1; | |
if (alsoCreateDeleteBulk) { | |
createBulk(task, "deletedRecords", "This is a new delete bulk."); | |
outputCount++; | |
} | |
work(WORKER_SLEEP_TIME); | |
return successResult(1, outputCount); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status SUCCESS. | |
*/ | |
protected ResultDescription successResult() { | |
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status SUCCESS and two counters. | |
*/ | |
protected ResultDescription successResult(final int inputObjectCount, final int outputObjectCount) { | |
final Map<String, Number> counters = new HashMap<String, Number>(); | |
counters.put(INPUT_OBJECT_COUNT, inputObjectCount); | |
counters.put(OUTPUT_OBJECT_COUNT, outputObjectCount); | |
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", counters); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status RECOVERABLE_ERROR. | |
*/ | |
protected ResultDescription recoverableError() { | |
return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status FATAL_ERROR. | |
*/ | |
protected ResultDescription fatalError() { | |
return new ResultDescription(TaskCompletionStatus.FATAL_ERROR, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* creates a bulk for the given task, slot name and writes the data into the bulk. | |
* | |
* @param task | |
* the task | |
* @param slotName | |
* the name of the worker slot | |
* @param data | |
* the data to write in the store | |
* @return the created bulk | |
* @throws Exception | |
* error | |
*/ | |
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception { | |
final BulkInfo bulk = task.getOutputBulks().get(slotName).get(0); | |
assertNotNull("Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.", bulk); | |
_objectStoreService.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8")); | |
return bulk; | |
} | |
/** | |
* Asserts the expectedWorkerParam is set as worker parameter in the task. | |
* | |
* @param task | |
* the task to check | |
* @param expectedWorkerParam | |
* the worker value to check | |
*/ | |
protected void assertWorkerParam(final Task task, final String expectedWorkerParam) { | |
assertEquals(expectedWorkerParam, task.getParameters().get("workerParameter")); | |
} | |
/** | |
* Checks a record bulk if it is not null, the store name and objectid is correct. | |
* | |
* @param expectedStore | |
* the expected store name | |
* @param expectedBucket | |
* the expected bucket name prefix | |
* @param recordBulk | |
* the bulk to check | |
*/ | |
protected void checkRecordBulk(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) { | |
assertNotNull(recordBulk); | |
assertEquals(expectedStore, recordBulk.getStoreName()); | |
assertTrue(recordBulk.getObjectName().startsWith(expectedBucket)); | |
} | |
/** | |
* Checks a index partition or deletes if it is not null, the store name and object id are correct. | |
* | |
* @param expectedStore | |
* the expected store name | |
* @param expectedBucket | |
* the expected bucket name prefix | |
* @param recordBulk | |
* the bulk to check | |
*/ | |
protected void checkIndexStore(final String expectedStore, final String expectedBucket, final BulkInfo recordBulk) { | |
assertNotNull(recordBulk); | |
assertEquals("index-" + expectedStore, recordBulk.getStoreName()); | |
assertTrue(recordBulk.getObjectName().startsWith(expectedBucket)); | |
} | |
/** | |
* asserts that a list of objects are deleted from (i.e. not existing in) the store. | |
* | |
* @param objects | |
* the list of objects to check | |
* @throws ObjectStoreException | |
* could not check if object exists. | |
*/ | |
public void assertObjectsDeleted(final List<BulkInfo> objects) throws ObjectStoreException { | |
for (final BulkInfo object : objects) { | |
assertFalse("object '" + object.getObjectName() + "' still exists.", | |
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName())); | |
} | |
} | |
/** | |
* asserts that a list of objects exist in the store. | |
* | |
* @param objects | |
* the list of objects to check | |
* @throws ObjectStoreException | |
* check failed | |
*/ | |
public void assertObjectsExist(final List<BulkInfo> objects) throws ObjectStoreException { | |
for (final BulkInfo object : objects) { | |
assertTrue("object '" + object.getObjectName() + "' does not exists.", | |
_objectStoreService.existsObject(object.getStoreName(), object.getObjectName())); | |
} | |
} | |
/** | |
* returns the number of objects with the given prefix in the given store. | |
* | |
* @param storeName | |
* The store name. | |
* @param objectPrefix | |
* The prefix of the objects to be counted. | |
* @return the number of objects with the prefix objectPrefix in the store storeName. | |
* @throws ObjectStoreException | |
* accessing object store service failed. | |
*/ | |
public int getNumberOfObjects(final String storeName, final String objectPrefix) throws ObjectStoreException { | |
int count = 0; | |
for (final StoreObject storeObject : _objectStoreService.getStoreObjectInfos(storeName, objectPrefix)) { | |
if (storeObject.getSize() > 0) { | |
count++; | |
} | |
} | |
return count; | |
} | |
/** | |
* pretend to work (i.e. sleep) for a while. | |
* | |
* @param milliseconds | |
* the number of milliseconds to "work". | |
*/ | |
protected void work(final long milliseconds) { | |
try { | |
Thread.sleep(milliseconds); | |
} catch (final InterruptedException ex) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
/** | |
* asserts that a job is running. | |
* | |
* @param jobName | |
* he job name | |
* @param jobId | |
* the job run id. | |
* @throws JobManagerException | |
* error | |
*/ | |
protected void assertJobRunning(final String jobName, final String jobId) throws JobManagerException { | |
assertJobRunning(jobName, jobId, JobRunMode.STANDARD); | |
} | |
/** | |
* asserts that a job is running in the given run mode. | |
*/ | |
protected void assertJobRunning(final String jobName, final String jobId, final JobRunMode mode) | |
throws JobManagerException { | |
final AnyMap data = _jobManager.getJobRunData(jobName, jobId); | |
assertNotNull(data); | |
assertEquals(jobId, AccessAny.getStringRequired(data, "jobId")); | |
assertEquals("RUNNING", AccessAny.getStringRequired(data, "state")); | |
assertEquals(JobRunMode.STANDARD.name(), data.getStringValue("runMode")); | |
} | |
/** | |
* get job run data, assert job id. | |
*/ | |
protected AnyMap assertJobRunData(final String jobName, final String jobId) throws JobManagerException { | |
AnyMap data = null; | |
int count = 0; | |
final int maxCount = 3; | |
boolean done = false; | |
do { | |
data = _jobManager.getJobRunData(jobName, jobId); | |
try { | |
if (data == null | |
|| data.get(JobManagerConstants.DATA_JOB_ID) == null | |
|| data.get(JobManagerConstants.WORKFLOW_RUN_COUNTER) == null | |
|| data.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).get( | |
JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS) == null) { | |
// ok, data seems to be just gone from zoo keeper into store. | |
// let's try that again but with a small delay... | |
Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA); | |
if (++count > maxCount) { | |
fail("Too many tries to get job run data."); | |
} | |
} else { | |
done = true; | |
} | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
++count; | |
} | |
} while (!done); | |
assertNotNull(data); | |
assertEquals(jobId, AccessAny.getStringRequired(data, "jobId")); | |
return data; | |
} | |
/** | |
* Asserts that a job run has the status "SUCCEEDED". | |
* | |
* @param jobName | |
* the job name | |
* @param jobId | |
* the job run id | |
* @return an Any describing the JobRun data | |
* @throws JobManagerException | |
* error | |
*/ | |
protected AnyMap assertJobRunSucceeded(final String jobName, final String jobId) throws JobManagerException { | |
final AnyMap data = assertJobRunData(jobName, jobId); | |
assertEquals("SUCCEEDED", AccessAny.getStringRequired(data, "state")); | |
// assertTaskCounters(data); | |
return data; | |
} | |
/** | |
* Asserts that a job run has the status "FAILED". | |
* | |
* @param jobName | |
* the job name | |
* @param jobId | |
* the job run id | |
* @return an Any describing the JobRun data | |
* @throws JobManagerException | |
* error | |
*/ | |
protected AnyMap assertJobRunFailed(final String jobName, final String jobId) throws JobManagerException { | |
final AnyMap data = assertJobRunData(jobName, jobId); | |
assertEquals("FAILED", AccessAny.getStringRequired(data, "state")); | |
assertTaskCounters(data); | |
return data; | |
} | |
/** | |
* Asserts that a job run has the status "SUCCEEDED" with a given number of successful workflow runs. | |
* | |
* @param jobName | |
* the job name | |
* @param jobId | |
* the job run id | |
* @param noOfWorkflowRuns | |
* the expected number of workflow runs | |
* @return job run data | |
* @throws JobManagerException | |
* error | |
*/ | |
protected AnyMap assertJobRunSucceeded(final String jobName, final String jobId, final int noOfWorkflowRuns) | |
throws JobManagerException { | |
final AnyMap data = assertJobRunSucceeded(jobName, jobId); | |
final AnyMap workflowRunData = data.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
assertEquals("jobrun - wrong number of successful workflow runs: " + data.toString(), noOfWorkflowRuns, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS).intValue()); | |
return data; | |
} | |
/** | |
* Asserts that a job run has the status "CANCELLED". | |
* | |
* @param jobName | |
* the job name | |
* @param jobRunId | |
* the job run id | |
* @return job run data | |
* @throws JobManagerException | |
* error | |
*/ | |
protected AnyMap assertJobRunCanceled(final String jobName, final String jobRunId) throws JobManagerException { | |
final AnyMap data = assertJobRunData(jobName, jobRunId); | |
assertEquals(JobState.CANCELED.name(), AccessAny.getStringRequired(data, JobManagerConstants.DATA_JOB_STATE)); | |
return data; | |
} | |
/** | |
* assert tasks counter in job run data. | |
*/ | |
protected void assertWorkflowCounters(final int startedWorkflows, final int activeWorkflows, | |
final int successfulWorkflows, final int failedWorkflows, final int canceledWorkflows, final AnyMap jobRunData) { | |
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
assertEquals(startedWorkflows, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS).intValue()); | |
assertEquals(activeWorkflows, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS).intValue()); | |
assertEquals(successfulWorkflows, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS).intValue()); | |
assertEquals(failedWorkflows, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_WORKFLOW_RUNS).intValue()); | |
assertEquals(canceledWorkflows, | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELED_WORKFLOW_RUNS).intValue()); | |
} | |
/** | |
* assert tasks counter in job run data. | |
*/ | |
protected void assertTaskCounters(final int createdTasks, final int succeededTasks, final int retriedAfterError, | |
final int retriedAfterTimeout, final int failedWithoutRetry, final int failedAfterRetry, | |
final int cancelledTasks, final AnyMap jobRunData) { | |
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER); | |
assertEquals(createdTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue()); | |
assertEquals(succeededTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS) | |
.intValue()); | |
assertEquals(retriedAfterError, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER) | |
.intValue()); | |
assertEquals(retriedAfterTimeout, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL) | |
.intValue()); | |
assertEquals(failedAfterRetry, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED) | |
.intValue()); | |
assertEquals(failedWithoutRetry, | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue()); | |
assertEquals(cancelledTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS) | |
.intValue()); | |
} | |
/** | |
* assert tasks counter sum in job run data. | |
*/ | |
protected void assertTaskCounters(final AnyMap jobRunData) { | |
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER); | |
final int createdTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue(); | |
final int succeededTasks = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue(); | |
final int retriedByWorker = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue(); | |
final int retriedByTtl = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue(); | |
final int failedAfterRetry = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue(); | |
final int failedByWorker = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue(); | |
final int cancelledTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue(); | |
final int obsoleteTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS).intValue(); | |
assertEquals(createdTasks, succeededTasks + retriedByWorker + retriedByTtl + failedByWorker + failedAfterRetry | |
+ cancelledTasks + obsoleteTasks); | |
} | |
/** | |
* asserts that the first job run of the completed runs for the job jobName has the status "FAILED". | |
* | |
* @param jobName | |
* the name of the job to check | |
* @throws JobManagerException | |
* error | |
*/ | |
protected void assertFailedJobRunExists(final String jobName) throws JobManagerException { | |
final Collection<String> completedRuns = _jobManager.getCompletedJobRunIds(jobName); | |
assertNotNull(completedRuns); | |
assertFalse(completedRuns.isEmpty()); | |
final String jobId = completedRuns.iterator().next(); | |
final AnyMap data = _jobManager.getJobRunData(jobName, jobId); | |
assertNotNull(data); | |
assertEquals(jobId, AccessAny.getStringRequired(data, "jobId")); | |
assertEquals("FAILED", AccessAny.getStringRequired(data, "state")); | |
} | |
/** | |
* Asserts the exception is a JobManagerException with the job name in the exception message. | |
* | |
* @param ex | |
* the exception to check | |
* @param jobName | |
* the job name to check in the text | |
*/ | |
protected void assertJobManagerException(final Exception ex, final String jobName) { | |
assertTrue("This should be a JobManagerException for job '" + jobName + "': " + ex, | |
ex instanceof JobManagerException); | |
assertTrue("message should contain quoted job name", ex.toString().contains("'" + jobName + "'")); | |
} | |
/** | |
* Waits for a job to start its workflow runs. | |
* | |
* @param jobName | |
* the name of the job | |
* @param jobId | |
* the id of the job run | |
* @param minimumNumberOfWorkflowRuns | |
* minimum number of started workflow runs | |
* @param maxWaitTime | |
* max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail. | |
* @throws Exception | |
* an exception occurred while trying to get job run data | |
*/ | |
protected void waitForJobRunStarted(final String jobName, final String jobId, | |
final int minimumNumberOfWorkflowRuns, final long maxWaitTime) throws Exception { | |
boolean startedEnoughWorkflowRuns = false; | |
final long millisStarted = System.currentTimeMillis(); | |
do { | |
try { | |
final AnyMap jobRunData = _jobManager.getJobRunData(jobName, jobId); | |
if (jobRunData != null) { | |
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
if (workflowRunData != null && workflowRunData != null | |
&& workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS) != null) { | |
final int numberOfStartedWorkflowRuns = | |
workflowRunData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS).intValue(); | |
if (numberOfStartedWorkflowRuns >= minimumNumberOfWorkflowRuns) { | |
startedEnoughWorkflowRuns = true; | |
} | |
} | |
if (!startedEnoughWorkflowRuns) { | |
assertTrue("Waited too long for job to start.", | |
System.currentTimeMillis() - millisStarted <= maxWaitTime); | |
Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA); | |
} | |
} | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
throw e; | |
} | |
} while (!startedEnoughWorkflowRuns); | |
} | |
/** | |
* Waits for a job to be completed. | |
*/ | |
protected void waitForJobRunCompleted(final String jobName, final String jobId, final long maxWaitTime) | |
throws Exception { | |
final long sleepTime = 500L; | |
final long millisStarted = System.currentTimeMillis(); | |
while (true) { | |
final Collection<String> completedIds = _jobManager.getCompletedJobRunIds(jobName); | |
if (completedIds.contains(jobId)) { | |
return; | |
} | |
assertTrue("Waited too long for job to complete", System.currentTimeMillis() - millisStarted <= maxWaitTime); | |
Thread.sleep(sleepTime); | |
} | |
} | |
/** | |
* wait until no job run is active for given job. | |
* | |
* @param jobName | |
* the name of the job | |
* @param maxWaitTime | |
* max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail. | |
*/ | |
protected void waitForNoActiveJobRun(final String jobName, final long maxWaitTime) throws Exception { | |
JobRunInfo jobRunInfo = null; | |
final long millisStarted = System.currentTimeMillis(); | |
do { | |
jobRunInfo = _jobManager.getJobRunInfo(jobName); | |
if (jobRunInfo != null) { | |
assertTrue("Waited too long", System.currentTimeMillis() - millisStarted <= maxWaitTime); | |
Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA); | |
} | |
} while (jobRunInfo != null); | |
} | |
/** | |
* check worker task counters. | |
*/ | |
protected void assertWorkerCounters(final AnyMap jobRunData, final String workerName, | |
final int expectedSuccessful, final int expectedRetriedTimeout, final int expectedRetriedWorker, | |
final int expectedFailedRetried, final int expectedFailedNotRetried, final int expectedObsolete) | |
throws Exception { | |
assertTrue(jobRunData.containsKey("worker")); | |
final AnyMap workerData = jobRunData.getMap("worker"); | |
assertTrue(workerData.containsKey(workerName)); | |
final AnyMap workerCounter = workerData.getMap(workerName); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, expectedSuccessful); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL, | |
expectedRetriedTimeout); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER, | |
expectedRetriedWorker); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED, | |
expectedFailedRetried); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED, | |
expectedFailedNotRetried); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS, expectedObsolete); | |
} | |
/** | |
* check worker task counters. | |
*/ | |
protected void assertWorkerCounters(final AnyMap jobRunData, final String workerName, | |
final int expectedSuccessful, final int expectedInputCount, final int expectedOutputCount) throws Exception { | |
assertTrue(jobRunData.containsKey("worker")); | |
final AnyMap workerData = jobRunData.getMap("worker"); | |
assertTrue(workerData.containsKey(workerName)); | |
final AnyMap workerCounter = workerData.getMap(workerName); | |
if (expectedSuccessful > 0) { | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, | |
expectedSuccessful); | |
assertEquals(expectedInputCount, workerCounter.getLongValue(INPUT_OBJECT_COUNT).intValue()); | |
assertEquals(expectedOutputCount, workerCounter.getLongValue(OUTPUT_OBJECT_COUNT).intValue()); | |
} | |
} | |
/** | |
* assert worker task counter, if value greater than zero expected. | |
*/ | |
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) { | |
if (expected > 0) { | |
assertTrue("missing counter for " + name, workerCounter.containsKey(name)); | |
assertEquals("wrong counter value for " + name, expected, workerCounter.getLongValue(name).intValue()); | |
} | |
} | |
/** | |
* Creates the job's parameters. | |
* | |
* @param workerParameterValue | |
* value for workerParameter | |
* @param storeName | |
* store name | |
* @param tempStoreName | |
* the name of the temp store | |
* @return parameter Any | |
* @throws Exception | |
* any exception during creation of Any. | |
*/ | |
protected AnyMap createJobParameters(final String workerParameterValue, final String storeName, | |
final String tempStoreName) throws Exception { | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put("store", storeName); | |
parameters.put("tempStore", tempStoreName); | |
parameters.put("workerParameter", workerParameterValue); | |
return parameters; | |
} | |
/** check if the workflow run data for the task have the correct task and bulk counters. */ | |
protected void assertWorkflowRunData(final Task task, final int activeTasks, final int transientBulks) | |
throws Exception { | |
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME); | |
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID); | |
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID); | |
final AnyMap workflowData = _jobManager.getWorkflowRunData(jobName, jobId, workflowId); | |
assertNotNull(workflowData); | |
assertEquals(activeTasks, workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_ACTIVE_TASKS) | |
.intValue()); | |
assertEquals(transientBulks, | |
workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue()); | |
} | |
/** check if the workflow run data for the task have the correct task and bulk counters. */ | |
protected void assertNoWorkflowRunData(final Task task) throws Exception { | |
final String jobName = task.getProperties().get(Task.PROPERTY_JOB_NAME); | |
final String jobId = task.getProperties().get(Task.PROPERTY_JOB_RUN_ID); | |
final String workflowId = task.getProperties().get(Task.PROPERTY_WORKFLOW_RUN_ID); | |
try { | |
final AnyMap workflowData = _jobManager.getWorkflowRunData(jobName, jobId, workflowId); | |
fail("there should be no workflow run data, but got " + workflowData); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
} | |
/** | |
* @param actualValues | |
* actual values | |
* @param expectedValues | |
* expected values | |
*/ | |
protected <T> void assertValues(final Collection<T> actualValues, final T... expectedValues) { | |
assertEquals(expectedValues.length, actualValues.size()); | |
for (final T expectedValue : expectedValues) { | |
assertTrue("collection does not contain value " + expectedValue, actualValues.contains(expectedValue)); | |
} | |
} | |
} |