// blob: fda016968b98c40fed3ce1d441906ba99867ef51 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher, Andreas Weber, Drazen Cindric, Andreas Schank (all Attensity Europe GmbH) - initial
* implementation
**********************************************************************************************************************/
package org.eclipse.smila.jobmanager.test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Formatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.jobmanager.JobManagerConstants;
import org.eclipse.smila.jobmanager.JobManagerException;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
/**
* Tests using workflow simulation workers.
*/
public class TestWorkflowSimulation extends JobManagerTestBase {
/**
* A {@link Runnable} that can additionally be asked to stop.
*/
private interface Stoppable extends Runnable {
/** Requests that this Runnable stops running. */
void stop();
}
/**
 * Base class of the simulated workers. It keeps the task queue map, a stop flag and the logic to finish a task and
 * to enqueue the follow-up tasks that result from it.
 */
abstract class WorkerBase implements Stoppable {
  /** The task queue map; follow-up tasks are looked up in it by the worker name of the task. */
  protected Map<String, BlockingQueue<Task>> _taskQueueMap;

  /** Flag indicating whether this worker should keep running; volatile because {@link #stop()} only flips it. */
  protected volatile boolean _run = true;

  /**
   * Constructs a new WorkerBase object.
   *
   * @param taskQueueMap
   *          The task queue map.
   */
  public WorkerBase(final Map<String, BlockingQueue<Task>> taskQueueMap) {
    _taskQueueMap = taskQueueMap;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void stop() {
    _run = false;
  }

  /**
   * Finishes a task and puts the follow-up tasks that were created for it (if any) into the task queue of the
   * respective follow-up worker.
   *
   * @param currentTask
   *          the task to be finished
   * @param resultDescription
   *          the result description
   * @throws JobManagerException
   *           exception while handling the task.
   * @throws IllegalStateException
   *           if a follow-up task addresses a worker for which no queue exists in the task queue map.
   */
  protected void finishTask(final Task currentTask, final ResultDescription resultDescription)
    throws JobManagerException {
    final Collection<Task> followUpTasks = getNextTasks(currentTask, resultDescription);
    for (final Task followUp : followUpTasks) {
      final BlockingQueue<Task> targetQueue = _taskQueueMap.get(followUp.getWorkerName());
      if (targetQueue == null) {
        // fail with a meaningful message instead of a bare NullPointerException
        throw new IllegalStateException("No task queue available for follow-up worker '"
          + followUp.getWorkerName() + "'.");
      }
      targetQueue.add(followUp);
    }
  }
}
/**
 * Simulated input worker: consumes the tasks queued for {@code WORKER_1}.
 */
private class InputWorker extends WorkerBase {
  /**
   * Creates the worker on top of the given task queues.
   *
   * @param taskQueueMap
   *          The task queue map.
   */
  public InputWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) {
    super(taskQueueMap);
  }

