remove tasks from Taskmanager for failed job runs
diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
index eb12385..10f99f4 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/AllTests.java
@@ -75,6 +75,7 @@
     suite.addTestSuite(TestWorkflowRunCanceling.class);

     suite.addTestSuite(TestWorkflowRunControlHandler.class);

     suite.addTestSuite(TestWorkflowRunInfo.class);

+    suite.addTestSuite(TestJobRunFails.class);

     // $JUnit-END$

     return suite;

   }

diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
index 3b88743..950ddb7 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/JobTaskProcessingTestBase.java
@@ -24,6 +24,8 @@
 import org.eclipse.smila.taskmanager.ResultDescription;

 import org.eclipse.smila.taskmanager.Task;

 import org.eclipse.smila.taskmanager.TaskCompletionStatus;

+import org.eclipse.smila.taskmanager.TaskCounter;

+import org.eclipse.smila.taskmanager.TaskmanagerException;

 import org.eclipse.smila.taskmanager.persistence.TaskStorage;

 

 /**

@@ -201,7 +203,7 @@
 

   /** worker with optional parameter. */

   public static final String WORKER_OPTIONAL_PARAM = "testOptionalParametersWorker";

-  

+

   /** worker counter name for input objects. */

   public static final String INPUT_OBJECT_COUNT = "inputObjectCount";

 

@@ -226,7 +228,7 @@
     _jobTaskProcessor = getService(JobTaskProcessor.class);

     assertNotNull(_jobTaskProcessor);

     _taskStorage = getService(TaskStorage.class);

-    assertNotNull(_taskStorage);    

+    assertNotNull(_taskStorage);

   }

 

   /**

@@ -561,4 +563,13 @@
       workflowData.getLongValue(JobManagerConstants.DATA_WORKFLOW_RUN_NO_OF_TRANSIENT_BULKS).intValue());

   }

 

+  /** check task counters for worker. */

+  protected void assertTaskManagerCounter(final String workerName, final int expectedTodo,

+    final int expectedInProgress) throws TaskmanagerException {

+    final Map<String, TaskCounter> counters = _taskStorage.getTaskCounters();

+    final TaskCounter counter = counters.get(workerName);

+    assertEquals(expectedTodo, counter.getTasksTodo());

+    assertEquals(expectedInProgress, counter.getTasksInProgress());

+  }

+

 }

diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
index 2bbed28..9dbd6da 100644
--- a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobCanceling.java
@@ -31,7 +31,7 @@
  * 

  */

 public class TestJobCanceling extends JobTaskProcessingTestBase {

-  /** {@inheritDoc} */

+

   @Override

   protected void tearDown() throws Exception {

     super.tearDown();

@@ -78,8 +78,7 @@
     final Task intermediateTask =

       getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);

     final ResultDescription intermediateResult =

-      processWorker2Task(intermediateTask, workerParameterValue, tempStoreName);

-

+      processWorker2Task(intermediateTask, workerParameterValue, tempStoreName);    

     _jobRunEngine.cancelJob(jobName, jobId);

     try {

       getSingleNextTask(intermediateTask, intermediateResult, WORKER_3);

@@ -90,6 +89,8 @@
     final AnyMap jobRunData = assertJobRunCanceled(jobName, jobId);

     assertWorkflowCounters(1, 0, 0, 0, 1, jobRunData);

     assertTaskCounters(2, 1, 0, 0, 0, 0, 1, jobRunData);

+    // assert that tasks were removed from taskmanager:

+    assertTaskManagerCounter(WORKER_2, 0, 0);

   }

 

   /**

@@ -113,7 +114,7 @@
     getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);

 

     _jobRunEngine.cancelJob(jobName, jobId);

-    assertObjectsDeleted(transientObjects); // checks that transient data was removed    

+    assertObjectsDeleted(transientObjects); // checks that transient data was removed

   }

 

   /**

@@ -311,15 +312,15 @@
       _objectStoreService.putObject(storeName, "bigBucket/" + String.valueOf(i),

         _ipcRecordWriter.writeBinaryObject(r));

     }

-    // add 1 object to inBucket 

+    // add 1 object to inBucket

     final Record startRecord = DataFactory.DEFAULT.createRecord("id");

     _objectStoreService.putObject(storeName, "inBucket/startid", _ipcRecordWriter.writeBinaryObject(startRecord));

 

-    // start run once job 

+    // start run once job

     // -> 1 task from inBucket will have 100 follow up tasks (test workflow uses combine-taskgenerator)

     final String jobRunId = _jobRunEngine.startJob(jobName, JobRunMode.RUNONCE);

 

-    // wait until follow up task generation has begun, afterwards cancel job    

+    // wait until follow up task generation has begun, afterwards cancel job

     TaskCounter counter = null;

     final int maxNoOfLoops = 1000;

     int noOfLoops = 0;

@@ -333,7 +334,7 @@
     _jobRunEngine.cancelJob(jobName, jobRunId);

 

     // there may be tasks that were added to Taskmanager after (resp. while) job canceling.

-    // Make sure that they are proccessed by worker (with errors on finish of course, but that's ok)    

+    // Make sure that they are proccessed by worker (with errors on finish of course, but that's ok)

     noOfLoops = 0;

     TaskCounter counterFinish = null;

     do {

@@ -353,4 +354,5 @@
     assertTrue("Remaining tasks in Taskmanager's '_finishingTasks' INPROGRESS queue",

       counterFinish.getTasksTodo() == 0);

   }

+

 }

diff --git a/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java
new file mode 100644
index 0000000..3e97ec2
--- /dev/null
+++ b/core/org.eclipse.smila.jobmanager.test/code/src/org/eclipse/smila/jobmanager/test/TestJobRunFails.java
@@ -0,0 +1,51 @@
+/**********************************************************************************************************************

+ * Copyright (c) 2008, 2014 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This

+ * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which

+ * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html

+ *

+ * Contributors: Andreas Weber (Empolis Information Management GmbH) - initial implementation

+ **********************************************************************************************************************/

+package org.eclipse.smila.jobmanager.test;

+

+import org.eclipse.smila.datamodel.AnyMap;

+import org.eclipse.smila.taskmanager.ResultDescription;

+import org.eclipse.smila.taskmanager.Task;

+import org.eclipse.smila.taskmanager.TaskCompletionStatus;

+import org.eclipse.smila.taskmanager.TaskManager;

+

+/**

+ * Tests for Job Run Canceling.

+ * 

+ */

+public class TestJobRunFails extends JobTaskProcessingTestBase {

+

+  /** check that tasks are removed from taskmanager when job run failed. */

+  public void testTasksAreRemoved() throws Exception {

+    final String jobName = "testFail";

+    final String workerParameterValue = "test";

+    final String tempStoreName = "tempstore";

+    final AnyMap parameters = createJobParameters(workerParameterValue, tempStoreName, tempStoreName);

+    addJob(jobName, "testWorkflow", parameters);

+    _objectStoreService.ensureStore(tempStoreName);

+    final String jobRunId = startJob(jobName);

+    assertJobRunning(jobName, jobRunId);

+

+    final Task inputTask = _jobTaskProcessor.getInitialTask(WORKER_1, jobName);

+    final Task intermediateTask =

+      getSingleNextTask(inputTask, processInitialTask(inputTask, tempStoreName), WORKER_2);

+

+    final ResultDescription errorResult = new ResultDescription(TaskCompletionStatus.FATAL_ERROR, null, null, null);

+    final Task finishTaskForIntermediate =

+      intermediateTask.createFinishTask(errorResult, TaskManager.FINISHING_TASKS_WORKER);

+    _jobTaskProcessor.finishTask(finishTaskForIntermediate);

+    _jobRunEngine.finishJob(jobName, jobRunId);

+

+    final AnyMap jobRunData = assertJobRunFailed(jobName, jobRunId);

+    assertWorkflowCounters(1, 0, 0, 1, 0, jobRunData);

+    assertTaskCounters(2, 1, 0, 0, 1, 0, 0, jobRunData);

+    // assert that tasks were removed from taskmanager:

+    assertTaskManagerCounter(WORKER_1, 0, 0);

+    assertTaskManagerCounter(WORKER_2, 0, 0);

+  }

+

+}

diff --git a/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java b/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
index 16d8984..32f03aa 100644
--- a/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
+++ b/core/org.eclipse.smila.jobmanager/code/src/org/eclipse/smila/jobmanager/internal/JobRunEngineImpl.java
@@ -256,10 +256,7 @@
 

     try {

       // remove tasks in taskmanager

-      final AnyMap taskFilter = DataFactory.DEFAULT.createAnyMap();

-      taskFilter.put(Task.PROPERTY_JOB_NAME, jobName);

-      taskFilter.put(Task.PROPERTY_JOB_RUN_ID, jobRunId);

-      _taskManager.removeTasks(taskFilter);

+      removeTasksQuietly(jobName, jobRunId);

 

       // remove transient data for canceled workflow runs before canceling the workflow runs in run storage,

       // otherwise transient data won't be deleted, because info about what to delete is lost.

@@ -273,9 +270,6 @@
 

       // persist job run data in objectstore, delete job run data in ZK, set job run state on CANCELED

       completeJobRun(jobName, jobRunId, JobState.CANCELED);

-    } catch (final TaskmanagerException e) {

-      throw new JobManagerException("Error while canceling job run '" + jobRunId + "' of job '" + jobName

-        + "': TaskManager couldn't remove canceled tasks", e);

     } catch (final Exception e) {

       throw new JobManagerException("Error while canceling job run '" + jobRunId + "' of job '" + jobName + "'", e);

     }

@@ -359,10 +353,10 @@
     boolean hasActiveWorkflowRuns = _runStorage.checkAndCleanupActiveWorkflowRuns(jobName, jobRunId);

     if (!hasActiveWorkflowRuns && _runStorage.getJobState(jobName) == JobState.FINISHING) {

       // check if we need to create completion tasks

-      final boolean success = _runStorage.setJobState(jobName, jobRunId, JobState.FINISHING, JobState.COMPLETING);

-      if (success) {

+      final boolean ok = _runStorage.setJobState(jobName, jobRunId, JobState.FINISHING, JobState.COMPLETING);

+      if (ok) {

         // only if we had a successful job run...

-        if (jobRunWouldSucceed(_runStorage.getJobRunData(jobName, false))) {

+        if (jobRunWouldSucceed(jobName)) {

           try {

             addCompletionTaksForJobRun(jobName, jobRunId);

           } catch (final TaskGeneratorException e) {

@@ -384,9 +378,15 @@
     // if not, this has to wait until these completion tasks are completed.

     hasActiveWorkflowRuns = _runStorage.checkAndCleanupActiveWorkflowRuns(jobName, jobRunId);

     if (!hasActiveWorkflowRuns && _runStorage.getJobState(jobName) == JobState.COMPLETING) {

-      final boolean success = _runStorage.setJobState(jobName, jobRunId, JobState.COMPLETING, JobState.CLEANINGUP);

-      if (success) {

-        completeJobRun(jobName, jobRunId, JobState.SUCCEEDED);

+      final boolean ok = _runStorage.setJobState(jobName, jobRunId, JobState.COMPLETING, JobState.CLEANINGUP);

+      if (ok) {

+        boolean succeeded = jobRunWouldSucceed(jobName);

+        if (succeeded) {

+          completeJobRun(jobName, jobRunId, JobState.SUCCEEDED);

+        } else {

+          completeJobRun(jobName, jobRunId, JobState.FAILED);

+          removeTasksQuietly(jobName, jobRunId);

+        }

       } else {

         _log.warn("Couldn't change job state from " + JobState.COMPLETING + " to " + JobState.CLEANINGUP

           + " for job run '" + jobRunId + "' of job '" + jobName + "'");

@@ -769,7 +769,7 @@
    * @param jobRunId

    *          The id of the job run.

    * @param finalState

-   *          final state: {@link JobState#SUCCEEDED} or {@link JobState#FAILED}.

+   *          final state: {@link JobState#SUCCEEDED} or {@link JobState#FAILED or {@link JobState#CANCELED}.

    * @throws JobManagerException

    *           error completing the job

    */

@@ -784,11 +784,6 @@
     if (jobRunId.equals(currentJobRunId)) {

       // get job run data (with details) from run storage

       jobRunData.putAll(_runStorage.getJobRunData(jobName, true));

-      if (finalState == JobState.SUCCEEDED) {

-        if (!jobRunWouldSucceed(jobRunData)) {

-          finalState = JobState.FAILED;

-        }

-      }

     }

     // store job run data in persistent storage

     jobRunData.put(JobManagerConstants.DATA_JOB_STATE, finalState.name());

@@ -803,7 +798,8 @@
   }

 

   /** check if the job run would be successful. */

-  private boolean jobRunWouldSucceed(final AnyMap jobRunData) {

+  private boolean jobRunWouldSucceed(final String jobName) throws RunStorageException {

+    final AnyMap jobRunData = _runStorage.getJobRunData(jobName, false);

     // if workflow runs where started, but none was successful, the job actually failed.

     final AnyMap workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);

     final int startedWorkflowRuns =

@@ -875,10 +871,10 @@
         workerDefinition.getParametersByRange(JobManagerConstants.RANGE_JOB_NAME);

       if (!jobNameParameters.isEmpty()) {

         final AnyMap jobParameters = jobRunDefs.getJobDefinition().getParameters();

-        AnyMap workflowParameters = jobRunDefs.getWorkflowDefinition()

-          .getParameters();

+        AnyMap workflowParameters = jobRunDefs.getWorkflowDefinition().getParameters();

         final AnyMap evaluatedParameters =

-          TaskParameterUtils.mergeAndEvaluateParameters(jobParameters, workflowParameters, workflowAction.getParameters(), workflowAction.getWorker());

+          TaskParameterUtils.mergeAndEvaluateParameters(jobParameters, workflowParameters,

+            workflowAction.getParameters(), workflowAction.getWorker());

         for (final ParameterDefinition jobNameParameter : jobNameParameters) {

           final String jobNameValue = evaluatedParameters.getStringValue(jobNameParameter.getName());

           if (jobNameValue != null) {

@@ -991,6 +987,19 @@
     return referredJobs.containsKey(jobNameOfJobToBeFinished);

   }

 

+  /** remove job run tasks in taskmanager. */

+  private void removeTasksQuietly(final String jobName, final String jobRunId) {

+    final AnyMap taskFilter = DataFactory.DEFAULT.createAnyMap();

+    taskFilter.put(Task.PROPERTY_JOB_NAME, jobName);

+    taskFilter.put(Task.PROPERTY_JOB_RUN_ID, jobRunId);

+    try {

+      _taskManager.removeTasks(taskFilter);

+    } catch (TaskmanagerException e) {

+      // remove quietly, so don't throw an exception here

+      _log.warn("Couldn't remove taskmanager tasks for job run " + jobRunId + " of job " + jobName);

+    }

+  }

+

   /**

    * Add a RequestHandler..

    *