/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.jobmanager.JobState;
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants;
import org.eclipse.smila.jobmanager.definitions.JobRunMode;
import org.eclipse.smila.jobmanager.exceptions.IllegalJobStateException;
import org.eclipse.smila.jobmanager.exceptions.JobManagerException;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskManager;
/**
 * Tests for Job Run Canceling.
 *
 * Each test defines a job, starts a job run, cancels it at a different point of its life cycle (or tries to cancel
 * a run that must not be cancelable) and then checks the resulting job run data.
 */
public class TestJobCanceling extends JobTaskProcessingTestBase {

  /** name of the workflow used by the tests that process tasks manually. */
  private static final String TEST_WORKFLOW = "testWorkflow";

  /** name of the task counter that is watched while follow up tasks are generated. */
  private static final String COMBINED_INPUT_WORKER = "testCombinedInputWorker";

  /** name of the task counter for finishing tasks. */
  private static final String FINISHING_TASKS_WORKER = "_finishingTasks";

  /** maximum number of polling iterations when waiting for task counters to change. */
  private static final int MAX_POLL_LOOPS = 1000;

  /**
   * cancel a job run without workflow runs.
   */
  public void testCancelEmptyJobRun() throws Exception {
    final String jobName = "testCancelEmptyJobRun";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId = startJob(jobName);
    assertJobRunning(jobName, jobId);
    _jobRunEngine.cancelJob(jobName, jobId);
    // a canceled run must not hand out initial tasks anymore
    expectInitialTaskRejected(jobName);
    final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId);
    assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData);
    assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData);
  }

  /**
   * cancel a job run with an active workflow run.
   */
  public void testCancelJobRunWithActiveWorkflows() throws Exception {
    final String jobName = "testCancel";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId = startJob(jobName);
    assertJobRunning(jobName, jobId);
    final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName);
    final Task intermediateTask =
      getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);
    final ResultDescription intermediateResult =
      processWorker2Task(intermediateTask, workerParameterValue, tempStoreName);
    _jobRunEngine.cancelJob(jobName, jobId);
    try {
      // finishing the task of the canceled run and asking for a follow up task must be rejected
      getSingleNextTask(intermediateTask, intermediateResult, WORKER_3);
      fail("should not work");
    } catch (final IllegalJobStateException expected) {
      ; // OK
    }
    final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId);
    assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData);
    assertTaskCounters(2, 1, 0, 0, 0, 0, 1, jobRunData);
    // assert that tasks were removed from taskmanager:
    assertTaskManagerCounter(WORKER_2, 0, 0);
  }

  /**
   * test that transient data is removed when canceling job run.
   */
  public void testTransientDataRemoval() throws Exception {
    final String jobName = "testTransientDataRemoval";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId = startJob(jobName);
    assertJobRunning(jobName, jobId);
    final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName);
    // remember all output bulks of the initial task, they are checked for deletion after canceling
    final List<BulkInfo> transientObjects = new ArrayList<BulkInfo>();
    for (final List<BulkInfo> bulk : inputTask.getOutputBulks().values()) {
      transientObjects.addAll(bulk);
    }
    getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);
    _jobRunEngine.cancelJob(jobName, jobId);
    assertObjectsDeleted(transientObjects); // checks that transient data was removed
  }

  /**
   * cancel one of two simultaneously running jobs.
   */
  public void testCancelOneOfTwo() throws Exception {
    final String jobName1 = "testCancelOneOfTwo1";
    final String jobName2 = "testCancelOneOfTwo2";
    final String workerParameterValue = "test";
    final String tempStoreName1 = "tempstore1";
    final String tempStoreName2 = "tempstore2";
    addTestWorkflowJob(jobName1, workerParameterValue, tempStoreName1);
    addTestWorkflowJob(jobName2, workerParameterValue, tempStoreName2);
    final String jobId1 = startJob(jobName1);
    final String jobId2 = startJob(jobName2);
    assertJobRunning(jobName1, jobId1);
    assertJobRunning(jobName2, jobId2);
    final StandardJobClient client1 = new StandardJobClient(jobName1, workerParameterValue, tempStoreName1);
    final StandardJobClient client2 = new StandardJobClient(jobName2, workerParameterValue, tempStoreName2);
    client1.call();
    client2.call();
    _jobRunEngine.cancelJob(jobName1, jobId1);
    expectInitialTaskRejected(jobName1);
    // the second job must not be affected by canceling the first one
    client2.call();
    _jobRunEngine.finishJob(jobName2, jobId2);
    final AnyMap jobRunData1 = assertJobRunCanceled(jobName1, jobId1);
    assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1);
    assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1);
    final AnyMap jobRunData2 = assertJobRunSucceeded(jobName2, jobId2);
    assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData2);
    assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData2);
  }

  /**
   * test restarting a canceled job.
   */
  public void testStartJobAfterCanceled() throws Exception {
    final String jobName = "testStartJobAfterCanceled";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId1 = startJob(jobName);
    assertJobRunning(jobName, jobId1);
    _jobRunEngine.cancelJob(jobName, jobId1);
    final AnyMap jobRunData1 = assertJobRunCanceled(jobName, jobId1);
    assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData1);
    assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData1);
    // second run of the same job
    final String jobId2 = startJob(jobName);
    assertJobRunning(jobName, jobId2);
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    _jobRunEngine.finishJob(jobName, jobId2);
    final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2);
    assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2);
    assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2);
  }

  /**
   * test if canceling a finished job run fails.
   */
  public void testFailCancelJobAlreadyFinished() throws Exception {
    // job name is unique to this test so that it cannot interfere with job data of other tests
    final String jobName = "testFailCancelJobAlreadyFinished";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId = startJob(jobName);
    assertJobRunning(jobName, jobId);
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    _jobRunEngine.finishJob(jobName, jobId);
    final AnyMap jobRunData = assertJobRunSucceeded(jobName, jobId);
    assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData);
    assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData);
    expectCancelRejected(jobName, jobId);
  }

  /**
   * test if canceling a finished previous run fails, when a new run is active.
   */
  public void testFailCancelPreviousRun() throws Exception {
    // job name is unique to this test so that it cannot interfere with job data of other tests
    final String jobName = "testFailCancelPreviousRun";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId1 = startJob(jobName);
    assertJobRunning(jobName, jobId1);
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    _jobRunEngine.finishJob(jobName, jobId1);
    final AnyMap jobRunData1 = assertJobRunSucceeded(jobName, jobId1);
    assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1);
    assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1);
    final String jobId2 = startJob(jobName);
    // canceling with the id of the finished run must fail while the new run is active
    expectCancelRejected(jobName, jobId1);
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    _jobRunEngine.finishJob(jobName, jobId2);
    final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2);
    assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2);
    assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2);
  }

  /**
   * test if canceling a job run with an invalid run id fails.
   */
  public void testFailCancelInvalidRun() throws Exception {
    // job name is unique to this test so that it cannot interfere with job data of other tests
    final String jobName = "testFailCancelInvalidRun";
    final String workerParameterValue = "test";
    final String tempStoreName = "tempstore";
    addTestWorkflowJob(jobName, workerParameterValue, tempStoreName);
    final String jobId = startJob(jobName);
    assertJobRunning(jobName, jobId);
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    expectCancelRejected(jobName, jobId + "-invalid");
    // the running job run must still be usable after the rejected cancel request
    new StandardJobClient(jobName, workerParameterValue, tempStoreName).call();
    _jobRunEngine.finishJob(jobName, jobId);
    final AnyMap jobRunData = assertJobRunSucceeded(jobName, jobId);
    assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData);
    assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData);
  }

  /**
   * test canceling a runOnce job.
   */
  public void testCancelRunOnceJob() throws Exception {
    final String jobName = "testCancelRunOnceJob";
    final String workerParameterValue = "test";
    final String storeName = "test-store";
    final int numberOfObjects = 10;
    addJob(jobName, "triggeredWorkflow", createJobParameters(workerParameterValue, storeName, storeName));
    _objectStoreService.ensureStore(storeName);
    final String bucketName = "finalBucket";
    for (int i = 0; i < numberOfObjects; i++) {
      _objectStoreService.putObject(storeName, bucketName + "/objectnumber" + i, ("" + i).getBytes());
    }
    final String jobId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE);
    final AnyMap jobRunData1 = _jobRunDataProvider.getJobRunData(jobName, jobId);
    assertEquals(JobState.FINISHING.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_STATE));
    assertEquals(JobRunMode.RUNONCE.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_RUN_MODE));
    assertWorkflowCounters(1, 1, 0, 0, 0, jobRunData1);
    assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, 0, jobRunData1);
    _jobRunEngine.cancelJob(jobName, jobId);
    final AnyMap jobRunData2 = assertJobRunCanceled(jobName, jobId);
    assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData2);
    assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, numberOfObjects, jobRunData2);
  }

  /**
   * cancel a job run where follow up tasks are currently generated.
   */
  public void testCancelJobRunWhileTaskGeneration() throws Exception {
    final String jobName = "testCancelWhileTaskGeneration";
    final String workflowName = "testCancelWorkflow";
    final String storeName = "teststore";
    final String tempStoreName = "tempstore";
    final int numberOfBigBucketObjects = 100;
    final TaskManager taskManager = getService(TaskManager.class);
    final AnyMap parameters = createJobParameters("dummy", storeName, tempStoreName);
    _objectStoreService.ensureStore(storeName);
    _objectStoreService.ensureStore(tempStoreName);
    addJob(jobName, workflowName, parameters);
    // add 100 objects to bigBucket
    _objectStoreService.clearStore(storeName);
    for (int i = 0; i < numberOfBigBucketObjects; i++) {
      final Record r = DataFactory.DEFAULT.createRecord(String.valueOf(i));
      _objectStoreService.putObject(storeName, "bigBucket/" + String.valueOf(i),
        _ipcRecordWriter.writeBinaryObject(r));
    }
    // add 1 object to inBucket
    final Record startRecord = DataFactory.DEFAULT.createRecord("id");
    _objectStoreService.putObject(storeName, "inBucket/startid", _ipcRecordWriter.writeBinaryObject(startRecord));
    // start run once job
    // -> 1 task from inBucket will have 100 follow up tasks (test workflow uses combine-taskgenerator)
    final String jobRunId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE);
    // wait until follow up task generation has begun, afterwards cancel job
    TaskCounter counter = null;
    int noOfLoops = 0;
    do {
      Thread.sleep(1);
      counter = taskManager.getTaskCounters().get(COMBINED_INPUT_WORKER);
      noOfLoops++;
    } while (noOfLoops < MAX_POLL_LOOPS && !hasOpenTasks(counter));
    _jobRunEngine.cancelJob(jobName, jobRunId);
    // there may be tasks that were added to Taskmanager after (resp. while) job canceling.
    // Make sure that they are processed by worker (with errors on finish of course, but that's ok)
    noOfLoops = 0;
    TaskCounter counterFinish = null;
    do {
      Thread.sleep(100);
      final Map<String, TaskCounter> counters = taskManager.getTaskCounters();
      counter = counters.get(COMBINED_INPUT_WORKER);
      counterFinish = counters.get(FINISHING_TASKS_WORKER);
      noOfLoops++;
    } while (noOfLoops < MAX_POLL_LOOPS && (hasOpenTasks(counter) || hasOpenTasks(counterFinish)));
    // a missing counter is reported as a readable failure instead of a NullPointerException
    assertTrue("No task counter available for 'testCombinedInputWorker'", counter != null);
    assertTrue("No task counter available for '_finishingTasks'", counterFinish != null);
    // Check that workers have "cleaned" all task queues
    assertTrue("Remaining tasks in Taskmanager's 'testCombinedInputWorker' TODO queue", counter.getTasksTodo() == 0);
    assertTrue("Remaining tasks in Taskmanager's 'testCombinedInputWorker' INPROGRESS queue",
      counter.getTasksInProgress() == 0);
    assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' TODO queue", counterFinish.getTasksTodo() == 0);
    assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' INPROGRESS queue",
      counterFinish.getTasksInProgress() == 0);
  }

  /**
   * define a job for the "testWorkflow" workflow and make sure its store exists. The same store name is passed for
   * both store arguments of createJobParameters.
   *
   * @param jobName
   *          name of the job to add
   * @param workerParameterValue
   *          value passed to createJobParameters as worker parameter
   * @param storeName
   *          name of the store used in the job parameters, ensured to exist afterwards
   */
  private void addTestWorkflowJob(final String jobName, final String workerParameterValue, final String storeName)
    throws Exception {
    final AnyMap parameters = createJobParameters(workerParameterValue, storeName, storeName);
    addJob(jobName, TEST_WORKFLOW, parameters);
    _objectStoreService.ensureStore(storeName);
  }

  /**
   * check that requesting an initial task for the job is rejected with an IllegalJobStateException.
   *
   * @param jobName
   *          name of the job to request the initial task for
   */
  private void expectInitialTaskRejected(final String jobName) throws Exception {
    try {
      _jobTaskProcessor.getInitialTask(WORKER_1, jobName);
      fail("should not work");
    } catch (final IllegalJobStateException expected) {
      ; // OK
    }
  }

  /**
   * check that canceling the given job run is rejected with a JobManagerException.
   *
   * @param jobName
   *          name of the job
   * @param jobRunId
   *          id of the job run that must not be cancelable
   */
  private void expectCancelRejected(final String jobName, final String jobRunId) throws Exception {
    try {
      _jobRunEngine.cancelJob(jobName, jobRunId);
      fail("should not work");
    } catch (final JobManagerException expected) {
      ; // OK
    }
  }

  /**
   * @param counter
   *          task counter to check, may be null if no counter exists (yet)
   * @return true if the counter exists and reports at least one task as todo or in progress
   */
  private static boolean hasOpenTasks(final TaskCounter counter) {
    return counter != null && (counter.getTasksTodo() > 0 || counter.getTasksInProgress() > 0);
  }
}