/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
 * implementation
 **********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.jobmanager.JobState; | |
import org.eclipse.smila.jobmanager.definitions.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.definitions.JobRunMode; | |
import org.eclipse.smila.jobmanager.exceptions.IllegalJobStateException; | |
import org.eclipse.smila.jobmanager.exceptions.JobManagerException; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskManager; | |
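/**
 * Tests for canceling job runs: empty runs, runs with active workflow runs, run-once jobs, and cancel requests that
 * must be rejected because the addressed run is already finished or does not exist.
 */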
public class TestJobCanceling extends JobTaskProcessingTestBase { | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
} | |
/** | |
* cancel a job run without workflow runs. | |
*/ | |
public void testCancelEmptyJobRun() throws Exception { | |
final String jobName = "testCancelEmptyJobRun"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
_jobRunEngine.cancelJob(jobName, jobId); | |
try { | |
_jobTaskProcessor.getInitialTask(WORKER_1, jobName); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData); | |
assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData); | |
} | |
/** | |
* cancel a job run with an active workflow run. | |
*/ | |
public void testCancelJobRunWithActiveWorkflows() throws Exception { | |
final String jobName = "testCancel"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName); | |
final Task intermediateTask = | |
getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2); | |
final ResultDescription intermediateResult = | |
processWorker2Task(intermediateTask, workerParameterValue, tempStoreName); | |
_jobRunEngine.cancelJob(jobName, jobId); | |
try { | |
getSingleNextTask(intermediateTask, intermediateResult, WORKER_3); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 1, jobRunData); | |
// assert that tasks were removed from taskmanager: | |
assertTaskManagerCounter(WORKER_2, 0, 0); | |
} | |
/** | |
* test that transient data is removed when canceling job run. | |
*/ | |
public void testTransientDataRemoval() throws Exception { | |
final String jobName = "testTransientDataRemoval"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName); | |
final List<BulkInfo> transientObjects = new ArrayList<BulkInfo>(); | |
for (final List<BulkInfo> bulk : inputTask.getOutputBulks().values()) { | |
transientObjects.addAll(bulk); | |
} | |
getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2); | |
_jobRunEngine.cancelJob(jobName, jobId); | |
assertObjectsDeleted(transientObjects); // checks that transient data was removed | |
} | |
/** | |
* cancel one of two simultaneously running jobs. | |
*/ | |
public void testCancelOneOfTwo() throws Exception { | |
final String jobName1 = "testCancelOneOfTwo1"; | |
final String jobName2 = "testCancelOneOfTwo2"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName1 = "tempstore1"; | |
final String tempStoreName2 = "tempstore2"; | |
final AnyMap parameters1 = createJobParameters(workerParameterValue, tempStoreName1, tempStoreName1); | |
addJob(jobName1, "testWorkflow", parameters1); | |
_objectStoreService.ensureStore(tempStoreName1); | |
final AnyMap parameters2 = createJobParameters(workerParameterValue, tempStoreName2, tempStoreName2); | |
addJob(jobName2, "testWorkflow", parameters2); | |
_objectStoreService.ensureStore(tempStoreName2); | |
final String jobId1 = startJob(jobName1); | |
final String jobId2 = startJob(jobName2); | |
assertJobRunning(jobName1, jobId1); | |
assertJobRunning(jobName2, jobId2); | |
final StandardJobClient client1 = new StandardJobClient(jobName1, workerParameterValue, tempStoreName1); | |
final StandardJobClient client2 = new StandardJobClient(jobName2, workerParameterValue, tempStoreName2); | |
client1.call(); | |
client2.call(); | |
_jobRunEngine.cancelJob(jobName1, jobId1); | |
try { | |
_jobTaskProcessor.getInitialTask(WORKER_1, jobName1); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
client2.call(); | |
_jobRunEngine.finishJob(jobName2, jobId2); | |
final AnyMap jobRunData1 = assertJobRunCanceled(jobName1, jobId1); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName2, jobId2); | |
assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData2); | |
assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test restarting a canceled job. | |
*/ | |
public void testStartJobAfterCanceled() throws Exception { | |
final String jobName = "testStartJobAfterCanceled"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId1 = startJob(jobName); | |
assertJobRunning(jobName, jobId1); | |
_jobRunEngine.cancelJob(jobName, jobId1); | |
final AnyMap jobRunData1 = assertJobRunCanceled(jobName, jobId1); | |
assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData1); | |
assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData1); | |
final String jobId2 = startJob(jobName); | |
assertJobRunning(jobName, jobId2); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobRunEngine.finishJob(jobName, jobId2); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test if canceling a finished job run fails. | |
*/ | |
public void testFailCancelJobAlreadyFinished() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobRunEngine.finishJob(jobName, jobId); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
try { | |
_jobRunEngine.cancelJob(jobName, jobId); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
} | |
/** | |
* test if canceling a finished previous run fails, when a new run is active. | |
*/ | |
public void testFailCancelPreviousRun() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId1 = startJob(jobName); | |
assertJobRunning(jobName, jobId1); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobRunEngine.finishJob(jobName, jobId1); | |
final AnyMap jobRunData1 = assertJobRunSucceeded(jobName, jobId1); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1); | |
final String jobId2 = startJob(jobName); | |
try { | |
_jobRunEngine.cancelJob(jobName, jobId1); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobRunEngine.finishJob(jobName, jobId2); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test if canceling an job run with an invalid run id fails. | |
*/ | |
public void testFailCancelInvalidRun() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
try { | |
_jobRunEngine.cancelJob(jobName, jobId + "-invalid"); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobRunEngine.finishJob(jobName, jobId); | |
final AnyMap jobRunData = assertJobRunSucceeded(jobName, jobId); | |
assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData); | |
assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData); | |
} | |
/** | |
* test canceling a runOnce job. | |
*/ | |
public void testCancelRunOnceJob() throws Exception { | |
final String jobName = "testCancelRunOnceJob"; | |
final String workerParameterValue = "test"; | |
final String storeName = "test-store"; | |
final int numberOfObjects = 10; | |
addJob(jobName, "triggeredWorkflow", createJobParameters(workerParameterValue, storeName, storeName)); | |
_objectStoreService.ensureStore(storeName); | |
final String bucketName = "finalBucket"; | |
for (int i = 0; i < numberOfObjects; i++) { | |
_objectStoreService.putObject(storeName, bucketName + "/objectnumber" + i, ("" + i).getBytes()); | |
} | |
final String jobId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE); | |
final AnyMap jobRunData1 = _jobRunDataProvider.getJobRunData(jobName, jobId); | |
assertEquals(JobState.FINISHING.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_STATE)); | |
assertEquals(JobRunMode.RUNONCE.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_RUN_MODE)); | |
assertWorkflowCounters(1, 1, 0, 0, 0, jobRunData1); | |
assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, 0, jobRunData1); | |
_jobRunEngine.cancelJob(jobName, jobId); | |
final AnyMap jobRunData2 = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData2); | |
assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, numberOfObjects, jobRunData2); | |
} | |
/** | |
* cancel a job run where follow up tasks are currently generated. | |
*/ | |
public void testCancelJobRunWhileTaskGeneration() throws Exception { | |
final String jobName = "testCancelWhileTaskGeneration"; | |
final String workflowName = "testCancelWorkflow"; | |
final String storeName = "teststore"; | |
final TaskManager taskManager = getService(TaskManager.class); | |
final AnyMap parameters = createJobParameters("dummy", storeName, "tempstore"); | |
_objectStoreService.ensureStore(storeName); | |
_objectStoreService.ensureStore("tempstore"); | |
addJob(jobName, workflowName, parameters); | |
// add 100 objects to bigBucket | |
_objectStoreService.clearStore(storeName); | |
for (int i = 0; i < 100; i++) { | |
final Record r = DataFactory.DEFAULT.createRecord(String.valueOf(i)); | |
_objectStoreService.putObject(storeName, "bigBucket/" + String.valueOf(i), | |
_ipcRecordWriter.writeBinaryObject(r)); | |
} | |
// add 1 object to inBucket | |
final Record startRecord = DataFactory.DEFAULT.createRecord("id"); | |
_objectStoreService.putObject(storeName, "inBucket/startid", _ipcRecordWriter.writeBinaryObject(startRecord)); | |
// start run once job | |
// -> 1 task from inBucket will have 100 follow up tasks (test workflow uses combine-taskgenerator) | |
final String jobRunId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE); | |
// wait until follow up task generation has begun, afterwards cancel job | |
TaskCounter counter = null; | |
final int maxNoOfLoops = 1000; | |
int noOfLoops = 0; | |
do { | |
Thread.sleep(1); | |
final Map<String, TaskCounter> counters = taskManager.getTaskCounters(); | |
counter = counters.get("testCombinedInputWorker"); | |
noOfLoops++; | |
} while (noOfLoops < maxNoOfLoops | |
&& (counter == null || (counter.getTasksTodo() == 0 && counter.getTasksInProgress() == 0))); | |
_jobRunEngine.cancelJob(jobName, jobRunId); | |
// there may be tasks that were added to Taskmanager after (resp. while) job canceling. | |
// Make sure that they are proccessed by worker (with errors on finish of course, but that's ok) | |
noOfLoops = 0; | |
TaskCounter counterFinish = null; | |
do { | |
Thread.sleep(100); | |
final Map<String, TaskCounter> counters = taskManager.getTaskCounters(); | |
counter = counters.get("testCombinedInputWorker"); | |
counterFinish = counters.get("_finishingTasks"); | |
noOfLoops++; | |
} while (noOfLoops < maxNoOfLoops | |
&& (counter.getTasksTodo() > 0 || counter.getTasksInProgress() > 0 || counterFinish.getTasksTodo() > 0 || counterFinish | |
.getTasksInProgress() > 0)); | |
// Check that workers have "cleaned" all task queues | |
assertTrue("Remaining tasks in Taskmanager's 'testCombinedInputWorker' TODO queue", counter.getTasksTodo() == 0); | |
assertTrue("Remaining tasks in Taskmanager's 'testCombinedInputWorker' INPROGRESS queue", | |
counter.getTasksTodo() == 0); | |
assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' TODO queue", counterFinish.getTasksTodo() == 0); | |
assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' INPROGRESS queue", | |
counterFinish.getTasksTodo() == 0); | |
} | |
} |