/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.jobmanager.IllegalJobStateException; | |
import org.eclipse.smila.jobmanager.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.JobManagerException; | |
import org.eclipse.smila.jobmanager.JobRunMode; | |
import org.eclipse.smila.jobmanager.JobState; | |
import org.eclipse.smila.taskmanager.Task; | |
/** | |
* Tests for Job Run Canceling. | |
* | |
*/ | |
public class TestJobCanceling extends JobManagerTestBase { | |
/** {@inheritDoc} */ | |
@Override | |
protected void tearDown() throws Exception { | |
super.tearDown(); | |
} | |
/** | |
* cancel a job run without workflow runs. | |
*/ | |
public void testCancelEmptyJobRun() throws Exception { | |
final String jobName = "testCancelEmptyJobRun"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
_jobManager.cancelJob(jobName, jobId); | |
try { | |
_jobManager.getInitialTask(WORKER_1, jobName); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData); | |
assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData); | |
} | |
/** | |
* cancel a job run with an active workflow run. | |
*/ | |
public void testCancelJobRunWithActiveWorkflows() throws Exception { | |
final String jobName = "testCancel"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
final Task inputTask = _jobManager.getInitialTask(WORKER_1, jobName); | |
final Task intermediateTask = | |
getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2); | |
_jobManager.cancelJob(jobName, jobId); | |
try { | |
getSingleNextTask(intermediateTask, | |
processWorker2Task(intermediateTask, workerParameterValue, tempStoreName), WORKER_3); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData); | |
assertTaskCounters(2, 1, 0, 0, 0, 0, 1, jobRunData); | |
} | |
/** | |
* cancel one of two simultaneously running jobs. | |
*/ | |
public void testCancelOneOfTwo() throws Exception { | |
final String jobName1 = "testCancelOneOfTwo1"; | |
final String jobName2 = "testCancelOneOfTwo2"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName1 = "tempstore1"; | |
final String tempStoreName2 = "tempstore2"; | |
final AnyMap parameters1 = createJobParameters(workerParameterValue, tempStoreName1, tempStoreName1); | |
addJob(jobName1, "testWorkflow", parameters1); | |
_objectStoreService.ensureStore(tempStoreName1); | |
final AnyMap parameters2 = createJobParameters(workerParameterValue, tempStoreName2, tempStoreName2); | |
addJob(jobName2, "testWorkflow", parameters2); | |
_objectStoreService.ensureStore(tempStoreName2); | |
final String jobId1 = startJob(jobName1); | |
final String jobId2 = startJob(jobName2); | |
assertJobRunning(jobName1, jobId1); | |
assertJobRunning(jobName2, jobId2); | |
final StandardJobClient client1 = new StandardJobClient(jobName1, workerParameterValue, tempStoreName1); | |
final StandardJobClient client2 = new StandardJobClient(jobName2, workerParameterValue, tempStoreName2); | |
client1.call(); | |
client2.call(); | |
_jobManager.cancelJob(jobName1, jobId1); | |
try { | |
_jobManager.getInitialTask(WORKER_1, jobName1); | |
fail("should not work"); | |
} catch (final IllegalJobStateException ex) { | |
; // OK | |
} | |
client2.call(); | |
_jobManager.finishJob(jobName2, jobId2); | |
final AnyMap jobRunData1 = assertJobRunCanceled(jobName1, jobId1); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName2, jobId2); | |
assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData2); | |
assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test restarting a canceled job. | |
*/ | |
public void testStartJobAfterCanceled() throws Exception { | |
final String jobName = "testStartJobAfterCanceled"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId1 = startJob(jobName); | |
assertJobRunning(jobName, jobId1); | |
_jobManager.cancelJob(jobName, jobId1); | |
final AnyMap jobRunData1 = assertJobRunCanceled(jobName, jobId1); | |
assertWorkflowCounters(0, 0, 0, 0, 0, jobRunData1); | |
assertTaskCounters(0, 0, 0, 0, 0, 0, 0, jobRunData1); | |
final String jobId2 = startJob(jobName); | |
assertJobRunning(jobName, jobId2); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobManager.finishJob(jobName, jobId2); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test if canceling a finished job run fails. | |
*/ | |
public void testFailCancelJobAlreadyFinished() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobManager.finishJob(jobName, jobId); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
try { | |
_jobManager.cancelJob(jobName, jobId); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
} | |
/** | |
* test if canceling a finished previous run fails, when a new run is active. | |
*/ | |
public void testFailCancelPreviousRun() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId1 = startJob(jobName); | |
assertJobRunning(jobName, jobId1); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobManager.finishJob(jobName, jobId1); | |
final AnyMap jobRunData1 = assertJobRunSucceeded(jobName, jobId1); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData1); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData1); | |
final String jobId2 = startJob(jobName); | |
try { | |
_jobManager.cancelJob(jobName, jobId1); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobManager.finishJob(jobName, jobId2); | |
final AnyMap jobRunData2 = assertJobRunSucceeded(jobName, jobId2); | |
assertWorkflowCounters(1, 0, 1, 0, 0, jobRunData2); | |
assertTaskCounters(3, 3, 0, 0, 0, 0, 0, jobRunData2); | |
} | |
/** | |
* test if canceling an job run with an invalid run id fails. | |
*/ | |
public void testFailCancelInvalidRun() throws Exception { | |
final String jobName = "testFailCancelJobNotRunning"; | |
final String workerParameterValue = "test"; | |
final String tempStoreName = "tempstore"; | |
final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName); | |
addJob(jobName, "testWorkflow", parameters); | |
_objectStoreService.ensureStore(tempStoreName); | |
final String jobId = startJob(jobName); | |
assertJobRunning(jobName, jobId); | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
try { | |
_jobManager.cancelJob(jobName, jobId + "-invalid"); | |
fail("should not work"); | |
} catch (final JobManagerException ex) { | |
; // OK | |
} | |
new StandardJobClient(jobName, workerParameterValue, tempStoreName).call(); | |
_jobManager.finishJob(jobName, jobId); | |
final AnyMap jobRunData = assertJobRunSucceeded(jobName, jobId); | |
assertWorkflowCounters(2, 0, 2, 0, 0, jobRunData); | |
assertTaskCounters(6, 6, 0, 0, 0, 0, 0, jobRunData); | |
} | |
/** | |
* test canceling a runOnce job. | |
*/ | |
public void testCancelRunOnceJob() throws Exception { | |
final String jobName = "testCancelRunOnceJob"; | |
final String workerParameterValue = "test"; | |
final String storeName = "test-store"; | |
final int numberOfObjects = 10; | |
addJob(jobName, "triggeredWorkflow", createJobParameters(workerParameterValue, storeName, storeName)); | |
_objectStoreService.ensureStore(storeName); | |
final String bucketName = "finalBucket"; | |
for (int i = 0; i < numberOfObjects; i++) { | |
_objectStoreService.putObject(storeName, bucketName + "/objectnumber" + i, ("" + i).getBytes()); | |
} | |
final String jobId = _jobManager.startJob(jobName, JobRunMode.RUNONCE); | |
final AnyMap jobRunData1 = _jobManager.getJobRunData(jobName, jobId); | |
assertEquals(JobState.FINISHING.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_STATE)); | |
assertEquals(JobRunMode.RUNONCE.name(), jobRunData1.getStringValue(JobManagerConstants.DATA_JOB_RUN_MODE)); | |
assertWorkflowCounters(1, 1, 0, 0, 0, jobRunData1); | |
assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, 0, jobRunData1); | |
_jobManager.cancelJob(jobName, jobId); | |
final AnyMap jobRunData2 = assertJobRunCanceled(jobName, jobId); | |
assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData2); | |
assertTaskCounters(numberOfObjects, 0, 0, 0, 0, 0, numberOfObjects, jobRunData2); | |
} | |
} |