blob: 61b9c56b33c8d3dd096e6e2eb769015881331886 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* <pre>
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation,
* Andreas Schank (Attensity Europe GmbH) - additional tests
* </pre>
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.Any.ValueType;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.jobmanager.JobRunDataProvider;
import org.eclipse.smila.jobmanager.JobRunEngine;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.JobTaskProcessor;
import org.eclipse.smila.jobmanager.definitions.DefinitionPersistence;
import org.eclipse.smila.jobmanager.definitions.JobDefinition;
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants;
import org.eclipse.smila.jobmanager.exceptions.JobManagerException;
import org.eclipse.smila.jobmanager.taskgenerator.TaskGeneratorException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskList;
import org.eclipse.smila.taskmanager.TaskManager;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.test.taskgenerator.ExceptionalTaskGenerator;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Test for TaskManager service.
*
*/
public class TestTaskManager extends DeclarativeServiceTestCase {
/** keep alive schedule time in ms. */
protected static final int KEEP_ALIVE_SCHEDULE_TIME = 1000;
/** waiting time in ms. */
protected static final int WAITING_TIME = 1000;
/** waiting time between invocations to retrieve updated job run data. */
protected static final long WAITING_TIME_FOR_JOB_RUN_DATA = 1000L;
/** Time to live in ms. */
protected static final long TIME_TO_LIVE = 5500L;
/** Definition Persistence reference. */
protected static DefinitionPersistence s_defPersistence;
/** the inprogress queue. */
protected static final String TASK_SECTION_INPROGRESS = "inprogress";
/** tempStore param name. */
private static final String TEMP_STORE_PARAM = "tempStore";
/** store param name. */
private static final String STORE_PARAM = "store";
/** test store name. */
private static final String STORE_NAME = "test-store";
/** output slot for workers. */
private static final String OUTPUT_SLOT = "output";
/** name of input worker. */
private static final String INPUT_WORKER = "input-worker";
/** name of intermediate worker. */
private static final String INTERMEDIATE_WORKER = "intermediate-worker";
/** name of intermediate worker that requests a completion task. */
private static final String INTERMEDIATE_COMPLETION_WORKER = "intermediate-completion-worker";
/** name of final worker. */
private static final String FINAL_WORKER = "final-worker";
/** reference to task manager service. */
private static TaskManager s_taskManager;
/** reference to jobdata provider service. */
private static JobRunDataProvider s_jobDataProvider;
/** reference to jobmanager task processor. */
private static JobTaskProcessor s_jobTaskProcessor;
/** reference to jobrun engine service. */
private static JobRunEngine s_jobRunEngine;
/** keep alive tasks. */
protected List<KeepAliveEntry> _tasksToBeKeptAlive = Collections
.synchronizedList(new ArrayList<KeepAliveEntry>());
/** reference to object store service. */
private ObjectStoreService _store;
/** keep alive executor. */
private final ScheduledExecutorService _keepAliveExecutor = Executors.newSingleThreadScheduledExecutor();
/** keep worker name with task. */
protected class KeepAliveEntry {
/** the worker name to send the keep alive for. */
private final String _workerName;
/** the task to be kept alive. */
private final Task _task;
/**
* Creates a new KeepAliveEntry.
*
* @param workerName
* the worker name
* @param task
* the task to be kept alive for workerName
*/
public KeepAliveEntry(final String workerName, final Task task) {
_workerName = workerName;
_task = task;
}
}
/**
* Keeps tasks alive (on a very simple way).
*/
protected class KeepAlive implements Runnable {
/** task list reference. */
private final List<KeepAliveEntry> _taskList;
/**
* Constructor.
*
* @param taskList
* reference to task list
*/
public KeepAlive(final List<KeepAliveEntry> taskList) {
_taskList = taskList;
}
/**
* {@inheritDoc}
*/
@Override
public void run() {
try {
final List<KeepAliveEntry> tasksCopy = new ArrayList<KeepAliveEntry>(_taskList.size());
synchronized (_taskList) {
for (final KeepAliveEntry task : _taskList) {
tasksCopy.add(task);
}
}
for (final KeepAliveEntry keepAliveEntry : tasksCopy) {
try {
s_taskManager.keepAlive(keepAliveEntry._workerName, keepAliveEntry._task.getTaskId());
if (_log.isDebugEnabled()) {
_log.debug("keeping alive task " + keepAliveEntry._task.getTaskId() + " for "
+ keepAliveEntry._workerName);
}
} catch (final TaskmanagerException e) {
; // ignore
e.printStackTrace();
}
}
} catch (final Throwable e) {
; // ignore
e.printStackTrace();
}
}
}
/** {@inheritDoc} */
@Override
protected void setUp() throws Exception {
super.setUp();
_store = getService(ObjectStoreService.class);
_store.ensureStore(STORE_NAME);
if (s_taskManager == null) {
s_taskManager = getService(TaskManager.class);
}
if (s_jobDataProvider == null) {
s_jobDataProvider = getService(JobRunDataProvider.class);
}
if (s_jobRunEngine == null) {
s_jobRunEngine = getService(JobRunEngine.class);
}
if (s_jobTaskProcessor == null) {
s_jobTaskProcessor = getService(JobTaskProcessor.class);
}
if (s_defPersistence == null) {
s_defPersistence = getService(DefinitionPersistence.class);
}
_keepAliveExecutor.scheduleAtFixedRate(new KeepAlive(_tasksToBeKeptAlive), KEEP_ALIVE_SCHEDULE_TIME,
KEEP_ALIVE_SCHEDULE_TIME, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
protected void tearDown() throws Exception {
// get rid of any finishing tasks, that may have been left unheeded.
simulateFinishTaskWorker();
_keepAliveExecutor.shutdownNow();
_store.removeStore(STORE_NAME);
super.tearDown();
}
/**
* @throws Exception
* unexpected error
*/
public void testService() throws Exception {
final TaskManager taskManager = getService(TaskManager.class);
assertNotNull(taskManager);
}
/**
* Tests exception thrown when requests are made for an invalid worker name.
*
* @throws Exception
* any exception that is not expected here.
*/
public void testInvalidWorkerName() throws Exception {
try {
s_taskManager.getTask("INVALID_WORKER", null);
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode());
}
try {
s_taskManager.finishTask("INVALID_WORKER", "taskId", successResult());
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode());
}
try {
s_taskManager.keepAlive("INVALID_WORKER", "taskId");
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode());
}
}
/**
* tests requests with incorrect task id.
*
* @throws Exception
* an exception occurred.
*/
public void testInvalidTaskId() throws Exception {
try {
s_taskManager.finishTask(INTERMEDIATE_WORKER, "taskId", successResult());
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode());
}
try {
s_taskManager.keepAlive(INTERMEDIATE_WORKER, "taskId");
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode());
}
}
/**
* Tests handling of a recoverable error.
*
* @throws Exception
* any exception that is not expected.
*/
public void testRecoverableError() throws Exception {
final String jobName = "testRecoverableError";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
finishTask(intermediateTask1, recoverableError());
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData);
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 0, 1, 0, 0, 0, jobRunData);
}
/**
* Tests handling of a fatal error.
*
* @throws Exception
* any exception that is not expected.
*/
public void testFatalError() throws Exception {
final String jobName = "testFatalError";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
finishTask(intermediateTask1, fatalError());
assertTaskCounters(2, 1, 0, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertWorkerCounters(INTERMEDIATE_WORKER, 0, 0, 0, 0, 1, 0, jobRunData);
}
/**
* Tests handling of a rollback because of an exceeded time-to-live.
*
* @throws Exception
* any exception that is not expected.
*/
public void testTimeToLiveRollback() throws Exception {
final String jobName = "testTimeToLiveRollback";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
Thread.sleep(2 * TIME_TO_LIVE);
simulateFinishTaskWorker();
assertTaskCounters(3, 1, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
try {
keepAlive(intermediateTask1);
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode());
}
try {
finishTask(intermediateTask1, successResult());
fail("should not work");
} catch (final BadParameterTaskmanagerException ex) {
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode());
}
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 1, 0, 0, 0, 0, jobRunData);
assertTaskCounters(4, 3, 0, 1, 0, 0, 0, jobRunData);
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
}
/**
* Tests handling of a keep alive.
*
* @throws Exception
* any exception that is not expected.
*/
public void testKeepAlive() throws Exception {
final String jobName = "testKeepAlive";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
for (int i = 0; i < 2 * TIME_TO_LIVE / WAITING_TIME; i++) {
keepAlive(intermediateTask);
Thread.sleep(WAITING_TIME);
}
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
finishTask(intermediateTask, successResult());
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
}
/**
* Tests handling of a recoverable error.
*
* @throws Exception
* any exception that is not expected.
*/
public void testRecoverableErrorAutocommit() throws Exception {
final String jobName = "testRecoverableErrorAutocommit";
startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, recoverableError());
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask, successResult());
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
}
/**
* Tests handling of a success for autocommit bulk.
*
* @throws Exception
* any exception that is not expected.
*/
public void testTimeToLiveAutocommit() throws Exception {
final String jobName = "testTimeToLiveAutocommit";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
Thread.sleep(2 * TIME_TO_LIVE);
simulateFinishTaskWorker();
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask, successResult());
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
}
/**
* Tests handling of a fatal error for autocommit bulk.
*
* @throws Exception
* any exception that is not expected.
*/
public void testFatalErrorAutocommit() throws Exception {
final String jobName = "testFatalErrorAutocommit";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, fatalError());
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
}
/**
* test for postpone a task.
*/
public void testPostponeTask() throws Exception {
final String jobName = "testPostponeTask";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
Thread.sleep(100);
Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask);
final String createdTime = intermediateTask.getProperties().get(Task.PROPERTY_CREATED_TIME);
assertTrue(intermediateTask.getProperties().containsKey(Task.PROPERY_TASK_AGE));
final long initialTaskAge = Long.parseLong(intermediateTask.getProperties().get(Task.PROPERY_TASK_AGE));
assertTrue(initialTaskAge >= 100);
long postponedTaskAge = 0;
for (int i = 0; i < 100; i++) {
finishTask(intermediateTask, postponeResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task postponedTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(postponedTask);
assertEquals(intermediateTask.getTaskId(), postponedTask.getTaskId());
assertEquals(intermediateTask.getParameters(), postponedTask.getParameters());
// check how often task has been postponed.
assertEquals(Integer.toString(i + 1), postponedTask.getProperties().get(Task.PROPERTY_POSTPONED));
// check that creation time has not been changed after postponing.
assertEquals(createdTime, postponedTask.getProperties().get(Task.PROPERTY_CREATED_TIME));
assertTrue(postponedTask.getProperties().containsKey(Task.PROPERY_TASK_AGE));
postponedTaskAge = Long.parseLong(postponedTask.getProperties().get(Task.PROPERY_TASK_AGE));
assertTrue(postponedTaskAge >= initialTaskAge);
intermediateTask = postponedTask;
}
assertTrue(postponedTaskAge > initialTaskAge);
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask, successResult());
assertTaskCounters(3, 2, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
s_jobRunEngine.finishJob(jobName, jobId);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData);
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 0, 0, 0, 0, 0, jobRunData);
}
/**
* Test for cancelJob, task manager should remove the respective tasks.
*
* @throws Exception
* any exception that is not expected.
*/
public void testCancel() throws Exception {
final String jobName = "testCancel";
final String jobId = startDummyJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, OUTPUT_SLOT);
cancelJob(jobName, jobId);
try {
finishTask(initialTask, successResult());
fail("Should not happen, finish on removed task is not possible.");
} catch (final BadParameterTaskmanagerException bpte) {
assertNotNull(bpte);
}
AnyMap jobRunData = assertJobRunCanceled(jobName, jobId);
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData);
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNull(intermediateTask);
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData);
}
/**
* Cancel one of two simultaneously running jobs and check if just the correct tasks are removed.
*
* @throws Exception
* any exception that is not expected.
*/
public void testCancelOneOfTwo() throws Exception {
final String jobName1 = "testCancelOneOfTwo1";
final String jobName2 = "testCancelOneOfTwo2";
final String jobId1 = startDummyJob(jobName1);
final String jobId2 = startDummyJob(jobName2);
try {
assertJobRunning(jobName1, jobId1);
assertJobRunning(jobName2, jobId2);
final Task initialTask1 = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName1);
createBulk(initialTask1, OUTPUT_SLOT, OUTPUT_SLOT);
final Task initialTask2 = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName2);
createBulk(initialTask2, OUTPUT_SLOT, OUTPUT_SLOT);
cancelJob(jobName1, jobId1);
try {
finishTask(initialTask1, successResult());
fail("should not work");
} catch (final TaskmanagerException ex) {
assertNotNull(ex);
}
finishTask(initialTask2, successResult());
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
assertNotNull(intermediateTask);
final AnyMap jobRunData1 = assertJobRunCanceled(jobName1, jobId1);
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData1);
final AnyMap jobRunData2 = s_jobDataProvider.getJobRunData(jobName2, jobId2);
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, jobRunData2);
} finally {
cancelJob(jobName2, jobId2);
}
}
/**
* test if no tasks can appear in a qualified queue after finishing a qualified task.
*/
public void testNoTodoTasksAfterFinishingQualifiedQueue() throws Exception {
try {
final String workerName = "qualfiedWorker";
final String qualifier = "q1";
final Task task1 = new Task("1", workerName);
task1.setQualifier(qualifier);
s_taskManager.addTask(task1);
final Task task2 = new Task("2", workerName);
task2.setQualifier(qualifier);
s_taskManager.addTask(task2);
final Task inProgressTask = s_taskManager.getTask(workerName, null, Arrays.asList(qualifier));
assertNotNull(inProgressTask);
assertTaskManagerCounter(workerName, 1, 1);
s_taskManager.finishTasks(workerName, Arrays.asList(qualifier), new ResultDescription(
TaskCompletionStatus.OBSOLETE, null, null, null));
assertTaskManagerCounter(workerName, 0, 1);
s_taskManager.finishTask(workerName, inProgressTask.getTaskId(), new ResultDescription(
TaskCompletionStatus.POSTPONE, null, null, null));
assertTaskManagerCounter(workerName, 0, 0);
final Task task3 = new Task("3", workerName);
task3.setQualifier(qualifier);
s_taskManager.addTask(task3);
assertTaskManagerCounter(workerName, 0, 0);
assertNull(s_taskManager.getTask(workerName, null, Arrays.asList(qualifier)));
} finally {
simulateFinishTaskWorker();
}
}
/**
* test completion workflow run with recoverable error during completion.
*/
public void testCompletionTasksForCompletionRequestingWorkersWithIntermediateRecoverableError() throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
finishTask(intermediateTask1, recoverableError());
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData);
s_jobRunEngine.finishJob(jobName, jobId);
// check that we get a completion task (but only for out intermediate worker!)
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(completionTask);
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK));
assertEquals(Boolean.TRUE,
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK)));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState());
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(5, 3, 1, 0, 0, 0, 0, jobRunData);
// first recoverable error on completion task...
finishTask(completionTask, recoverableError());
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(6, 3, 2, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(completionTask2);
assertEquals(Boolean.TRUE,
Boolean.valueOf(completionTask2.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK)));
finishTask(completionTask2, successResult());
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(6, 4, 2, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 2, 0, 2, 0, 0, 0, jobRunData);
assertEquals(
Long.valueOf(1),
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue(
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS));
assertEquals(
JobState.SUCCEEDED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* test completion workflow run with timeout error during completion.
*/
public void testCompletionTasksForCompletionRequestingWorkersWithTimeoutDuringCompletion() throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask);
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask, successResult());
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData);
s_jobRunEngine.finishJob(jobName, jobId);
// check that we get a completion task (but only for out intermediate worker!)
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(completionTask);
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK));
assertEquals(Boolean.TRUE,
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK)));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState());
// sleep a bit:
Thread.sleep(2 * TIME_TO_LIVE);
// first recoverable error on completion task...
try {
finishTask(completionTask, recoverableError());
fail("task must have timed out.");
} catch (final BadParameterTaskmanagerException e) {
;// ignore
}
// run the finish task worker.
simulateFinishTaskWorker();
// second try:
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(completionTask2);
assertTrue(completionTask2.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK));
assertEquals(Boolean.TRUE,
Boolean.valueOf(completionTask2.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK)));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState());
finishTask(completionTask2, successResult());
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(5, 4, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 2, 1, 0, 0, 0, 0, jobRunData);
assertEquals(
Long.valueOf(1),
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue(
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS));
assertEquals(
JobState.SUCCEEDED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* test completion workflow run with fatal error during completion.
*/
public void testCompletionTasksForCompletionRequestingWorkersWithFatalErrorDuringCompletion() throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
finishTask(intermediateTask1, recoverableError());
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData);
s_jobRunEngine.finishJob(jobName, jobId);
// check that we get a completion task (but only for out intermediate worker!)
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(completionTask);
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK));
assertEquals(Boolean.TRUE,
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK)));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState());
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(5, 3, 1, 0, 0, 0, 0, jobRunData);
// first recoverable error on completion task...
finishTask(completionTask, fatalError());
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(5, 3, 1, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
// we won�t get another task here...
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNull(completionTask2);
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(5, 3, 1, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 1, 0, 1, 0, 1, 0, jobRunData);
assertEquals(
Long.valueOf(1),
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue(
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS));
// still succeeded.
assertEquals(
JobState.SUCCEEDED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* test completion workflow run with exception during completion task generation.
*/
public void testCompletionTasksForCompletionRequestingWorkersWithExceptionDuringTaskGeneration() throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
finishTask(intermediateTask1, recoverableError());
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData);
// set the task generator to throw an exception
ExceptionalTaskGenerator.setExceptionOnNextCompletionTask(true);
try {
s_jobRunEngine.finishJob(jobName, jobId);
fail("exception requested.");
} catch (final JobManagerException e) {
assertTrue(e.getCause() instanceof TaskGeneratorException);
}
// Job must have failed.
// check that we will get no completion task (but only for out intermediate worker!)
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertEquals(
Long.valueOf(1),
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue(
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS));
assertEquals(
Long.valueOf(1),
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue(
JobManagerConstants.DATA_JOB_NO_OF_FAILED_WORKFLOW_RUNS));
// failed.
assertEquals(
JobState.FAILED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* test completion workflow run with unchecked exception during completion task generation.
*/
public void testCompletionTasksForCompletionRequestingWorkersWithUncheckedExceptionDuringTaskGeneration()
throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask1);
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
finishTask(intermediateTask1, recoverableError());
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNotNull(intermediateTask2);
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId()));
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks());
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks());
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask2, successResult());
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId));
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null);
createBulk(finalTask, OUTPUT_SLOT, "final");
finishTask(finalTask, successResult());
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData);
// set the task generator to throw an exception
ExceptionalTaskGenerator.setUncheckedExceptionOnNextCompletionTask(true);
try {
s_jobRunEngine.finishJob(jobName, jobId);
fail("exception requested.");
} catch (final IllegalArgumentException e) {
;//
}
// check that we will get no completion task (but only for out intermediate worker!)
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertEquals(
JobState.COMPLETING,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
// Job must have still be cancelable
s_jobRunEngine.cancelJob(jobName, jobId);
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null));
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// check the correct state of the job
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
// canceled
assertEquals(
JobState.CANCELED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* test that no completion workflow run is started when the jobrun would fail.
*/
public void testNoCompletionWorkflowForFailedJobRun() throws Exception {
final String jobName = getName();
final String jobId = startDummyCompletionJob(jobName);
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
// fail hard.
finishTask(initialTask, fatalError());
// no standard task
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNull(intermediateTask1);
// finish this failed run.
s_jobRunEngine.finishJob(jobName, jobId);
// check that we get no (completion) tasks whatsoever
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null);
assertNull(completionTask);
assertNull(s_taskManager.getTask(INPUT_WORKER, null));
assertNull(s_taskManager.getTask(FINAL_WORKER, null));
// but the job run will get finished, but this time with FAILED status.
waitForJobRunFinished(jobName, jobId, WAITING_TIME);
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
assertWorkerCounters(INPUT_WORKER, 0, 0, 0, 0, 1, 0, jobRunData);
// if the worker has any entries they must all be 0:
if (jobRunData.getMap("worker").containsKey(INTERMEDIATE_COMPLETION_WORKER)) {
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 0, 0, 0, 0, 0, 0, jobRunData);
}
final AnyMap workflowRuns = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
// if the completion workflow runs are available, they must be 0.
if (workflowRuns.containsKey(JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)) {
assertEquals(Long.valueOf(0),
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS));
}
assertEquals(Long.valueOf(1),
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS));
assertEquals(Long.valueOf(0),
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS));
// still succeeded.
assertEquals(
JobState.FAILED,
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue(
JobManagerConstants.DATA_JOB_STATE)));
}
/**
* Test handling of a throttled job: 1 throttled job, workers ask in parallel for tasks.
*/
public void testThrottledJob() throws Exception {
final int expectedNoOfTasks = 3;
final long taskDelay = 1000;
final String jobName = "testThrottledJob";
final String jobId = startThrottledJob(jobName, taskDelay, INTERMEDIATE_WORKER);
try {
createTasksForIntermediateWorker(expectedNoOfTasks, jobName, jobId);
long previousTaskTime = 0;
int taskCount = 0;
while (taskCount < expectedNoOfTasks) {
final long getTaskTime = System.currentTimeMillis();
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
if (intermediateTask != null) {
assertEquals(jobName, intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME));
final long interval = System.currentTimeMillis() - previousTaskTime;
assertTrue("task #" + taskCount + " delivered to early after only " + interval + "ms. Task properties: "
+ intermediateTask.getProperties(), interval >= taskDelay);
previousTaskTime = getTaskTime;
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask));
taskCount++;
}
}
} finally {
cancelJob(jobName, jobId);
}
}
/**
* Test handling of a throttled job: 1 throttled job, workers ask in sequence for tasks.
*/
public void testThrottledJobWithFinishedTasks() throws Exception {
final int expectedNoOfTasks = 3;
final long taskDelay = 1000;
final String jobName = "testThrottledJobWithFinishedTasks";
final String jobId = startThrottledJob(jobName, taskDelay, INTERMEDIATE_WORKER);
try {
createTasksForIntermediateWorker(expectedNoOfTasks, jobName, jobId);
long previousTaskTime = 0;
int taskCount = 0;
while (taskCount < expectedNoOfTasks) {
final long getTaskTime = System.currentTimeMillis();
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
if (intermediateTask != null) {
final long interval = System.currentTimeMillis() - previousTaskTime;
assertFalse("task #" + taskCount + " delivered to early after only " + interval + "ms. Task properties: "
+ intermediateTask.getProperties(), interval < taskDelay);
previousTaskTime = getTaskTime;
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate");
finishTask(intermediateTask, successResult());
taskCount++;
}
}
} finally {
cancelJob(jobName, jobId);
}
}
/**
* Test handling of a throttled job: throttled job runs at the same time as a normal job.
*/
public void testThrottledVsNormalJob() throws Exception {
final int expectedNoOfThrottledTasks = 3;
final int expectedNoOfNormalTasks = 100;
final long taskDelay = 1000;
final String throttledJobName = "testThrottledVsNormalJob-throttled";
final String throttledJobId = startThrottledJob(throttledJobName, taskDelay, INTERMEDIATE_WORKER);
final String normalJobName = "testThrottledVsNormalJob-normal";
final String normalJobId = startDummyJob(normalJobName);
try {
createTasksForIntermediateWorker(expectedNoOfThrottledTasks, throttledJobName, throttledJobId);
createTasksForIntermediateWorker(expectedNoOfNormalTasks, normalJobName, normalJobId);
long previousThrottledTaskTime = 0;
int throttledTaskCount = 0;
int normalTaskCount = 0;
while (throttledTaskCount < expectedNoOfThrottledTasks || normalTaskCount < expectedNoOfNormalTasks) {
final long getTaskTime = System.currentTimeMillis();
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
if (intermediateTask != null) {
final boolean isThrottledTask =
intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME).equals(throttledJobName);
if (isThrottledTask) {
throttledTaskCount++;
assertFalse("too many throttled tasks", throttledTaskCount > expectedNoOfThrottledTasks);
final long interval = System.currentTimeMillis() - previousThrottledTaskTime;
assertFalse("task #" + throttledTaskCount + " delivered to early after only " + interval
+ "ms. Task properties: " + intermediateTask.getProperties(), interval < taskDelay);
previousThrottledTaskTime = getTaskTime;
} else {
normalTaskCount++;
assertFalse("too many normal tasks", normalTaskCount > expectedNoOfNormalTasks);
}
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask));
}
}
} finally {
cancelJob(throttledJobName, throttledJobId);
cancelJob(normalJobName, normalJobId);
}
}
/**
* Test handling of a throttled job: multiple throttled job run at the same time with different delays.
*/
public void testMultipleThrottledJobs() throws Exception {
final int expectedNoOfTasks = 3;
final long[] taskDelay = new long[] { 1000, 1500, 500 };
final String[] jobName = new String[3];
final String[] jobId = new String[3];
for (int i = 0; i < jobName.length; i++) {
jobName[i] = "testMultipleThrottledJobs-" + i;
jobId[i] = startThrottledJob(jobName[i], taskDelay[i], INTERMEDIATE_WORKER);
}
try {
for (int j = 0; j < expectedNoOfTasks; j++) {
for (int i = 0; i < jobName.length; i++) {
createTasksForIntermediateWorker(1, jobName[i], jobId[i]);
}
}
final long[] previousTaskTime = new long[jobName.length];
final int[] taskCount = new int[jobName.length];
while (_tasksToBeKeptAlive.size() < expectedNoOfTasks * jobName.length) {
final long getTaskTime = System.currentTimeMillis();
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null);
if (intermediateTask != null) {
final String taskJobName = intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME);
final int jobIndex = Arrays.binarySearch(jobName, taskJobName);
assertTrue("got task for unknown job " + taskJobName, jobIndex >= 0);
taskCount[jobIndex]++;
assertFalse("too many tasks for job " + taskJobName, taskCount[jobIndex] > expectedNoOfTasks);
final long interval = System.currentTimeMillis() - previousTaskTime[jobIndex];
assertFalse("task #" + taskCount[jobIndex] + " of job " + taskJobName + " delivered to early after only "
+ interval + "ms. Task properties: " + intermediateTask.getProperties(), interval < taskDelay[jobIndex]);
previousTaskTime[jobIndex] = getTaskTime;
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask));
}
}
} finally {
for (int i = 0; i < jobName.length; i++) {
cancelJob(jobName[i], jobId[i]);
}
}
}
private void createTasksForIntermediateWorker(final int expectedNoOfTasks, final String jobName,
final String jobId) throws Exception {
for (int i = 0; i < expectedNoOfTasks; i++) {
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName);
assertNotNull(initialTask);
createBulk(initialTask, OUTPUT_SLOT, "input");
finishTask(initialTask, successResult());
}
}
/**
* check task counters for worker.
*/
private void assertTaskManagerCounter(final String workerName, final int expectedTodo,
final int expectedInProgress) throws TaskmanagerException {
final Map<String, TaskCounter> counters = s_taskManager.getTaskCounters();
final TaskCounter counter = counters.get(workerName);
assertEquals(expectedTodo, counter.getTasksTodo());
assertEquals(expectedInProgress, counter.getTasksInProgress());
}
protected void keepAlive(final List<Task> tasks) throws Exception {
for (final Task task : tasks) {
keepAlive(task);
}
}
/**
* Sends a keep alive to the task manager for a worker.
*
* @param task
* The task
* @throws Exception
* An exception if something goes wrong
*/
protected void keepAlive(final Task task) throws Exception {
s_taskManager.keepAlive(task.getWorkerName(), task.getTaskId());
}
/**
* Starts the dummy job.
*
* @param jobName
* The job name
* @return The id of the job run
* @throws Exception
* An exception if something goes wrong
*/
protected String startDummyJob(final String jobName) throws Exception {
addJob(jobName, "test-workflow", createJobParameters());
return startJob(jobName);
}
/**
* Starts the dummy job.
*
* @param jobName
* The job name
* @return The id of the job run
* @throws Exception
* An exception if something goes wrong
*/
protected String startThrottledJob(final String jobName, final long delay, final String... delayedWorker)
throws Exception {
final AnyMap taskControl = DataFactory.DEFAULT.createAnyMap();
taskControl.put("delay", delay);
for (final String worker : delayedWorker) {
taskControl.getSeq("workers", true).add(worker);
}
addJob(jobName, "test-workflow", createJobParameters(), taskControl);
return startJob(jobName);
}
/**
* Starts the dummy job for the completion workflow.
*
* @param jobName
* The job name
* @return The id of the job run
* @throws Exception
* An exception if something goes wrong
*/
protected String startDummyCompletionJob(final String jobName) throws Exception {
addJob(jobName, "test-completion-workflow", createJobParameters());
return startJob(jobName);
}
/**
* Creates the job's parameters.
*
* @param tempStoreName
* temporary store name
* @param storeName
* store name
* @return parameter Any
* @throws Exception
* any exception during creation of Any.
*/
protected AnyMap createJobParameters(final String tempStoreName, final String storeName) throws Exception {
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap();
parameters.put(TEMP_STORE_PARAM, tempStoreName);
parameters.put(STORE_PARAM, storeName);
return parameters;
}
/**
* Creates an Any object with the parameters.
*
* @param maxNoOfPartitions
* The maximum number of partitions
* @return An Any object with the parameters
* @throws Exception
* An exception if something goes wrong
*/
protected AnyMap createJobParameters() throws Exception {
return createJobParameters(STORE_NAME, STORE_NAME);
}
/**
* utility method to add jobs.
*/
protected void addJob(final String jobName, final String workflow, final AnyMap parameters) throws Exception {
addJob(jobName, workflow, parameters, null);
}
/**
* utility method to add jobs.
*/
protected void addJob(final String jobName, final String workflow, final AnyMap parameters,
final AnyMap taskControl) throws Exception {
final AnyMap jobAny = DataFactory.DEFAULT.createAnyMap();
jobAny.put("name", jobName);
jobAny.put("workflow", workflow);
jobAny.put("parameters", parameters);
if (taskControl != null) {
jobAny.put("taskControl", taskControl);
}
s_defPersistence.addJob(new JobDefinition(jobAny));
final JobDefinition checkDef = s_defPersistence.getJob(jobName);
assertNotNull(checkDef);
}
/**
* Utility method to start a job. Asserts the jobId is not null.
*
* @param jobName
* The name of the job to start.
* @return The id of the started job run.
* @throws Exception
* error occurred while starting the job.
*/
protected String startJob(final String jobName) throws Exception {
final String jobId = s_jobRunEngine.startJob(jobName);
assertNotNull(jobId);
return jobId;
}
/**
* Cancels a job.
*
* @param jobName
* The job name
* @param jobId
* The job run id
* @throws Exception
* an exception if something went wrong
*/
protected void cancelJob(final String jobName, final String jobId) throws Exception {
s_jobRunEngine.cancelJob(jobName, jobId);
}
/**
* get an initial task for a worker.
*
* @param workerName
* the name of the worker
* @param jobName
* the name of the job
* @return the task or null if no task was available
* @throws Exception
* an exception that happened during requests
*/
protected Task getInitialTask(final String workerName, final String jobName) throws Exception {
return s_jobTaskProcessor.getInitialTask(workerName, jobName);
}
/* base methods. */
/**
* @return a ResultDescription for a task with completion status SUCCESS.
*/
protected ResultDescription successResult() {
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", new HashMap<String, Number>());
}
/**
* @return a ResultDescription for a task with completion status RECOVERABLE_ERROR.
*/
protected ResultDescription recoverableError() {
return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, "", "", new HashMap<String, Number>());
}
/**
* @return a ResultDescription for a task with completion status FATAL_ERROR.
*/
protected ResultDescription fatalError() {
return new ResultDescription(TaskCompletionStatus.FATAL_ERROR, "", "", new HashMap<String, Number>());
}
/**
* @return a ResultDescription for a task with completion status POSTPONE.
*/
protected ResultDescription postponeResult() {
return new ResultDescription(TaskCompletionStatus.POSTPONE, "", "", new HashMap<String, Number>());
}
/**
* Asserts that a job run has the status "CANCELED", and return the data.
*/
protected AnyMap assertJobRunCanceled(final String jobName, final String jobRunId) throws JobManagerException {
final AnyMap data = assertJobRunData(jobName, jobRunId);
assertEquals(JobState.CANCELED.name(), data.getStringValue(JobManagerConstants.DATA_JOB_STATE));
return data;
}
/**
* get job run data, repeat until the most basic properties are set.
*/
protected AnyMap assertJobRunData(final String jobName, final String jobId) throws JobManagerException {
AnyMap data;
int count = 0;
final int maxCount = 3;
boolean done = false;
do {
data = s_jobDataProvider.getJobRunData(jobName, jobId);
try {
if (data == null
|| data.get("jobId") == null
|| data.get(JobManagerConstants.WORKFLOW_RUN_COUNTER) == null
|| data.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).get(
JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS) == null) {
// ok, data seems to be just gone from zoo keeper into store.
// let's try that again but with a small delay...
Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA);
if (++count > maxCount) {
fail("Too many tries to get job run data.");
}
} else {
done = true;
}
} catch (final Exception e) {
e.printStackTrace();
++count;
}
} while (!done);
return data;
}
/**
* assert tasks counter in job run data.
*/
protected void assertTaskCounters(final int createdTasks, final int succeededTasks, final int retriedAfterError,
final int retriedAfterTimeout, final int failedWithoutRetry, final int failedAfterRetry,
final int cancelledTasks, final AnyMap jobRunData) {
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER);
assertEquals(createdTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue());
assertEquals(succeededTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS)
.intValue());
assertEquals(retriedAfterError, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER)
.intValue());
assertEquals(retriedAfterTimeout, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL)
.intValue());
assertEquals(failedAfterRetry, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED)
.intValue());
assertEquals(failedWithoutRetry,
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue());
assertEquals(cancelledTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS)
.intValue());
}
/**
* assert tasks counter sum in job run data.
*/
protected void assertTaskCounters(final AnyMap jobRunData) {
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER);
final int createdTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue();
final int succeededTasks =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue();
final int retriedByWorker =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue();
final int retriedByTtl = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue();
final int failedAfterRetry =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue();
final int failedByWorker =
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue();
final int cancelledTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue();
final int obsoleteTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS).intValue();
assertEquals(createdTasks, succeededTasks + retriedByWorker + retriedByTtl + failedByWorker + failedAfterRetry
+ cancelledTasks + obsoleteTasks);
}
/**
* check worker task counters.
*/
protected void assertWorkerCounters(final String workerName, final int expectedSuccessful,
final int expectedRetriedTimeout, final int expectedRetriedWorker, final int expectedFailedRetried,
final int expectedFailedNotRetried, final int expectedObsolete, final AnyMap jobRunData) throws Exception {
assertTrue(jobRunData.containsKey("worker"));
final AnyMap workerData = jobRunData.getMap("worker");
assertTrue(workerData.containsKey(workerName));
final AnyMap workerCounter = workerData.getMap(workerName);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, expectedSuccessful);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL,
expectedRetriedTimeout);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER,
expectedRetriedWorker);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED,
expectedFailedRetried);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED,
expectedFailedNotRetried);
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS, expectedObsolete);
}
/**
* assert worker task counter, if value greater than zero expected.
*/
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) {
if (expected > 0) {
assertTrue("missing counter for " + name, workerCounter.containsKey(name));
assertEquals("wrong counter value for " + name, expected, workerCounter.getLongValue(name).intValue());
}
}
/**
* asserts that a job is running in the given run mode.
*/
protected void assertJobRunning(final String jobName, final String jobId) throws JobManagerException {
final AnyMap data = s_jobDataProvider.getJobRunData(jobName, jobId);
assertNotNull(data);
assertEquals(jobId, data.getStringValue("jobId"));
assertEquals(JobState.RUNNING.name(), data.getStringValue("state"));
}
/**
* asserts that one or more tasks are currently being finished.
*
* @throws TaskmanagerException
*/
protected void assertTasksInFinish() throws TaskmanagerException {
final TaskList list =
s_taskManager.getTaskList(TaskManager.FINISHING_TASKS_WORKER, TASK_SECTION_INPROGRESS, 100);
assertNotNull(list);
assertTrue(list.getSize() > 0);
}
/**
* creates a bulk for the given task, slot name and writes the data into the bulk.
*
* @param task
* the task
* @param slotName
* the name of the worker slot
* @param data
* the data to write in the store
* @return the created bulk
* @throws Exception
* error
*/
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception {
final BulkInfo bulk = task.getOutputBulks().get(slotName).get(0);
assertNotNull("Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.", bulk);
_store.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8"));
return bulk;
}
/**
* Finishes a task.
*/
protected void finishTask(final Task task, final ResultDescription result) throws Exception {
s_taskManager.finishTask(task.getWorkerName(), task.getTaskId(), result);
simulateFinishTaskWorker(); // to make sure that follow up tasks are generated when this method returns.
}
/**
* to actually finish the (finishing) tasks.
*/
protected void simulateFinishTaskWorker() throws Exception {
Task finishingTask = null;
do {
finishingTask = s_taskManager.getTask(TaskManager.FINISHING_TASKS_WORKER, null);
if (finishingTask != null) {
final KeepAliveEntry kae = new KeepAliveEntry(TaskManager.FINISHING_TASKS_WORKER, finishingTask);
_tasksToBeKeptAlive.add(kae);
// check if start and end time are set.
final String start = finishingTask.getProperties().get(Task.PROPERTY_START_TIME);
final String end = finishingTask.getProperties().get(Task.PROPERTY_END_TIME);
assertNotNull(start);
assertNotNull(end);
assertTrue(end.compareTo(start) >= 0);
try {
assertTasksInFinish();
final String finishingTaskId = finishingTask.getTaskId();
try {
s_jobTaskProcessor.finishTask(finishingTask);
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, successResult());
} catch (final JobManagerException e) {
final ResultDescription result = e.isRecoverable() ? recoverableError() : fatalError();
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, result);
} catch (final Exception e) {
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, fatalError());
}
} finally {
_tasksToBeKeptAlive.remove(kae);
}
}
} while (finishingTask != null);
}
/**
* Waits for a job to finish its workflow runs.
*
* @param jobName
* the name of the job
* @param jobId
* the id of the job run
* @param maxWaitTime
* max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail.
* @throws Exception
* an exception occurred while trying to get job run data
*/
protected void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime)
throws Exception {
final long sleepTime = 500L;
final long millisStarted = System.currentTimeMillis();
while (true) {
try {
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId);
if (jobRunData != null) {
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS);
if (activeRunsAny != null) {
assertEquals("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS,
ValueType.LONG, activeRunsAny.getValueType());
final int activeRuns = ((Value) activeRunsAny).asLong().intValue();
assertTrue("negative value for " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS,
activeRuns >= 0);
if (activeRuns == 0) {
assertTaskCounters(jobRunData);
return;
}
}
}
assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData,
System.currentTimeMillis() - millisStarted <= maxWaitTime);
Thread.sleep(sleepTime);
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
}
}
}