/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* <pre> | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation, | |
* Andreas Schank (Attensity Europe GmbH) - additional tests | |
* </pre> | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.test; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.Any.ValueType; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Value; | |
import org.eclipse.smila.jobmanager.JobRunDataProvider; | |
import org.eclipse.smila.jobmanager.JobRunEngine; | |
import org.eclipse.smila.jobmanager.JobState; | |
import org.eclipse.smila.jobmanager.JobTaskProcessor; | |
import org.eclipse.smila.jobmanager.definitions.DefinitionPersistence; | |
import org.eclipse.smila.jobmanager.definitions.JobDefinition; | |
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.exceptions.JobManagerException; | |
import org.eclipse.smila.jobmanager.taskgenerator.TaskGeneratorException; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskList; | |
import org.eclipse.smila.taskmanager.TaskManager; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.test.taskgenerator.ExceptionalTaskGenerator; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
/** | |
* Test for TaskManager service. | |
* | |
*/ | |
public class TestTaskManager extends DeclarativeServiceTestCase { | |
/** keep alive schedule time in ms. */ | |
protected static final int KEEP_ALIVE_SCHEDULE_TIME = 1000; | |
/** waiting time in ms. */ | |
protected static final int WAITING_TIME = 1000; | |
/** waiting time between invocations to retrieve updated job run data. */ | |
protected static final long WAITING_TIME_FOR_JOB_RUN_DATA = 1000L; | |
/** Time to live in ms. */ | |
protected static final long TIME_TO_LIVE = 5500L; | |
/** Definition Persistence reference. */ | |
protected static DefinitionPersistence s_defPersistence; | |
/** the inprogress queue. */ | |
protected static final String TASK_SECTION_INPROGRESS = "inprogress"; | |
/** tempStore param name. */ | |
private static final String TEMP_STORE_PARAM = "tempStore"; | |
/** store param name. */ | |
private static final String STORE_PARAM = "store"; | |
/** test store name. */ | |
private static final String STORE_NAME = "test-store"; | |
/** output slot for workers. */ | |
private static final String OUTPUT_SLOT = "output"; | |
/** name of input worker. */ | |
private static final String INPUT_WORKER = "input-worker"; | |
/** name of intermediate worker. */ | |
private static final String INTERMEDIATE_WORKER = "intermediate-worker"; | |
/** name of intermediate worker that requests a completion task. */ | |
private static final String INTERMEDIATE_COMPLETION_WORKER = "intermediate-completion-worker"; | |
/** name of final worker. */ | |
private static final String FINAL_WORKER = "final-worker"; | |
/** reference to task manager service. */ | |
private static TaskManager s_taskManager; | |
/** reference to jobdata provider service. */ | |
private static JobRunDataProvider s_jobDataProvider; | |
/** reference to jobmanager task processor. */ | |
private static JobTaskProcessor s_jobTaskProcessor; | |
/** reference to jobrun engine service. */ | |
private static JobRunEngine s_jobRunEngine; | |
/** keep alive tasks. */ | |
protected List<KeepAliveEntry> _tasksToBeKeptAlive = Collections | |
.synchronizedList(new ArrayList<KeepAliveEntry>()); | |
/** reference to object store service. */ | |
private ObjectStoreService _store; | |
/** keep alive executor. */ | |
private final ScheduledExecutorService _keepAliveExecutor = Executors.newSingleThreadScheduledExecutor(); | |
/** keep worker name with task. */ | |
protected class KeepAliveEntry { | |
/** the worker name to send the keep alive for. */ | |
private final String _workerName; | |
/** the task to be kept alive. */ | |
private final Task _task; | |
/** | |
* Creates a new KeepAliveEntry. | |
* | |
* @param workerName | |
* the worker name | |
* @param task | |
* the task to be kept alive for workerName | |
*/ | |
public KeepAliveEntry(final String workerName, final Task task) { | |
_workerName = workerName; | |
_task = task; | |
} | |
} | |
/** | |
* Keeps tasks alive (on a very simple way). | |
*/ | |
protected class KeepAlive implements Runnable { | |
/** task list reference. */ | |
private final List<KeepAliveEntry> _taskList; | |
/** | |
* Constructor. | |
* | |
* @param taskList | |
* reference to task list | |
*/ | |
public KeepAlive(final List<KeepAliveEntry> taskList) { | |
_taskList = taskList; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
try { | |
final List<KeepAliveEntry> tasksCopy = new ArrayList<KeepAliveEntry>(_taskList.size()); | |
synchronized (_taskList) { | |
for (final KeepAliveEntry task : _taskList) { | |
tasksCopy.add(task); | |
} | |
} | |
for (final KeepAliveEntry keepAliveEntry : tasksCopy) { | |
try { | |
s_taskManager.keepAlive(keepAliveEntry._workerName, keepAliveEntry._task.getTaskId()); | |
if (_log.isDebugEnabled()) { | |
_log.debug("keeping alive task " + keepAliveEntry._task.getTaskId() + " for " | |
+ keepAliveEntry._workerName); | |
} | |
} catch (final TaskmanagerException e) { | |
; // ignore | |
e.printStackTrace(); | |
} | |
} | |
} catch (final Throwable e) { | |
; // ignore | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_store = getService(ObjectStoreService.class); | |
_store.ensureStore(STORE_NAME); | |
if (s_taskManager == null) { | |
s_taskManager = getService(TaskManager.class); | |
} | |
if (s_jobDataProvider == null) { | |
s_jobDataProvider = getService(JobRunDataProvider.class); | |
} | |
if (s_jobRunEngine == null) { | |
s_jobRunEngine = getService(JobRunEngine.class); | |
} | |
if (s_jobTaskProcessor == null) { | |
s_jobTaskProcessor = getService(JobTaskProcessor.class); | |
} | |
if (s_defPersistence == null) { | |
s_defPersistence = getService(DefinitionPersistence.class); | |
} | |
_keepAliveExecutor.scheduleAtFixedRate(new KeepAlive(_tasksToBeKeptAlive), KEEP_ALIVE_SCHEDULE_TIME, | |
KEEP_ALIVE_SCHEDULE_TIME, TimeUnit.MILLISECONDS); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
protected void tearDown() throws Exception { | |
// get rid of any finishing tasks, that may have been left unheeded. | |
simulateFinishTaskWorker(); | |
_keepAliveExecutor.shutdownNow(); | |
_store.removeStore(STORE_NAME); | |
super.tearDown(); | |
} | |
/** | |
* @throws Exception | |
* unexpected error | |
*/ | |
public void testService() throws Exception { | |
final TaskManager taskManager = getService(TaskManager.class); | |
assertNotNull(taskManager); | |
} | |
/** | |
* Tests exception thrown when requests are made for an invalid worker name. | |
* | |
* @throws Exception | |
* any exception that is not expected here. | |
*/ | |
public void testInvalidWorkerName() throws Exception { | |
try { | |
s_taskManager.getTask("INVALID_WORKER", null); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode()); | |
} | |
try { | |
s_taskManager.finishTask("INVALID_WORKER", "taskId", successResult()); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode()); | |
} | |
try { | |
s_taskManager.keepAlive("INVALID_WORKER", "taskId"); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.workerName, ex.getCauseCode()); | |
} | |
} | |
/** | |
* tests requests with incorrect task id. | |
* | |
* @throws Exception | |
* an exception occurred. | |
*/ | |
public void testInvalidTaskId() throws Exception { | |
try { | |
s_taskManager.finishTask(INTERMEDIATE_WORKER, "taskId", successResult()); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode()); | |
} | |
try { | |
s_taskManager.keepAlive(INTERMEDIATE_WORKER, "taskId"); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode()); | |
} | |
} | |
/** | |
* Tests handling of a recoverable error. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testRecoverableError() throws Exception { | |
final String jobName = "testRecoverableError"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
finishTask(intermediateTask1, recoverableError()); | |
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData); | |
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 0, 1, 0, 0, 0, jobRunData); | |
} | |
/** | |
* Tests handling of a fatal error. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testFatalError() throws Exception { | |
final String jobName = "testFatalError"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
finishTask(intermediateTask1, fatalError()); | |
assertTaskCounters(2, 1, 0, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertWorkerCounters(INTERMEDIATE_WORKER, 0, 0, 0, 0, 1, 0, jobRunData); | |
} | |
/** | |
* Tests handling of a rollback because of an exceeded time-to-live. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testTimeToLiveRollback() throws Exception { | |
final String jobName = "testTimeToLiveRollback"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
Thread.sleep(2 * TIME_TO_LIVE); | |
simulateFinishTaskWorker(); | |
assertTaskCounters(3, 1, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
try { | |
keepAlive(intermediateTask1); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode()); | |
} | |
try { | |
finishTask(intermediateTask1, successResult()); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(BadParameterTaskmanagerException.Cause.taskId, ex.getCauseCode()); | |
} | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 1, 0, 0, 0, 0, jobRunData); | |
assertTaskCounters(4, 3, 0, 1, 0, 0, 0, jobRunData); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
} | |
/** | |
* Tests handling of a keep alive. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testKeepAlive() throws Exception { | |
final String jobName = "testKeepAlive"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
for (int i = 0; i < 2 * TIME_TO_LIVE / WAITING_TIME; i++) { | |
keepAlive(intermediateTask); | |
Thread.sleep(WAITING_TIME); | |
} | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
finishTask(intermediateTask, successResult()); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
} | |
/** | |
* Tests handling of a recoverable error. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testRecoverableErrorAutocommit() throws Exception { | |
final String jobName = "testRecoverableErrorAutocommit"; | |
startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, recoverableError()); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask, successResult()); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
} | |
/** | |
* Tests handling of a success for autocommit bulk. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testTimeToLiveAutocommit() throws Exception { | |
final String jobName = "testTimeToLiveAutocommit"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
Thread.sleep(2 * TIME_TO_LIVE); | |
simulateFinishTaskWorker(); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask, successResult()); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
} | |
/** | |
* Tests handling of a fatal error for autocommit bulk. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testFatalErrorAutocommit() throws Exception { | |
final String jobName = "testFatalErrorAutocommit"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, fatalError()); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
} | |
/** | |
* test for postpone a task. | |
*/ | |
public void testPostponeTask() throws Exception { | |
final String jobName = "testPostponeTask"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
Thread.sleep(100); | |
Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask); | |
final String createdTime = intermediateTask.getProperties().get(Task.PROPERTY_CREATED_TIME); | |
assertTrue(intermediateTask.getProperties().containsKey(Task.PROPERY_TASK_AGE)); | |
final long initialTaskAge = Long.parseLong(intermediateTask.getProperties().get(Task.PROPERY_TASK_AGE)); | |
assertTrue(initialTaskAge >= 100); | |
long postponedTaskAge = 0; | |
for (int i = 0; i < 100; i++) { | |
finishTask(intermediateTask, postponeResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task postponedTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(postponedTask); | |
assertEquals(intermediateTask.getTaskId(), postponedTask.getTaskId()); | |
assertEquals(intermediateTask.getParameters(), postponedTask.getParameters()); | |
// check how often task has been postponed. | |
assertEquals(Integer.toString(i + 1), postponedTask.getProperties().get(Task.PROPERTY_POSTPONED)); | |
// check that creation time has not been changed after postponing. | |
assertEquals(createdTime, postponedTask.getProperties().get(Task.PROPERTY_CREATED_TIME)); | |
assertTrue(postponedTask.getProperties().containsKey(Task.PROPERY_TASK_AGE)); | |
postponedTaskAge = Long.parseLong(postponedTask.getProperties().get(Task.PROPERY_TASK_AGE)); | |
assertTrue(postponedTaskAge >= initialTaskAge); | |
intermediateTask = postponedTask; | |
} | |
assertTrue(postponedTaskAge > initialTaskAge); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask, successResult()); | |
assertTaskCounters(3, 2, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData); | |
assertWorkerCounters(INTERMEDIATE_WORKER, 1, 0, 0, 0, 0, 0, jobRunData); | |
} | |
/** | |
* Test for cancelJob, task manager should remove the respective tasks. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testCancel() throws Exception { | |
final String jobName = "testCancel"; | |
final String jobId = startDummyJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, OUTPUT_SLOT); | |
cancelJob(jobName, jobId); | |
try { | |
finishTask(initialTask, successResult()); | |
fail("Should not happen, finish on removed task is not possible."); | |
} catch (final BadParameterTaskmanagerException bpte) { | |
assertNotNull(bpte); | |
} | |
AnyMap jobRunData = assertJobRunCanceled(jobName, jobId); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNull(intermediateTask); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData); | |
} | |
/** | |
* Cancel one of two simultaneously running jobs and check if just the correct tasks are removed. | |
* | |
* @throws Exception | |
* any exception that is not expected. | |
*/ | |
public void testCancelOneOfTwo() throws Exception { | |
final String jobName1 = "testCancelOneOfTwo1"; | |
final String jobName2 = "testCancelOneOfTwo2"; | |
final String jobId1 = startDummyJob(jobName1); | |
final String jobId2 = startDummyJob(jobName2); | |
try { | |
assertJobRunning(jobName1, jobId1); | |
assertJobRunning(jobName2, jobId2); | |
final Task initialTask1 = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName1); | |
createBulk(initialTask1, OUTPUT_SLOT, OUTPUT_SLOT); | |
final Task initialTask2 = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName2); | |
createBulk(initialTask2, OUTPUT_SLOT, OUTPUT_SLOT); | |
cancelJob(jobName1, jobId1); | |
try { | |
finishTask(initialTask1, successResult()); | |
fail("should not work"); | |
} catch (final TaskmanagerException ex) { | |
assertNotNull(ex); | |
} | |
finishTask(initialTask2, successResult()); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
assertNotNull(intermediateTask); | |
final AnyMap jobRunData1 = assertJobRunCanceled(jobName1, jobId1); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 1, jobRunData1); | |
final AnyMap jobRunData2 = s_jobDataProvider.getJobRunData(jobName2, jobId2); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, jobRunData2); | |
} finally { | |
cancelJob(jobName2, jobId2); | |
} | |
} | |
/** | |
* test if no tasks can appear in a qualified queue after finishing a qualified task. | |
*/ | |
public void testNoTodoTasksAfterFinishingQualifiedQueue() throws Exception { | |
try { | |
final String workerName = "qualfiedWorker"; | |
final String qualifier = "q1"; | |
final Task task1 = new Task("1", workerName); | |
task1.setQualifier(qualifier); | |
s_taskManager.addTask(task1); | |
final Task task2 = new Task("2", workerName); | |
task2.setQualifier(qualifier); | |
s_taskManager.addTask(task2); | |
final Task inProgressTask = s_taskManager.getTask(workerName, null, Arrays.asList(qualifier)); | |
assertNotNull(inProgressTask); | |
assertTaskManagerCounter(workerName, 1, 1); | |
s_taskManager.finishTasks(workerName, Arrays.asList(qualifier), new ResultDescription( | |
TaskCompletionStatus.OBSOLETE, null, null, null)); | |
assertTaskManagerCounter(workerName, 0, 1); | |
s_taskManager.finishTask(workerName, inProgressTask.getTaskId(), new ResultDescription( | |
TaskCompletionStatus.POSTPONE, null, null, null)); | |
assertTaskManagerCounter(workerName, 0, 0); | |
final Task task3 = new Task("3", workerName); | |
task3.setQualifier(qualifier); | |
s_taskManager.addTask(task3); | |
assertTaskManagerCounter(workerName, 0, 0); | |
assertNull(s_taskManager.getTask(workerName, null, Arrays.asList(qualifier))); | |
} finally { | |
simulateFinishTaskWorker(); | |
} | |
} | |
/** | |
* test completion workflow run with recoverable error during completion. | |
*/ | |
public void testCompletionTasksForCompletionRequestingWorkersWithIntermediateRecoverableError() throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
finishTask(intermediateTask1, recoverableError()); | |
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
// check that we get a completion task (but only for out intermediate worker!) | |
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(completionTask); | |
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK)); | |
assertEquals(Boolean.TRUE, | |
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK))); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState()); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(5, 3, 1, 0, 0, 0, 0, jobRunData); | |
// first recoverable error on completion task... | |
finishTask(completionTask, recoverableError()); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(6, 3, 2, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(completionTask2); | |
assertEquals(Boolean.TRUE, | |
Boolean.valueOf(completionTask2.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK))); | |
finishTask(completionTask2, successResult()); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(6, 4, 2, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 2, 0, 2, 0, 0, 0, jobRunData); | |
assertEquals( | |
Long.valueOf(1), | |
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue( | |
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)); | |
assertEquals( | |
JobState.SUCCEEDED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* test completion workflow run with timeout error during completion. | |
*/ | |
public void testCompletionTasksForCompletionRequestingWorkersWithTimeoutDuringCompletion() throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask); | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask, successResult()); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
// check that we get a completion task (but only for out intermediate worker!) | |
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(completionTask); | |
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK)); | |
assertEquals(Boolean.TRUE, | |
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK))); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState()); | |
// sleep a bit: | |
Thread.sleep(2 * TIME_TO_LIVE); | |
// first recoverable error on completion task... | |
try { | |
finishTask(completionTask, recoverableError()); | |
fail("task must have timed out."); | |
} catch (final BadParameterTaskmanagerException e) { | |
;// ignore | |
} | |
// run the finish task worker. | |
simulateFinishTaskWorker(); | |
// second try: | |
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(completionTask2); | |
assertTrue(completionTask2.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK)); | |
assertEquals(Boolean.TRUE, | |
Boolean.valueOf(completionTask2.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK))); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState()); | |
finishTask(completionTask2, successResult()); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(5, 4, 0, 1, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 2, 1, 0, 0, 0, 0, jobRunData); | |
assertEquals( | |
Long.valueOf(1), | |
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue( | |
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)); | |
assertEquals( | |
JobState.SUCCEEDED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* test completion workflow run with fatal error during completion. | |
*/ | |
public void testCompletionTasksForCompletionRequestingWorkersWithFatalErrorDuringCompletion() throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
finishTask(intermediateTask1, recoverableError()); | |
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData); | |
s_jobRunEngine.finishJob(jobName, jobId); | |
// check that we get a completion task (but only for out intermediate worker!) | |
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(completionTask); | |
assertTrue(completionTask.getProperties().containsKey(Task.PROPERTY_IS_COMPLETING_TASK)); | |
assertEquals(Boolean.TRUE, | |
Boolean.valueOf(completionTask.getProperties().get(Task.PROPERTY_IS_COMPLETING_TASK))); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
assertEquals(JobState.COMPLETING, s_jobDataProvider.getJobRunInfo(jobName).getState()); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(5, 3, 1, 0, 0, 0, 0, jobRunData); | |
// first recoverable error on completion task... | |
finishTask(completionTask, fatalError()); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(5, 3, 1, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
// we won�t get another task here... | |
final Task completionTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNull(completionTask2); | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(5, 3, 1, 0, 1, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 1, 0, 1, 0, 1, 0, jobRunData); | |
assertEquals( | |
Long.valueOf(1), | |
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue( | |
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)); | |
// still succeeded. | |
assertEquals( | |
JobState.SUCCEEDED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* test completion workflow run with exception during completion task generation. | |
*/ | |
public void testCompletionTasksForCompletionRequestingWorkersWithExceptionDuringTaskGeneration() throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
finishTask(intermediateTask1, recoverableError()); | |
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData); | |
// set the task generator to throw an exception | |
ExceptionalTaskGenerator.setExceptionOnNextCompletionTask(true); | |
try { | |
s_jobRunEngine.finishJob(jobName, jobId); | |
fail("exception requested."); | |
} catch (final JobManagerException e) { | |
assertTrue(e.getCause() instanceof TaskGeneratorException); | |
} | |
// Job must have failed. | |
// check that we will get no completion task (but only for out intermediate worker!) | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertEquals( | |
Long.valueOf(1), | |
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue( | |
JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)); | |
assertEquals( | |
Long.valueOf(1), | |
jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).getLongValue( | |
JobManagerConstants.DATA_JOB_NO_OF_FAILED_WORKFLOW_RUNS)); | |
// failed. | |
assertEquals( | |
JobState.FAILED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* test completion workflow run with unchecked exception during completion task generation. | |
*/ | |
public void testCompletionTasksForCompletionRequestingWorkersWithUncheckedExceptionDuringTaskGeneration() | |
throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertTaskCounters(1, 0, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask1); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
finishTask(intermediateTask1, recoverableError()); | |
assertTaskCounters(3, 1, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
final Task intermediateTask2 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNotNull(intermediateTask2); | |
assertFalse(intermediateTask1.getTaskId().equals(intermediateTask2.getTaskId())); | |
assertEquals(intermediateTask1.getInputBulks(), intermediateTask2.getInputBulks()); | |
assertEquals(intermediateTask1.getOutputBulks(), intermediateTask2.getOutputBulks()); | |
createBulk(intermediateTask2, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask2, successResult()); | |
assertTaskCounters(4, 2, 1, 0, 0, 0, 0, s_jobDataProvider.getJobRunData(jobName, jobId)); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
final Task finalTask = s_taskManager.getTask(FINAL_WORKER, null); | |
createBulk(finalTask, OUTPUT_SLOT, "final"); | |
finishTask(finalTask, successResult()); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertTaskCounters(4, 3, 1, 0, 0, 0, 0, jobRunData); | |
// set the task generator to throw an exception | |
ExceptionalTaskGenerator.setUncheckedExceptionOnNextCompletionTask(true); | |
try { | |
s_jobRunEngine.finishJob(jobName, jobId); | |
fail("exception requested."); | |
} catch (final IllegalArgumentException e) { | |
;// | |
} | |
// check that we will get no completion task (but only for out intermediate worker!) | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertEquals( | |
JobState.COMPLETING, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
// Job must have still be cancelable | |
s_jobRunEngine.cancelJob(jobName, jobId); | |
assertNull(s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null)); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// check the correct state of the job | |
jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
// canceled | |
assertEquals( | |
JobState.CANCELED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* test that no completion workflow run is started when the jobrun would fail. | |
*/ | |
public void testNoCompletionWorkflowForFailedJobRun() throws Exception { | |
final String jobName = getName(); | |
final String jobId = startDummyCompletionJob(jobName); | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
// fail hard. | |
finishTask(initialTask, fatalError()); | |
// no standard task | |
final Task intermediateTask1 = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNull(intermediateTask1); | |
// finish this failed run. | |
s_jobRunEngine.finishJob(jobName, jobId); | |
// check that we get no (completion) tasks whatsoever | |
final Task completionTask = s_taskManager.getTask(INTERMEDIATE_COMPLETION_WORKER, null); | |
assertNull(completionTask); | |
assertNull(s_taskManager.getTask(INPUT_WORKER, null)); | |
assertNull(s_taskManager.getTask(FINAL_WORKER, null)); | |
// but the job run will get finished, but this time with FAILED status. | |
waitForJobRunFinished(jobName, jobId, WAITING_TIME); | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertWorkerCounters(INPUT_WORKER, 0, 0, 0, 0, 1, 0, jobRunData); | |
// if the worker has any entries they must all be 0: | |
if (jobRunData.getMap("worker").containsKey(INTERMEDIATE_COMPLETION_WORKER)) { | |
assertWorkerCounters(INTERMEDIATE_COMPLETION_WORKER, 0, 0, 0, 0, 0, 0, jobRunData); | |
} | |
final AnyMap workflowRuns = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
// if the completion workflow runs are available, they must be 0. | |
if (workflowRuns.containsKey(JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)) { | |
assertEquals(Long.valueOf(0), | |
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_COMPLETION_WORKFLOW_RUNS)); | |
} | |
assertEquals(Long.valueOf(1), | |
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_STARTED_WORKFLOW_RUNS)); | |
assertEquals(Long.valueOf(0), | |
workflowRuns.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS)); | |
// still succeeded. | |
assertEquals( | |
JobState.FAILED, | |
JobState.valueOf(s_jobDataProvider.getJobRunData(jobName, jobId).getStringValue( | |
JobManagerConstants.DATA_JOB_STATE))); | |
} | |
/** | |
* Test handling of a throttled job: 1 throttled job, workers ask in parallel for tasks. | |
*/ | |
public void testThrottledJob() throws Exception { | |
final int expectedNoOfTasks = 3; | |
final long taskDelay = 1000; | |
final String jobName = "testThrottledJob"; | |
final String jobId = startThrottledJob(jobName, taskDelay, INTERMEDIATE_WORKER); | |
try { | |
createTasksForIntermediateWorker(expectedNoOfTasks, jobName, jobId); | |
long previousTaskTime = 0; | |
int taskCount = 0; | |
while (taskCount < expectedNoOfTasks) { | |
final long getTaskTime = System.currentTimeMillis(); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
if (intermediateTask != null) { | |
assertEquals(jobName, intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME)); | |
final long interval = System.currentTimeMillis() - previousTaskTime; | |
assertTrue("task #" + taskCount + " delivered to early after only " + interval + "ms. Task properties: " | |
+ intermediateTask.getProperties(), interval >= taskDelay); | |
previousTaskTime = getTaskTime; | |
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask)); | |
taskCount++; | |
} | |
} | |
} finally { | |
cancelJob(jobName, jobId); | |
} | |
} | |
/** | |
* Test handling of a throttled job: 1 throttled job, workers ask in sequence for tasks. | |
*/ | |
public void testThrottledJobWithFinishedTasks() throws Exception { | |
final int expectedNoOfTasks = 3; | |
final long taskDelay = 1000; | |
final String jobName = "testThrottledJobWithFinishedTasks"; | |
final String jobId = startThrottledJob(jobName, taskDelay, INTERMEDIATE_WORKER); | |
try { | |
createTasksForIntermediateWorker(expectedNoOfTasks, jobName, jobId); | |
long previousTaskTime = 0; | |
int taskCount = 0; | |
while (taskCount < expectedNoOfTasks) { | |
final long getTaskTime = System.currentTimeMillis(); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
if (intermediateTask != null) { | |
final long interval = System.currentTimeMillis() - previousTaskTime; | |
assertFalse("task #" + taskCount + " delivered to early after only " + interval + "ms. Task properties: " | |
+ intermediateTask.getProperties(), interval < taskDelay); | |
previousTaskTime = getTaskTime; | |
createBulk(intermediateTask, OUTPUT_SLOT, "intermediate"); | |
finishTask(intermediateTask, successResult()); | |
taskCount++; | |
} | |
} | |
} finally { | |
cancelJob(jobName, jobId); | |
} | |
} | |
/** | |
* Test handling of a throttled job: throttled job runs at the same time as a normal job. | |
*/ | |
public void testThrottledVsNormalJob() throws Exception { | |
final int expectedNoOfThrottledTasks = 3; | |
final int expectedNoOfNormalTasks = 100; | |
final long taskDelay = 1000; | |
final String throttledJobName = "testThrottledVsNormalJob-throttled"; | |
final String throttledJobId = startThrottledJob(throttledJobName, taskDelay, INTERMEDIATE_WORKER); | |
final String normalJobName = "testThrottledVsNormalJob-normal"; | |
final String normalJobId = startDummyJob(normalJobName); | |
try { | |
createTasksForIntermediateWorker(expectedNoOfThrottledTasks, throttledJobName, throttledJobId); | |
createTasksForIntermediateWorker(expectedNoOfNormalTasks, normalJobName, normalJobId); | |
long previousThrottledTaskTime = 0; | |
int throttledTaskCount = 0; | |
int normalTaskCount = 0; | |
while (throttledTaskCount < expectedNoOfThrottledTasks || normalTaskCount < expectedNoOfNormalTasks) { | |
final long getTaskTime = System.currentTimeMillis(); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
if (intermediateTask != null) { | |
final boolean isThrottledTask = | |
intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME).equals(throttledJobName); | |
if (isThrottledTask) { | |
throttledTaskCount++; | |
assertFalse("too many throttled tasks", throttledTaskCount > expectedNoOfThrottledTasks); | |
final long interval = System.currentTimeMillis() - previousThrottledTaskTime; | |
assertFalse("task #" + throttledTaskCount + " delivered to early after only " + interval | |
+ "ms. Task properties: " + intermediateTask.getProperties(), interval < taskDelay); | |
previousThrottledTaskTime = getTaskTime; | |
} else { | |
normalTaskCount++; | |
assertFalse("too many normal tasks", normalTaskCount > expectedNoOfNormalTasks); | |
} | |
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask)); | |
} | |
} | |
} finally { | |
cancelJob(throttledJobName, throttledJobId); | |
cancelJob(normalJobName, normalJobId); | |
} | |
} | |
/** | |
* Test handling of a throttled job: multiple throttled job run at the same time with different delays. | |
*/ | |
public void testMultipleThrottledJobs() throws Exception { | |
final int expectedNoOfTasks = 3; | |
final long[] taskDelay = new long[] { 1000, 1500, 500 }; | |
final String[] jobName = new String[3]; | |
final String[] jobId = new String[3]; | |
for (int i = 0; i < jobName.length; i++) { | |
jobName[i] = "testMultipleThrottledJobs-" + i; | |
jobId[i] = startThrottledJob(jobName[i], taskDelay[i], INTERMEDIATE_WORKER); | |
} | |
try { | |
for (int j = 0; j < expectedNoOfTasks; j++) { | |
for (int i = 0; i < jobName.length; i++) { | |
createTasksForIntermediateWorker(1, jobName[i], jobId[i]); | |
} | |
} | |
final long[] previousTaskTime = new long[jobName.length]; | |
final int[] taskCount = new int[jobName.length]; | |
while (_tasksToBeKeptAlive.size() < expectedNoOfTasks * jobName.length) { | |
final long getTaskTime = System.currentTimeMillis(); | |
final Task intermediateTask = s_taskManager.getTask(INTERMEDIATE_WORKER, null); | |
if (intermediateTask != null) { | |
final String taskJobName = intermediateTask.getProperties().get(Task.PROPERTY_JOB_NAME); | |
final int jobIndex = Arrays.binarySearch(jobName, taskJobName); | |
assertTrue("got task for unknown job " + taskJobName, jobIndex >= 0); | |
taskCount[jobIndex]++; | |
assertFalse("too many tasks for job " + taskJobName, taskCount[jobIndex] > expectedNoOfTasks); | |
final long interval = System.currentTimeMillis() - previousTaskTime[jobIndex]; | |
assertFalse("task #" + taskCount[jobIndex] + " of job " + taskJobName + " delivered to early after only " | |
+ interval + "ms. Task properties: " + intermediateTask.getProperties(), interval < taskDelay[jobIndex]); | |
previousTaskTime[jobIndex] = getTaskTime; | |
_tasksToBeKeptAlive.add(new KeepAliveEntry(INTERMEDIATE_WORKER, intermediateTask)); | |
} | |
} | |
} finally { | |
for (int i = 0; i < jobName.length; i++) { | |
cancelJob(jobName[i], jobId[i]); | |
} | |
} | |
} | |
private void createTasksForIntermediateWorker(final int expectedNoOfTasks, final String jobName, | |
final String jobId) throws Exception { | |
for (int i = 0; i < expectedNoOfTasks; i++) { | |
final Task initialTask = s_jobTaskProcessor.getInitialTask(INPUT_WORKER, jobName); | |
assertNotNull(initialTask); | |
createBulk(initialTask, OUTPUT_SLOT, "input"); | |
finishTask(initialTask, successResult()); | |
} | |
} | |
/** | |
* check task counters for worker. | |
*/ | |
private void assertTaskManagerCounter(final String workerName, final int expectedTodo, | |
final int expectedInProgress) throws TaskmanagerException { | |
final Map<String, TaskCounter> counters = s_taskManager.getTaskCounters(); | |
final TaskCounter counter = counters.get(workerName); | |
assertEquals(expectedTodo, counter.getTasksTodo()); | |
assertEquals(expectedInProgress, counter.getTasksInProgress()); | |
} | |
protected void keepAlive(final List<Task> tasks) throws Exception { | |
for (final Task task : tasks) { | |
keepAlive(task); | |
} | |
} | |
/** | |
* Sends a keep alive to the task manager for a worker. | |
* | |
* @param task | |
* The task | |
* @throws Exception | |
* An exception if something goes wrong | |
*/ | |
protected void keepAlive(final Task task) throws Exception { | |
s_taskManager.keepAlive(task.getWorkerName(), task.getTaskId()); | |
} | |
/** | |
* Starts the dummy job. | |
* | |
* @param jobName | |
* The job name | |
* @return The id of the job run | |
* @throws Exception | |
* An exception if something goes wrong | |
*/ | |
protected String startDummyJob(final String jobName) throws Exception { | |
addJob(jobName, "test-workflow", createJobParameters()); | |
return startJob(jobName); | |
} | |
/** | |
* Starts the dummy job. | |
* | |
* @param jobName | |
* The job name | |
* @return The id of the job run | |
* @throws Exception | |
* An exception if something goes wrong | |
*/ | |
protected String startThrottledJob(final String jobName, final long delay, final String... delayedWorker) | |
throws Exception { | |
final AnyMap taskControl = DataFactory.DEFAULT.createAnyMap(); | |
taskControl.put("delay", delay); | |
for (final String worker : delayedWorker) { | |
taskControl.getSeq("workers", true).add(worker); | |
} | |
addJob(jobName, "test-workflow", createJobParameters(), taskControl); | |
return startJob(jobName); | |
} | |
/** | |
* Starts the dummy job for the completion workflow. | |
* | |
* @param jobName | |
* The job name | |
* @return The id of the job run | |
* @throws Exception | |
* An exception if something goes wrong | |
*/ | |
protected String startDummyCompletionJob(final String jobName) throws Exception { | |
addJob(jobName, "test-completion-workflow", createJobParameters()); | |
return startJob(jobName); | |
} | |
/** | |
* Creates the job's parameters. | |
* | |
* @param tempStoreName | |
* temporary store name | |
* @param storeName | |
* store name | |
* @return parameter Any | |
* @throws Exception | |
* any exception during creation of Any. | |
*/ | |
protected AnyMap createJobParameters(final String tempStoreName, final String storeName) throws Exception { | |
final AnyMap parameters = DataFactory.DEFAULT.createAnyMap(); | |
parameters.put(TEMP_STORE_PARAM, tempStoreName); | |
parameters.put(STORE_PARAM, storeName); | |
return parameters; | |
} | |
/** | |
* Creates an Any object with the parameters. | |
* | |
* @param maxNoOfPartitions | |
* The maximum number of partitions | |
* @return An Any object with the parameters | |
* @throws Exception | |
* An exception if something goes wrong | |
*/ | |
protected AnyMap createJobParameters() throws Exception { | |
return createJobParameters(STORE_NAME, STORE_NAME); | |
} | |
/** | |
* utility method to add jobs. | |
*/ | |
protected void addJob(final String jobName, final String workflow, final AnyMap parameters) throws Exception { | |
addJob(jobName, workflow, parameters, null); | |
} | |
/** | |
* utility method to add jobs. | |
*/ | |
protected void addJob(final String jobName, final String workflow, final AnyMap parameters, | |
final AnyMap taskControl) throws Exception { | |
final AnyMap jobAny = DataFactory.DEFAULT.createAnyMap(); | |
jobAny.put("name", jobName); | |
jobAny.put("workflow", workflow); | |
jobAny.put("parameters", parameters); | |
if (taskControl != null) { | |
jobAny.put("taskControl", taskControl); | |
} | |
s_defPersistence.addJob(new JobDefinition(jobAny)); | |
final JobDefinition checkDef = s_defPersistence.getJob(jobName); | |
assertNotNull(checkDef); | |
} | |
/** | |
* Utility method to start a job. Asserts the jobId is not null. | |
* | |
* @param jobName | |
* The name of the job to start. | |
* @return The id of the started job run. | |
* @throws Exception | |
* error occurred while starting the job. | |
*/ | |
protected String startJob(final String jobName) throws Exception { | |
final String jobId = s_jobRunEngine.startJob(jobName); | |
assertNotNull(jobId); | |
return jobId; | |
} | |
/** | |
* Cancels a job. | |
* | |
* @param jobName | |
* The job name | |
* @param jobId | |
* The job run id | |
* @throws Exception | |
* an exception if something went wrong | |
*/ | |
protected void cancelJob(final String jobName, final String jobId) throws Exception { | |
s_jobRunEngine.cancelJob(jobName, jobId); | |
} | |
/** | |
* get an initial task for a worker. | |
* | |
* @param workerName | |
* the name of the worker | |
* @param jobName | |
* the name of the job | |
* @return the task or null if no task was available | |
* @throws Exception | |
* an exception that happened during requests | |
*/ | |
protected Task getInitialTask(final String workerName, final String jobName) throws Exception { | |
return s_jobTaskProcessor.getInitialTask(workerName, jobName); | |
} | |
/* base methods. */ | |
/** | |
* @return a ResultDescription for a task with completion status SUCCESS. | |
*/ | |
protected ResultDescription successResult() { | |
return new ResultDescription(TaskCompletionStatus.SUCCESSFUL, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status RECOVERABLE_ERROR. | |
*/ | |
protected ResultDescription recoverableError() { | |
return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status FATAL_ERROR. | |
*/ | |
protected ResultDescription fatalError() { | |
return new ResultDescription(TaskCompletionStatus.FATAL_ERROR, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* @return a ResultDescription for a task with completion status POSTPONE. | |
*/ | |
protected ResultDescription postponeResult() { | |
return new ResultDescription(TaskCompletionStatus.POSTPONE, "", "", new HashMap<String, Number>()); | |
} | |
/** | |
* Asserts that a job run has the status "CANCELED", and return the data. | |
*/ | |
protected AnyMap assertJobRunCanceled(final String jobName, final String jobRunId) throws JobManagerException { | |
final AnyMap data = assertJobRunData(jobName, jobRunId); | |
assertEquals(JobState.CANCELED.name(), data.getStringValue(JobManagerConstants.DATA_JOB_STATE)); | |
return data; | |
} | |
/** | |
* get job run data, repeat until the most basic properties are set. | |
*/ | |
protected AnyMap assertJobRunData(final String jobName, final String jobId) throws JobManagerException { | |
AnyMap data; | |
int count = 0; | |
final int maxCount = 3; | |
boolean done = false; | |
do { | |
data = s_jobDataProvider.getJobRunData(jobName, jobId); | |
try { | |
if (data == null | |
|| data.get("jobId") == null | |
|| data.get(JobManagerConstants.WORKFLOW_RUN_COUNTER) == null | |
|| data.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER).get( | |
JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_WORKFLOW_RUNS) == null) { | |
// ok, data seems to be just gone from zoo keeper into store. | |
// let's try that again but with a small delay... | |
Thread.sleep(WAITING_TIME_FOR_JOB_RUN_DATA); | |
if (++count > maxCount) { | |
fail("Too many tries to get job run data."); | |
} | |
} else { | |
done = true; | |
} | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
++count; | |
} | |
} while (!done); | |
return data; | |
} | |
/** | |
* assert tasks counter in job run data. | |
*/ | |
protected void assertTaskCounters(final int createdTasks, final int succeededTasks, final int retriedAfterError, | |
final int retriedAfterTimeout, final int failedWithoutRetry, final int failedAfterRetry, | |
final int cancelledTasks, final AnyMap jobRunData) { | |
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER); | |
assertEquals(createdTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue()); | |
assertEquals(succeededTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS) | |
.intValue()); | |
assertEquals(retriedAfterError, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER) | |
.intValue()); | |
assertEquals(retriedAfterTimeout, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL) | |
.intValue()); | |
assertEquals(failedAfterRetry, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED) | |
.intValue()); | |
assertEquals(failedWithoutRetry, | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue()); | |
assertEquals(cancelledTasks, taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS) | |
.intValue()); | |
} | |
/** | |
* assert tasks counter sum in job run data. | |
*/ | |
protected void assertTaskCounters(final AnyMap jobRunData) { | |
final AnyMap taskData = jobRunData.getMap(JobManagerConstants.TASK_COUNTER); | |
final int createdTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CREATED_TASKS).intValue(); | |
final int succeededTasks = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS).intValue(); | |
final int retriedByWorker = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER).intValue(); | |
final int retriedByTtl = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL).intValue(); | |
final int failedAfterRetry = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED).intValue(); | |
final int failedByWorker = | |
taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED).intValue(); | |
final int cancelledTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_CANCELLED_TASKS).intValue(); | |
final int obsoleteTasks = taskData.getLongValue(JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS).intValue(); | |
assertEquals(createdTasks, succeededTasks + retriedByWorker + retriedByTtl + failedByWorker + failedAfterRetry | |
+ cancelledTasks + obsoleteTasks); | |
} | |
/** | |
* check worker task counters. | |
*/ | |
protected void assertWorkerCounters(final String workerName, final int expectedSuccessful, | |
final int expectedRetriedTimeout, final int expectedRetriedWorker, final int expectedFailedRetried, | |
final int expectedFailedNotRetried, final int expectedObsolete, final AnyMap jobRunData) throws Exception { | |
assertTrue(jobRunData.containsKey("worker")); | |
final AnyMap workerData = jobRunData.getMap("worker"); | |
assertTrue(workerData.containsKey(workerName)); | |
final AnyMap workerCounter = workerData.getMap(workerName); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, expectedSuccessful); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_TTL, | |
expectedRetriedTimeout); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_RETRIED_TASKS_WORKER, | |
expectedRetriedWorker); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_RETRIED, | |
expectedFailedRetried); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_FAILED_TASKS_NOT_RETRIED, | |
expectedFailedNotRetried); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_OBSOLETE_TASKS, expectedObsolete); | |
} | |
/** | |
* assert worker task counter, if value greater than zero expected. | |
*/ | |
protected void assertWorkerTaskCounter(final AnyMap workerCounter, final String name, final int expected) { | |
if (expected > 0) { | |
assertTrue("missing counter for " + name, workerCounter.containsKey(name)); | |
assertEquals("wrong counter value for " + name, expected, workerCounter.getLongValue(name).intValue()); | |
} | |
} | |
/** | |
* asserts that a job is running in the given run mode. | |
*/ | |
protected void assertJobRunning(final String jobName, final String jobId) throws JobManagerException { | |
final AnyMap data = s_jobDataProvider.getJobRunData(jobName, jobId); | |
assertNotNull(data); | |
assertEquals(jobId, data.getStringValue("jobId")); | |
assertEquals(JobState.RUNNING.name(), data.getStringValue("state")); | |
} | |
/** | |
* asserts that one or more tasks are currently being finished. | |
* | |
* @throws TaskmanagerException | |
*/ | |
protected void assertTasksInFinish() throws TaskmanagerException { | |
final TaskList list = | |
s_taskManager.getTaskList(TaskManager.FINISHING_TASKS_WORKER, TASK_SECTION_INPROGRESS, 100); | |
assertNotNull(list); | |
assertTrue(list.getSize() > 0); | |
} | |
/** | |
* creates a bulk for the given task, slot name and writes the data into the bulk. | |
* | |
* @param task | |
* the task | |
* @param slotName | |
* the name of the worker slot | |
* @param data | |
* the data to write in the store | |
* @return the created bulk | |
* @throws Exception | |
* error | |
*/ | |
protected BulkInfo createBulk(final Task task, final String slotName, final String data) throws Exception { | |
final BulkInfo bulk = task.getOutputBulks().get(slotName).get(0); | |
assertNotNull("Bulk for slotname '" + slotName + "' for worker '" + task.getWorkerName() + "' is null.", bulk); | |
_store.putObject(bulk.getStoreName(), bulk.getObjectName(), data.getBytes("utf-8")); | |
return bulk; | |
} | |
/** | |
* Finishes a task. | |
*/ | |
protected void finishTask(final Task task, final ResultDescription result) throws Exception { | |
s_taskManager.finishTask(task.getWorkerName(), task.getTaskId(), result); | |
simulateFinishTaskWorker(); // to make sure that follow up tasks are generated when this method returns. | |
} | |
/** | |
* to actually finish the (finishing) tasks. | |
*/ | |
protected void simulateFinishTaskWorker() throws Exception { | |
Task finishingTask = null; | |
do { | |
finishingTask = s_taskManager.getTask(TaskManager.FINISHING_TASKS_WORKER, null); | |
if (finishingTask != null) { | |
final KeepAliveEntry kae = new KeepAliveEntry(TaskManager.FINISHING_TASKS_WORKER, finishingTask); | |
_tasksToBeKeptAlive.add(kae); | |
// check if start and end time are set. | |
final String start = finishingTask.getProperties().get(Task.PROPERTY_START_TIME); | |
final String end = finishingTask.getProperties().get(Task.PROPERTY_END_TIME); | |
assertNotNull(start); | |
assertNotNull(end); | |
assertTrue(end.compareTo(start) >= 0); | |
try { | |
assertTasksInFinish(); | |
final String finishingTaskId = finishingTask.getTaskId(); | |
try { | |
s_jobTaskProcessor.finishTask(finishingTask); | |
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, successResult()); | |
} catch (final JobManagerException e) { | |
final ResultDescription result = e.isRecoverable() ? recoverableError() : fatalError(); | |
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, result); | |
} catch (final Exception e) { | |
s_taskManager.finishTask(TaskManager.FINISHING_TASKS_WORKER, finishingTaskId, fatalError()); | |
} | |
} finally { | |
_tasksToBeKeptAlive.remove(kae); | |
} | |
} | |
} while (finishingTask != null); | |
} | |
/** | |
* Waits for a job to finish its workflow runs. | |
* | |
* @param jobName | |
* the name of the job | |
* @param jobId | |
* the id of the job run | |
* @param maxWaitTime | |
* max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail. | |
* @throws Exception | |
* an exception occurred while trying to get job run data | |
*/ | |
protected void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime) | |
throws Exception { | |
final long sleepTime = 500L; | |
final long millisStarted = System.currentTimeMillis(); | |
while (true) { | |
try { | |
final AnyMap jobRunData = s_jobDataProvider.getJobRunData(jobName, jobId); | |
if (jobRunData != null) { | |
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS); | |
if (activeRunsAny != null) { | |
assertEquals("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS, | |
ValueType.LONG, activeRunsAny.getValueType()); | |
final int activeRuns = ((Value) activeRunsAny).asLong().intValue(); | |
assertTrue("negative value for " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS, | |
activeRuns >= 0); | |
if (activeRuns == 0) { | |
assertTaskCounters(jobRunData); | |
return; | |
} | |
} | |
} | |
assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData, | |
System.currentTimeMillis() - millisStarted <= maxWaitTime); | |
Thread.sleep(sleepTime); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
throw e; | |
} | |
} | |
} | |
} |