  /**
   * Polls the {@code WORKER_1} queue (100 ms per attempt) until the worker is stopped or interrupted. For each task
   * received, an output bulk is written and the task is finished as successful.
   */
  @Override
  public void run() {
    while (_run) {
      try {
        if (!_taskQueueMap.containsKey(WORKER_1)) {
          continue;
        }
        final Task polled = _taskQueueMap.get(WORKER_1).poll(100, TimeUnit.MILLISECONDS);
        if (polled == null) {
          continue;
        }
        writeRecordBulk(polled);
        // the task is reported as successful even if writing the bulk failed
        finishTask(polled, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null));
      } catch (final InterruptedException e) {
        _run = false;
        Thread.currentThread().interrupt();
      } catch (final JobManagerException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Writes a single record, carrying the object name of the first "output" bulk, to the "output" slot of the task.
   * Any failure is only printed.
   *
   * @param task
   *          the task to create the bulk for.
   */
  private void writeRecordBulk(final Task task) {
    try {
      final String payload =
        "{\"record\": \"" + task.getOutputBulks().get("output").get(0).getObjectName() + "\"}";
      createBulk(task, "output", payload);
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}
/**
 * Simulated intermediate worker: consumes the tasks queued for {@code WORKER_2}.
 */
private class IntermediateWorker extends WorkerBase {
  /**
   * Creates the worker on top of the given task queues.
   *
   * @param taskQueueMap
   *          The task queue map.
   */
  public IntermediateWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) {
    super(taskQueueMap);
  }

  /**
   * Polls the {@code WORKER_2} queue (100 ms per attempt) until the worker is stopped or interrupted and handles
   * every task received.
   */
  @Override
  public void run() {
    while (_run) {
      try {
        if (!_taskQueueMap.containsKey(WORKER_2)) {
          continue;
        }
        final Task polled = _taskQueueMap.get(WORKER_2).poll(100, TimeUnit.MILLISECONDS);
        if (polled != null) {
          handleTask(polled);
        }
      } catch (final InterruptedException e) {
        _run = false;
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Reads the first "input" bulk, replaces its trailing brace by a "was here" marker of this worker, writes the
   * result to the "output" slot and finishes the task as successful. Any failure is only printed; the task is not
   * finished in that case.
   *
   * @param task
   *          the task to handle.
   */
  private void handleTask(final Task task) {
    try {
      final String input = readBulk(task.getInputBulks().get("input").get(0));
      final String marker = ",\n{\"" + WORKER_2 + "\":\"was here\"}}";
      createBulk(task, "output", input.replaceFirst("}$", marker));
      finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null));
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}
/**
 * Simulated final worker: consumes the tasks queued for {@code WORKER_3}.
 */
private class FinalWorker extends WorkerBase {
  /**
   * Creates the worker on top of the given task queues.
   *
   * @param taskQueueMap
   *          The task queue map.
   */
  public FinalWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) {
    super(taskQueueMap);
  }

  /**
   * Polls the {@code WORKER_3} queue (100 ms per attempt) until the worker is stopped or interrupted and handles
   * every task received.
   */
  @Override
  public void run() {
    while (_run) {
      try {
        if (!_taskQueueMap.containsKey(WORKER_3)) {
          continue;
        }
        final Task polled = _taskQueueMap.get(WORKER_3).poll(100, TimeUnit.MILLISECONDS);
        if (polled != null) {
          handleTask(polled);
        }
      } catch (final InterruptedException e) {
        _run = false;
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Reads the first "input" bulk, replaces its trailing brace by a "was here" marker of this worker, writes the
   * result to the "output" slot and finishes the task as successful. Any failure is only printed; the task is not
   * finished in that case.
   *
   * @param task
   *          the task to handle.
   */
  private void handleTask(final Task task) {
    try {
      final String input = readBulk(task.getInputBulks().get("input").get(0));
      final String marker = ",\n{\"" + WORKER_3 + "\":\"was here\"}}";
      createBulk(task, "output", input.replaceFirst("}$", marker));
      finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null));
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}
/**
 * Simulated triggered worker: consumes the tasks queued for {@code WORKER_4}.
 */
private class TriggeredWorker extends WorkerBase {
  /**
   * Creates the worker on top of the given task queues.
   *
   * @param taskQueueMap
   *          The task queue map.
   */
  public TriggeredWorker(final Map<String, BlockingQueue<Task>> taskQueueMap) {
    super(taskQueueMap);
  }

  /**
   * Polls the {@code WORKER_4} queue (100 ms per attempt) until the worker is stopped or interrupted and handles
   * every task received.
   */
  @Override
  public void run() {
    while (_run) {
      try {
        if (!_taskQueueMap.containsKey(WORKER_4)) {
          continue;
        }
        final Task polled = _taskQueueMap.get(WORKER_4).poll(100, TimeUnit.MILLISECONDS);
        if (polled != null) {
          handleTask(polled);
        }
      } catch (final InterruptedException e) {
        _run = false;
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Reads the first "input" bulk, replaces its trailing brace by a "was here" marker of this worker, writes the
   * result to the "output" slot and finishes the task as successful. Any failure is only printed; the task is not
   * finished in that case.
   *
   * @param task
   *          the task to handle.
   */
  private void handleTask(final Task task) {
    try {
      final String input = readBulk(task.getInputBulks().get("input").get(0));
      final String marker = ",\n{\"" + WORKER_4 + "\":\"was here\"}}";
      createBulk(task, "output", input.replaceFirst("}$", marker));
      finishTask(task, new ResultDescription(TaskCompletionStatus.SUCCESSFUL, null, null, null));
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}
/**
 * Adds a task to the queue of the given worker. If the map holds no entry for that worker yet, a new
 * {@link LinkedBlockingQueue} is registered for it first.
 *
 * @param task
 *          the task to push.
 * @param workerName
 *          the name of the receiving worker.
 * @param queues
 *          the queues map.
 * @return the queues map that was passed in.
 */
private Map<String, BlockingQueue<Task>> pushTask(final Task task, final String workerName,
  final Map<String, BlockingQueue<Task>> queues) {
  final boolean queueMissing = !queues.containsKey(workerName);
  if (queueMissing) {
    final BlockingQueue<Task> freshQueue = new LinkedBlockingQueue<Task>();
    queues.put(workerName, freshQueue);
  }
  final BlockingQueue<Task> targetQueue = queues.get(workerName);
  targetQueue.add(task);
  return queues;
}
/**
 * Fetches the object a bulk refers to from the object store and returns its content as a String.
 *
 * @param bulk
 *          the bulk to read.
 * @return the content of the bulk as String.
 * @throws ObjectStoreException
 *           exception reading the bulk or accessing service/store.
 */
protected String readBulk(final BulkInfo bulk) throws ObjectStoreException {
  final String store = bulk.getStoreName();
  final String objectId = bulk.getObjectName();
  // NOTE(review): no charset is given here, so this presumably decodes with the platform default - confirm that
  // this matches the way the bulks are written.
  return new String(_objectStoreService.getObject(store, objectId));
}
/**
 * Tests that a test-workflow job run triggers a running triggered-workflow job run.
 * <p>
 * Two pairs of jobs are defined, one pair per index: an input job using "testWorkflow" and a job using
 * "triggeredWorkflow". All four jobs are started, {@code noOfRuns} initial tasks per input job are pushed to the
 * simulated workers, and the test then waits for the job runs to finish and complete before checking the results.
 *
 * @throws Exception
 *           any exception that was thrown while trying to get through this test case.
 */
public void testTriggerJobMultiThreaded() throws Exception {
  final int noOfRuns = 6;
  final long sleepTime = 100L;
  final long maxWaitTime = 60000;
  final String jobNameInputOne = "testTriggerJobMultiThreadedOne";
  final String jobNameInputTwo = "testTriggerJobMultiThreadedTwo";
  final String workflowNameInput = "testWorkflow";
  final String workflowNameTriggered = "triggeredWorkflow";
  final String jobNameTriggeredOne = "testTriggerJobMultiThreadedTriggeredOne";
  final String jobNameTriggeredTwo = "testTriggerJobMultiThreadedTriggeredTwo";
  final String indexNameOne = "index1";
  final String indexNameTwo = "index2";

  // job pair working on the first index
  String tempStoreName = "tmp-" + indexNameOne;
  String storeName = "store-" + indexNameOne;
  _objectStoreService.ensureStore(storeName);
  _objectStoreService.ensureStore(tempStoreName);
  AnyMap parameters = createJobParameters(indexNameOne, tempStoreName, storeName);
  addJob(jobNameInputOne, workflowNameInput, parameters);
  addJob(jobNameTriggeredOne, workflowNameTriggered, parameters);

  // job pair working on the second index
  tempStoreName = "tmp-" + indexNameTwo;
  storeName = "store-" + indexNameTwo;
  _objectStoreService.ensureStore(storeName);
  _objectStoreService.ensureStore(tempStoreName);
  parameters = createJobParameters(indexNameTwo, tempStoreName, storeName);
  addJob(jobNameInputTwo, workflowNameInput, parameters);
  addJob(jobNameTriggeredTwo, workflowNameTriggered, parameters);

  _objectStoreService.ensureStore(indexNameOne);
  _objectStoreService.ensureStore(indexNameTwo);

  final String jobIdInputOne = startJob(jobNameInputOne);
  final String jobIdTriggeredOne = startJob(jobNameTriggeredOne);
  final String jobIdInputTwo = startJob(jobNameInputTwo);
  final String jobIdTriggeredTwo = startJob(jobNameTriggeredTwo);

  final Map<String, BlockingQueue<Task>> taskQueueMap = new LinkedHashMap<String, BlockingQueue<Task>>();
  final ExecutorService executor = Executors.newFixedThreadPool(50);
  final Collection<Stoppable> workers = createAndStartWorkers(taskQueueMap, executor);
  try {
    for (int i = 0; i < noOfRuns; i++) {
      // create new bulks and push tasks
      pushTask(getInitialTask(WORKER_1, jobNameInputOne), WORKER_1, taskQueueMap);
      pushTask(getInitialTask(WORKER_1, jobNameInputTwo), WORKER_1, taskQueueMap);
    }

    // wait until the input workers have picked up all initial tasks, but do not hang forever if they are gone
    final long drainStarted = System.currentTimeMillis();
    while (!taskQueueMap.get(WORKER_1).isEmpty()) {
      assertTrue("Waited too long for the initial tasks of " + WORKER_1 + " to be picked up.",
        System.currentTimeMillis() - drainStarted <= maxWaitTime);
      Thread.sleep(sleepTime);
    }

    // wait for the first jobs to run out of tasks
    waitForJobRunFinished(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime);
    waitForJobRunFinished(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime);
    _jobManager.finishJob(jobNameInputOne, jobIdInputOne);
    _jobManager.finishJob(jobNameInputTwo, jobIdInputTwo);
    waitForJobRunFinished(jobNameInputOne, jobIdInputOne, maxWaitTime);
    waitForJobRunFinished(jobNameInputTwo, jobIdInputTwo, maxWaitTime);
    waitForJobRunFinished(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime);
    waitForJobRunFinished(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime);
    _jobManager.finishJob(jobNameTriggeredOne, jobIdTriggeredOne);
    _jobManager.finishJob(jobNameTriggeredTwo, jobIdTriggeredTwo);
    waitForJobRunCompleted(jobNameInputOne, jobIdInputOne, maxWaitTime);
    waitForJobRunCompleted(jobNameInputTwo, jobIdInputTwo, maxWaitTime);
    waitForJobRunCompleted(jobNameTriggeredOne, jobIdTriggeredOne, maxWaitTime);
    waitForJobRunCompleted(jobNameTriggeredTwo, jobIdTriggeredTwo, maxWaitTime);

    assertJobRunSucceeded(jobNameInputOne, jobIdInputOne, noOfRuns);
    assertJobRunSucceeded(jobNameInputTwo, jobIdInputTwo, noOfRuns);

    // the triggered worker must have finished noOfRuns tasks successfully in each triggered job run
    AnyMap jobRunData = assertJobRunSucceeded(jobNameTriggeredOne, jobIdTriggeredOne);
    AnyMap workerCounter = jobRunData.getMap("worker").getMap(WORKER_4);
    assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, noOfRuns);
    jobRunData = assertJobRunSucceeded(jobNameTriggeredTwo, jobIdTriggeredTwo);
    workerCounter = jobRunData.getMap("worker").getMap(WORKER_4);
    assertWorkerTaskCounter(workerCounter, JobManagerConstants.DATA_JOB_NO_OF_SUCCESSFUL_TASKS, noOfRuns);
  } finally {
    // always release the worker threads, also if an assertion failed
    for (final Stoppable worker : workers) {
      worker.stop();
    }
    executor.shutdownNow();
  }
}
/**
 * Registers an empty queue for each of the four workers, creates four instances of every simulated worker kind and
 * hands all of them to the given executor service.
 *
 * @param taskQueueMap
 *          the task queue map.
 * @param executor
 *          the executor service
 * @return the workers that were created and started
 */
private Collection<Stoppable> createAndStartWorkers(final Map<String, BlockingQueue<Task>> taskQueueMap,
  final ExecutorService executor) {
  final int instancesPerKind = 4;
  for (final String workerName : new String[] { WORKER_1, WORKER_2, WORKER_3, WORKER_4 }) {
    taskQueueMap.put(workerName, new LinkedBlockingQueue<Task>());
  }
  final Collection<Stoppable> started = new ArrayList<TestWorkflowSimulation.Stoppable>();
  for (int i = 0; i < instancesPerKind; i++) {
    started.add(new IntermediateWorker(taskQueueMap));
  }
  for (int i = 0; i < instancesPerKind; i++) {
    started.add(new FinalWorker(taskQueueMap));
  }
  for (int i = 0; i < instancesPerKind; i++) {
    started.add(new InputWorker(taskQueueMap));
  }
  for (int i = 0; i < instancesPerKind; i++) {
    started.add(new TriggeredWorker(taskQueueMap));
  }
  for (final Stoppable worker : started) {
    executor.execute(worker);
  }
  return started;
}
/**
 * Waits for a job to finish its workflow runs, i.e. polls the job run data every 500 ms until the counter of active
 * workflow runs is 0.
 *
 * @param jobName
 *          the name of the job
 * @param jobId
 *          the id of the job run
 * @param maxWaitTime
 *          max wait time in milliseconds, if the waiting exceeds this wait time an assertion will fail.
 * @throws Exception
 *           an exception occurred while trying to get job run data
 */
private void waitForJobRunFinished(final String jobName, final String jobId, final long maxWaitTime)
  throws Exception {
  final long sleepTime = 500L;
  final long millisStarted = System.currentTimeMillis();
  while (true) {
    final AnyMap jobRunData = _jobManager.getJobRunData(jobName, jobId);
    // job run data without workflow run counters is treated like "no data yet": keep polling instead of failing
    // with a NullPointerException
    final AnyMap workflowRunData =
      jobRunData == null ? null : jobRunData.getMap(JobManagerConstants.WORKFLOW_RUN_COUNTER);
    if (workflowRunData != null) {
      final Any activeRunsAny = workflowRunData.get(JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS);
      if (activeRunsAny != null) {
        assertTrue("wrong type of " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS,
          activeRunsAny.isLong());
        final int activeRuns = ((Value) activeRunsAny).asLong().intValue();
        assertTrue("negative value for " + JobManagerConstants.DATA_JOB_NO_OF_ACTIVE_WORKFLOW_RUNS,
          activeRuns >= 0);
        if (activeRuns == 0) {
          return;
        }
      }
    }
    assertTrue("Waited too long for job to finish. Latest job run data: " + jobRunData,
      System.currentTimeMillis() - millisStarted <= maxWaitTime);
    Thread.sleep(sleepTime);
  }
}
}