remove tasks from Taskmanager for failed job runs
diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
index eb12385..10f99f4 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
@@ -75,6 +75,7 @@
suite.addTestSuite(TestWorkflowRunCanceling.class);
suite.addTestSuite(TestWorkflowRunControlHandler.class);
suite.addTestSuite(TestWorkflowRunInfo.class);
+ suite.addTestSuite(TestJobRunFails.class);
// $JUnit-END$
return suite;
}
diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
index 3b88743..950ddb7 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
@@ -24,6 +24,8 @@
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
+import org.eclipse.smila.taskmanager.TaskCounter;
+import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
/**
@@ -201,7 +203,7 @@
/** worker with optional parameter. */
public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker";
-
+
/** worker counter name for input objects. */
public static final String INPUT_OBJECT_COUNT = "inputObjectCount";
@@ -226,7 +228,7 @@
_jobTaskProcessor = getService(JobTaskProcessor.class);
assertNotNull(_jobTaskProcessor);
_taskStorage = getService(TaskStorage.class);
- assertNotNull(_taskStorage);
+ assertNotNull(_taskStorage);
}
/**
@@ -561,4 +563,13 @@
workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue());
}
+ /** check task counters for worker. */
+ protected void assertTaskManagerCounter(final String workerName, final int expectedTodo,
+ final int expectedInProgress) throws TaskmanagerException {
+ final Map<String, TaskCounter> counters = _taskStorage.getTaskCounters();
+ final TaskCounter counter = counters.get(workerName);
+ assertEquals(expectedTodo, counter.getTasksTodo());
+ assertEquals(expectedInProgress, counter.getTasksInProgress());
+ }
+
}
diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
index 2bbed28..9dbd6da 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
@@ -31,7 +31,7 @@
*
*/
public class TestJobCanceling extends JobTaskProcessingTestBase {
- /** {@inheritDoc} */
+
@Override
protected void tearDown() throws Exception {
super.tearDown();
@@ -78,8 +78,7 @@
final Task intermediateTask =
getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);
final ResultDescription intermediateResult =
- processWorker2Task(intermediateTask, workerParameterValue, tempStoreName);
-
+ processWorker2Task(intermediateTask, workerParameterValue, tempStoreName);
_jobRunEngine.cancelJob(jobName, jobId);
try {
getSingleNextTask(intermediateTask, intermediateResult, WORKER_3);
@@ -90,6 +89,8 @@
final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId);
assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData);
assertTaskCounters(2, 1, 0, 0, 0, 0, 1, jobRunData);
+ // assert that tasks were removed from taskmanager:
+ assertTaskManagerCounter(WORKER_2, 0, 0);
}
/**
@@ -113,7 +114,7 @@
getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);
_jobRunEngine.cancelJob(jobName, jobId);
- assertObjectsDeleted(transientObjects); // checks that transient data was removed
+ assertObjectsDeleted(transientObjects); // checks that transient data was removed
}
/**
@@ -311,15 +312,15 @@
_objectStoreService.putObject(storeName, "bigBucket/" + String.valueOf(i),
_ipcRecordWriter.writeBinaryObject(r));
}
- // add 1 object to inBucket
+ // add 1 object to inBucket
final Record startRecord = DataFactory.DEFAULT.createRecord("id");
_objectStoreService.putObject(storeName, "inBucket/startid", _ipcRecordWriter.writeBinaryObject(startRecord));
- // start run once job
+ // start run once job
// -> 1 task from inBucket will have 100 follow up tasks (test workflow uses combine-taskgenerator)
final String jobRunId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE);
- // wait until follow up task generation has begun, afterwards cancel job
+ // wait until follow up task generation has begun, afterwards cancel job
TaskCounter counter = null;
final int maxNoOfLoops = 1000;
int noOfLoops = 0;
@@ -333,7 +334,7 @@
_jobRunEngine.cancelJob(jobName, jobRunId);
// there may be tasks that were added to Taskmanager after (resp. while) job canceling.
- // Make sure that they are proccessed by worker (with errors on finish of course, but that's ok)
+ // Make sure that they are proccessed by worker (with errors on finish of course, but that's ok)
noOfLoops = 0;
TaskCounter counterFinish = null;
do {
@@ -353,4 +354,5 @@
assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' INPROGRESS queue",
counterFinish.getTasksTodo() == 0);
}
+
}
diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java
new file mode 100644
index 0000000..3e97ec2
--- /dev/null
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java
@@ -0,0 +1,51 @@
+/**********************************************************************************************************************
+ * Copyright (c) 2008, 2014 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
+ * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors: Andreas Weber (Empolis Information Management GmbH) - initial implementation
+ **********************************************************************************************************************/
+package org.eclipse.smila.jobmanager.test;
+
+import org.eclipse.smila.datamodel.AnyMap;
+import org.eclipse.smila.taskmanager.ResultDescription;
+import org.eclipse.smila.taskmanager.Task;
+import org.eclipse.smila.taskmanager.TaskCompletionStatus;
+import org.eclipse.smila.taskmanager.TaskManager;
+
+/**
+ * Tests for Job Run Canceling.
+ *
+ */
+public class TestJobRunFails extends JobTaskProcessingTestBase {
+
+ /** check that tasks are removed from taskmanager when job run failed. */
+ public void testTasksAreRemoved() throws Exception {
+ final String jobName = "testFail";
+ final String workerParameterValue = "test";
+ final String tempStoreName = "tempstore";
+ final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName);
+ addJob(jobName, "testWorkflow", parameters);
+ _objectStoreService.ensureStore(tempStoreName);
+ final String jobRunId = startJob(jobName);
+ assertJobRunning(jobName, jobRunId);
+
+ final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName);
+ final Task intermediateTask =
+ getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);
+
+ final ResultDescription errorResult = new ResultDescription(TaskCompletionStatus.FATAL_ERROR, null, null, null);
+ final Task finishTaskForIntermediate =
+ intermediateTask.createFinishTask(errorResult, TaskManager.FINISHING_TASKS_WORKER);
+ _jobTaskProcessor.finishTask(finishTaskForIntermediate);
+ _jobRunEngine.finishJob(jobName, jobRunId);
+
+ final AnyMap jobRunData = assertJobRunFailed(jobName, jobRunId);
+ assertWorkflowCounters(1, 0, 0, 1, 0, jobRunData);
+ assertTaskCounters(2, 1, 0, 0, 1, 0, 0, jobRunData);
+ // assert that tasks were removed from taskmanager:
+ assertTaskManagerCounter(WORKER_1, 0, 0);
+ assertTaskManagerCounter(WORKER_2, 0, 0);
+ }
+
+}
diff --git a/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java b/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
index 16d8984..32f03aa 100644
--- a/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
+++ b/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
@@ -256,10 +256,7 @@
try {
// remove tasks in taskmanager
- final AnyMap taskFilter = DataFactory.DEFAULT.createAnyMap();
- taskFilter.put(Task.PROPERTY_JOB_NAME, jobName);
- taskFilter.put(Task.PROPERTY_JOB_RUN_ID, jobRunId);
- _taskManager.removeTasks(taskFilter);
+ removeTasksQuietly(jobName, jobRunId);
// remove transient data for canceled workflow runs before canceling the workflow runs in run storage,
// otherwise transient data won't be deleted, because info about what to delete is lost.
@@ -273,9 +270,6 @@
// persist job run data in objectstore, delete job run data in ZK, set job run state on CANCELED
completeJobRun(jobName, jobRunId, JobState.CANCELED);
- } catch (final TaskmanagerException e) {
- throw new JobManagerException("Error while canceling job run '" + jobRunId + "' of job '" + jobName
- + "': TaskManager couldn't remove canceled tasks", e);
} catch (final Exception e) {
throw new JobManagerException("Error while canceling job run '" + jobRunId + "' of job '" + jobName + "'", e);
}
@@ -359,10 +353,10 @@
boolean hasActiveWorkflowRuns = _runStorage.checkAndCleanupActiveWorkflowRuns(jobName, jobRunId);
if (!hasActiveWorkflowRuns && _runStorage.getJobState(jobName) == JobState.FINISHING) {
// check if we need to create completion tasks
- final boolean success = _runStorage.setJobState(jobName, jobRunId, JobState.FINISHING, JobState.COMPLETING);
- if (success) {
+ final boolean ok = _runStorage.setJobState(jobName, jobRunId, JobState.FINISHING, JobState.COMPLETING);
+ if (ok) {
// only if we had a successful job run...
- if (jobRunWouldSucceed(_runStorage.getJobRunData(jobName, false))) {
+ if (jobRunWouldSucceed(jobName)) {
try {
addCompletionTaksForJobRun(jobName, jobRunId);
} catch (final TaskGeneratorException e) {
@@ -384,9 +378,15 @@
// if not, this has to wait until these completion tasks are completed.
hasActiveWorkflowRuns = _runStorage.checkAndCleanupActiveWorkflowRuns(jobName, jobRunId);
if (!hasActiveWorkflowRuns && _runStorage.getJobState(jobName) == JobState.COMPLETING) {
- final boolean success = _runStorage.setJobState(jobName, jobRunId, JobState.COMPLETING, JobState.CLEANINGUP);
- if (success) {
- completeJobRun(jobName, jobRunId, JobState.SUCCEEDED);
+ final boolean ok = _runStorage.setJobState(jobName, jobRunId, JobState.COMPLETING, JobState.CLEANINGUP);
+ if (ok) {
+ boolean succeeded = jobRunWouldSucceed(jobName);
+ if (succeeded) {
+ completeJobRun(jobName, jobRunId, JobState.SUCCEEDED);
+ } else {
+ completeJobRun(jobName, jobRunId, JobState.FAILED);
+ removeTasksQuietly(jobName, jobRunId);
+ }
} else {
_log.warn("Couldn't change job state from " + JobState.COMPLETING + " to " + JobState.CLEANINGUP
+ " for job run '" + jobRunId + "' of job '" + jobName + "'");
@@ -769,7 +769,7 @@
* @param jobRunId
* The id of the job run.
* @param finalState
- * final state: {@link JobState#SUCCEEDED} or {@link JobState#FAILED}.
+ * final state: {@link JobState#SUCCEEDED} or {@link JobState#FAILED or {@link JobState#CANCELED}.
* @throws JobManagerException
* error completing the job
*/
@@ -784,11 +784,6 @@
if (jobRunId.equals(currentJobRunId)) {
// get job run data (with details) from run storage
jobRunData.putAll(_runStorage.getJobRunData(jobName, true));
- if (finalState == JobState.SUCCEEDED) {
- if (!jobRunWouldSucceed(jobRunData)) {
- finalState = JobState.FAILED;
- }
- }
}
// store job run data in persistent storage
jobRunData.put(JobManagerConstants.DATA_JOB_STATE, finalState.name());
@@ -803,7 +798,8 @@
}
/** check if the job run would be successful. */
- private boolean jobRunWouldSucceed(final AnyMap jobRunData) {
+ private boolean jobRunWouldSucceed(final String jobName) throws RunStorageException {
+ final AnyMap jobRunData = _runStorage.getJobRunData(jobName, false);
// if workflow runs where started, but none was successful, the job actually failed.
final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
final int startedWorkflowRuns =
@@ -875,10 +871,10 @@
workerDefinition.getParametersByRange(JobManagerConstants.RANGE_JOB_NAME);
if (!jobNameParameters.isEmpty()) {
final AnyMap jobParameters = jobRunDefs.getJobDefinition().getParameters();
- AnyMap workflowParameters = jobRunDefs.getWorkflowDefinition()
- .getParameters();
+ AnyMap workflowParameters = jobRunDefs.getWorkflowDefinition().getParameters();
final AnyMap evaluatedParameters =
- TaskParameterUtils.mergeAndEvaluateParameters(jobParameters, workflowParameters, workflowAction.getParameters(), workflowAction.getWorker());
+ TaskParameterUtils.mergeAndEvaluateParameters(jobParameters, workflowParameters,
+ workflowAction.getParameters(), workflowAction.getWorker());
for (final ParameterDefinition jobNameParameter : jobNameParameters) {
final String jobNameValue = evaluatedParameters.getStringValue(jobNameParameter.getName());
if (jobNameValue != null) {
@@ -991,6 +987,19 @@
return referredJobs.containsKey(jobNameOfJobToBeFinished);
}
+ /** remove job run tasks in taskmanager. */
+ private void removeTasksQuietly(final String jobName, final String jobRunId) {
+ final AnyMap taskFilter = DataFactory.DEFAULT.createAnyMap();
+ taskFilter.put(Task.PROPERTY_JOB_NAME, jobName);
+ taskFilter.put(Task.PROPERTY_JOB_RUN_ID, jobRunId);
+ try {
+ _taskManager.removeTasks(taskFilter);
+ } catch (TaskmanagerException e) {
+ // remove quietly, so don't throw an exception here
+ _log.warn("Couldn't remove taskmanager tasks for job run " + jobRunId + " of job " + jobName);
+ }
+ }
+
/**
* Add a RequestHandler..
*