/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial | |
* implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.jobmanager.test; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.Formatter; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.Value; | |
import org.eclipse.smila.jobmanager.JobManagerConstants; | |
import org.eclipse.smila.jobmanager.JobManagerException; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
/** | |
* Tests using workflow simulation workers. | |
*/ | |
public class TestWorkflowSimulation extends JobManagerTestBase { | |
/** | |
* Runnable that can be stopped. | |
*/ | |
private interface Stoppable extends Runnable { | |
/** Stop this Runnable from running. */ | |
void stop(); | |
} | |
/** | |
* Worker base class. | |
* | |
*/ | |
abstract class WorkerBase implements Stoppable { | |
/** The task queue map. */ | |
protected Map<String, BlockingQueue<Task>> _taskQueueMap; | |
/** Flag indicating whether this job should be running. */ | |
protected volatile boolean _run = true; | |
/** | |
* Constructs a new WorkerBase object. | |
* | |
* @param taskQueueMap | |
* The Task queue map. | |
*/ | |
public WorkerBase(final Map<String, BlockingQueue<Task>> taskQueueMap) { | |
_taskQueueMap = taskQueueMap; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void stop() { | |
_run = false; | |
} | |
/** | |
* Finish a task and put eventually created follow up tasks into the task queue corresponding to the | |
* follow-up-worker. | |
* | |
* @param currentTask | |
* the task to be finished | |
* @param resultDescription | |
* the result description | |
* @throws JobManagerException | |
* exception while handling the task. | |
*/ | |
protected void finishTask(final Task currentTask, final ResultDescription resultDescription) | |
throws JobManagerException { | |
final StringBuilder report = new StringBuilder(); | |
final Formatter f = new Formatter(report); | |
f.format("Worker %s was %s in job %s\n", currentTask.getWorkerName(), resultDescription.getStatus(), | |
currentTask.getProperties()); | |
try { | |
final Collection<Task> taskList = getNextTasks(currentTask, resultDescription); | |
if (taskList.isEmpty()) { | |
f.format(" No follow-up tasks.\n"); | |
} else { | |
for (final Task task : taskList) { | |
f.format(" Follow-up task for %s in job %s\n", task.getWorkerName(), task.getProperties()); | |
_taskQueueMap.get(task.getWorkerName()).add(task); | |
} | |
} | |
} finally { | |
; // System.out.println(report.toString()); | |
} | |
} | |
} | |
/** | |
* input-"Worker". | |
* | |
*/ | |
private class InputWorker extends WorkerBase { | |
/** | |
* Construct a new Worker. | |
* | |
* @param taskQueueMap | |
* The task queue map. | |
*/ | |
public InputWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) { | |
super(taskQueueMap); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
while (_run) { | |
try { | |
if (_taskQueueMap.containsKey(WORKER_1)) { | |
final Task task = _taskQueueMap.get(WORKER_1).poll(100, TimeUnit.MILLISECONDS); | |
if (task != null) { | |
try { | |
createBulk(task, "output", "{\"record\": \"" | |
+ task.getOutputBulks().get("output").get(0).getObjectName() + "\"}"); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
} | |
finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null)); | |
} | |
} | |
} catch (final InterruptedException e) { | |
_run = false; | |
Thread.currentThread().interrupt(); | |
} catch (final JobManagerException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
/** | |
* intermediate-"Worker". | |
* | |
*/ | |
private class IntermediateWorker extends WorkerBase { | |
/** | |
* Construct a new Worker. | |
* | |
* @param taskQueueMap | |
* The task queue map. | |
*/ | |
public IntermediateWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) { | |
super(taskQueueMap); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
while (_run) { | |
try { | |
if (_taskQueueMap.containsKey(WORKER_2)) { | |
final Task task = _taskQueueMap.get(WORKER_2).poll(100, TimeUnit.MILLISECONDS); | |
if (task != null) { | |
try { | |
final String newData = | |
readBulk(task.getInputBulks().get("input").get(0)).replaceFirst("}$", | |
",\n{\"" + WORKER_2 + "\":\"was here\"}}"); | |
createBulk(task, "output", newData); | |
finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null)); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} catch (final InterruptedException e) { | |
_run = false; | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} | |
/** | |
* final-"Worker". | |
* | |
*/ | |
private class FinalWorker extends WorkerBase { | |
/** | |
* Construct a new Worker. | |
* | |
* @param taskQueueMap | |
* The task queue map. | |
*/ | |
public FinalWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) { | |
super(taskQueueMap); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
while (_run) { | |
try { | |
if (_taskQueueMap.containsKey(WORKER_3)) { | |
final Task task = _taskQueueMap.get(WORKER_3).poll(100, TimeUnit.MILLISECONDS); | |
if (task != null) { | |
try { | |
final String newData = | |
readBulk(task.getInputBulks().get("input").get(0)).replaceFirst("}$", | |
",\n{\"" + WORKER_3 + "\":\"was here\"}}"); | |
createBulk(task, "output", newData); | |
finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null)); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} catch (final InterruptedException e) { | |
_run = false; | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} | |
/** | |
* triggered-"Worker". | |
* | |
*/ | |
private class TriggeredWorker extends WorkerBase { | |
/** | |
* Construct a new Worker. | |
* | |
* @param taskQueueMap | |
* The task queue map. | |
*/ | |
public TriggeredWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) { | |
super(taskQueueMap); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
while (_run) { | |
try { | |
if (_taskQueueMap.containsKey(WORKER_4)) { | |
final Task task = _taskQueueMap.get(WORKER_4).poll(100, TimeUnit.MILLISECONDS); | |
if (task != null) { | |
try { | |
final String newData = | |
readBulk(task.getInputBulks().get("input").get(0)).replaceFirst("}$", | |
",\n{\"" + WORKER_4 + "\":\"was here\"}}"); | |
createBulk(task, "output", newData); | |
finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null)); | |
} catch (final Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} catch (final InterruptedException e) { | |
_run = false; | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} | |
/** | |
* Push a task into the queue map. create a blocking queue if no such queue exists for given worker. | |
* | |
* @param task | |
* the task to push. | |
* @param workerName | |
* the name of the receiving worker. | |
* @param queues | |
* the queues map. | |
* @return the queues map. | |
*/ | |
private Map<String, BlockingQueue<Task>> pushTask(final Task task, final String workerName, | |
final Map<String, BlockingQueue<Task>> queues) { | |
if (!queues.containsKey(workerName)) { | |
queues.put(workerName, new LinkedBlockingQueue<Task>()); | |
} | |
queues.get(workerName).add(task); | |
return queues; | |
} | |
/** | |
* read a given bulk. | |
* | |
* @param bulk | |
* the bulk to read. | |
* @return The input of the bulk as String. | |
* @throws ObjectStoreException | |
* exception reading the bulk or accessing service/store. | |
*/ | |
protected String readBulk(final BulkInfo bulk) throws ObjectStoreException { | |
return new String(_objectStoreService.getObject(bulk.getStoreName(), bulk.getObjectName())); | |
} | |
/** | |
* Tests that a test-workflow job run triggers a running triggered-workflow job run. | |
* | |
* @throws Exception | |
* any exception that was thrown while trying to get through this test case. | |
*/ | |
public void testTriggerJobMultiThreaded() throws Exception { | |
final int noOfRuns = 6; | |
final long sleepTime = 100L; | |
final String jobNameInputOne = "testTriggerJobMultiThreadedOne"; | |
final String jobNameInputTwo = "testTriggerJobMultiThreadedTwo"; | |
final String workflowNameInput = "testWorkflow"; | |
final String workflowNameTriggered = "triggeredWorkflow"; | |
final String jobNameTriggeredOne = "testTriggerJobMultiThreadedTriggeredOne"; | |
final String jobNameTriggeredTwo = "testTriggerJobMultiThreadedTriggeredTwo"; | |
final String indexNameOne = "index1"; | |
final String indexNameTwo = "index2"; | |
String tempStoreName = "tmp-" + indexNameOne; | |
String storeName = "store-" + indexNameOne; | |
_objectStoreService.ensureStore(storeName); | |
_objectStoreService.ensureStore(tempStoreName); | |
AnyMap parameters = createJobParameters(indexNameOne, tempStoreName, storeName); | |
addJob(jobNameInputOne, workflowNameInput, parameters); | |
addJob(jobNameTriggeredOne, workflowNameTriggered, parameters); | |
tempStoreName = "tmp-" + indexNameTwo; | |
storeName = "store-" + indexNameTwo; | |
_objectStoreService.ensureStore(storeName); | |
_objectStoreService.ensureStore(tempStoreName); | |
parameters = createJobParameters(indexNameTwo, tempStoreName, storeName); | |
addJob(jobNameInputTwo, workflowNameInput, parameters); | |
addJob(jobNameTriggeredTwo, workflowNameTriggered, parameters); | |
_objectStoreService.ensureStore(indexNameOne); | |
_objectStoreService.ensureStore(indexNameTwo); | |
final String jobIdInputOne = startJob(jobNameInputOne); | |
final String jobIdTriggeredOne = startJob(jobNameTriggeredOne); | |
final String jobIdInputTwo = startJob(jobNameInputTwo); | |
final String jobIdTriggeredTwo = startJob(jobNameTriggeredTwo); | |
final Map<String, BlockingQueue<Task>> taskQueueMap = new LinkedHashMap<String, BlockingQueue<Task>>(); | |
final ExecutorService executor = Executors.newFixedThreadPool(50); | |
final Collection<Stoppable> workers = createAndStartWorkers(taskQueueMap, executor); | |
try { | |
for (int i = 0; i < noOfRuns; i++) { | |
// create new bulks and push tasks | |
pushTask(getInitialTask(WORKER_1, jobNameInputOne), WORKER_1, taskQueueMap); | |
pushTask(getInitialTask(WORKER_1, jobNameInputTwo), WORKER_1, taskQueueMap); | |
} | |
while (!taskQueueMap.get(WORKER_1).isEmpty()) { | |
Thread.sleep(sleepTime); | |
} | |
// wait for the first jobs to run out of tasks | |
final long maxWaitTime = 60000; | |
waitForJobRunFinished(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime); | |
waitForJobRunFinished(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime); | |
_jobManager.finishJob(jobNameInputOne, jobIdInputOne); | |
_jobManager.finishJob(jobNameInputTwo, jobIdInputTwo); | |
waitForJobRunFinished(jobNameInputOne, jobIdInputOne, maxWaitTime); | |
waitForJobRunFinished(jobNameInputTwo, jobIdInputTwo, maxWaitTime); | |
waitForJobRunFinished(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime); | |
waitForJobRunFinished(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime); | |
_jobManager.finishJob(jobNameTriggeredOne, jobIdTriggeredOne); | |
_jobManager.finishJob(jobNameTriggeredTwo, jobIdTriggeredTwo); | |
waitForJobRunCompleted(jobNameInputOne, jobIdInputOne, maxWaitTime); | |
waitForJobRunCompleted(jobNameInputTwo, jobIdInputTwo, maxWaitTime); | |
waitForJobRunCompleted(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime); | |
waitForJobRunCompleted(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime); | |
assertJobRunSucceeded(jobNameInputOne, jobIdInputOne, noOfRuns); | |
assertJobRunSucceeded(jobNameInputTwo, jobIdInputTwo, noOfRuns); | |
// check for two successful tasks. | |
AnyMap jobRunData = assertJobRunSucceeded(jobNameTriggeredOne, jobIdTriggeredOne); | |
AnyMap workerCounter = jobRunData.getMap("worker").getMap(WORKER_4); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, noOfRuns); | |
jobRunData = assertJobRunSucceeded(jobNameTriggeredTwo, jobIdTriggeredTwo); | |
workerCounter = jobRunData.getMap("worker").getMap(WORKER_4); | |
assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, noOfRuns); | |
} finally { | |
for (final Stoppable worker : workers) { | |
worker.stop(); | |
} | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* Creates all workers, the queue and starts the workers with the given executor service. | |
* | |
* @param taskQueueMap | |
* the task queue map. | |
* @param executor | |
* the executor service | |
* @return the workers list | |
*/ | |
private Collection<Stoppable> createAndStartWorkers(final Map<String, BlockingQueue<Task>> taskQueueMap, | |
final ExecutorService executor) { | |
final Collection<Stoppable> workers = new ArrayList<TestWorkflowSimulation.Stoppable>(); | |
taskQueueMap.put(WORKER_1, new LinkedBlockingQueue<Task>()); | |
taskQueueMap.put(WORKER_2, new LinkedBlockingQueue<Task>()); | |
taskQueueMap.put(WORKER_3, new LinkedBlockingQueue<Task>()); | |
taskQueueMap.put(WORKER_4, new LinkedBlockingQueue<Task>()); | |
Collections.addAll(workers, new IntermediateWorker(taskQueueMap), new IntermediateWorker(taskQueueMap), | |
new IntermediateWorker(taskQueueMap), new IntermediateWorker(taskQueueMap)); | |
Collections.addAll(workers, new FinalWorker(taskQueueMap), new FinalWorker(taskQueueMap), new FinalWorker( | |
taskQueueMap), new FinalWorker(taskQueueMap)); | |
Collections.addAll(workers, new InputWorker(taskQueueMap), new InputWorker(taskQueueMap), new InputWorker( | |
taskQueueMap), new InputWorker(taskQueueMap)); | |
Collections.addAll(workers, new TriggeredWorker(taskQueueMap), new TriggeredWorker(taskQueueMap), | |
new TriggeredWorker(taskQueueMap), new TriggeredWorker(taskQueueMap)); | |
for (final Runnable worker : workers) { | |
executor.execute(worker); | |
} | |
return workers; | |
} | |
/** | |
* Waits for a job to finish its workflow runs. | |
* | |
* @param jobName | |
* the name of the job | |
* @param jobId | |
* the id of the job run | |
* @param maxWaitTime | |
* max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail. | |
* @throws Exception | |
* an exception occurred while trying to get job run data | |
*/ | |
private void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime) | |
throws Exception { | |
final long sleepTime = 500L; | |
final long millisStarted = System.currentTimeMillis(); | |
AnyMap jobRunData = null; | |
AnyMap workflowRunData = null; | |
while (true) { | |
jobRunData = _jobManager.getJobRunData(jobName, jobId); | |
if (jobRunData != null) { | |
workflowRunData = jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER); | |
final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS); | |
if (activeRunsAny != null) { | |
assertTrue("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS, | |
activeRunsAny.isLong()); | |
final int activeRuns = ((Value) activeRunsAny).asLong().intValue(); | |
assertTrue("negative value for " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS, | |
activeRuns >= 0); | |
if (activeRuns == 0) { | |
return; | |
} | |
} | |
} | |
assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData, | |
System.currentTimeMillis() - millisStarted <= maxWaitTime); | |
Thread.sleep(sleepTime); | |
} | |
} | |
} |