blob: f7a8c73965042781191ee94b93b4db378785284a [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk.test;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskCounter;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Test case to check multithreaded access to the TaskStorageJmsImpl.
*/
public class TestTaskStorageZkMultithreaded extends DeclarativeServiceTestCase {
/** number of worker hosts needed for scale up control. */
private static final int NO_OF_WORKER_HOSTS = 10;
/** Randomizer for random sleep values. */
private static final Random RANDOMIZER = new Random();
/** scale up control. */
private static final long MAX_NO_OF_TASKS_PER_WORKER_HOST = 3;
/** pipename for tests. */
private static final String PIPE_NAME = "testPipe";
/** storage under test. */
private TaskStorage _service;
private final AtomicInteger _taskIdGenerator = new AtomicInteger();
/**
* clean up test pipe.
*
* @throws Exception
* getting service
*/
@Override
protected void setUp() throws Exception {
super.setUp();
_service = getService(TaskStorage.class);
assertNotNull(_service);
assertTrue(_service instanceof TaskStorageZk);
_service.clear();
((TaskStorageZk) _service).checkQualifierLockAge(0);
}
/**
* Test multithreaded send and receive.
*/
public void testSendAndReceive() throws Exception {
final String workerName = PIPE_NAME;
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0);
final int maxNumberOfParallellThreads = 100;
final ExecutorService executor = Executors.newFixedThreadPool(maxNumberOfParallellThreads);
try {
final CompletionService<Task> completionService = new ExecutorCompletionService<Task>(executor);
// for all instances: execute 'command'
for (int i = 0; i < maxNumberOfParallellThreads; i++) {
completionService.submit(new TaskCommand(_service, workerName, null));
}
// wait for termination of submitted parallel threads
for (int i = 0; i < maxNumberOfParallellThreads; i++) {
final Future<Task> f = completionService.take();
final Task task = f.get();
assertNotNull(task);
}
} finally {
executor.shutdownNow();
}
Thread.sleep(TaskCommand.MAX_WORKING_TIME + 1000); // to be sure that all workers have finished
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0);
final Task task = _service.getTask(workerName, null, null);
assertNull(task);
}
/**
* Test multithreaded send and receive with scale up control.
*/
public void testSendAndReceiveWithScaleUpControl() throws Exception {
final String workerName = PIPE_NAME;
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0);
_service.setMaxNoOfTasksPerHost(MAX_NO_OF_TASKS_PER_WORKER_HOST);
assertTrue("scale up counters should be empty but was: " + _service.getScaleUpCounters(), _service
.getScaleUpCounters().isEmpty());
final int maxNumberOfParallellThreads = 100;
final ExecutorService executor = Executors.newFixedThreadPool(maxNumberOfParallellThreads);
try {
final CompletionService<Task> completionService = new ExecutorCompletionService<Task>(executor);
// for all instances: execute 'command'
for (int i = 0; i < maxNumberOfParallellThreads; i++) {
// simulate 10 worker hosts
completionService.submit(new TaskCommand(_service, workerName, "host-" + i % NO_OF_WORKER_HOSTS));
}
// wait for termination of submitted parallel threads
for (int i = 0; i < maxNumberOfParallellThreads; i++) {
final Future<Task> f = completionService.take();
f.get();
}
} finally {
executor.shutdownNow();
}
Thread.sleep(TaskCommand.MAX_WORKING_TIME + 1000); // to be sure that all workers have finished
assertScaleUpCounter(_service.getScaleUpCounters(), 0);
}
/**
* Assert the task counters for a specific workerName.
*
* @param counters
* the map of TaskCounters
* @param workerName
* the name of the workerName
* @param expectedTodo
* the expected number of todo tasks
* @param expectedInProgress
* the expected number of in progress tasks
* @throws Exception
* if any error occurs
*/
private void assertTaskCounter(final Map<String, TaskCounter> counters, final String workerName,
final int expectedTodo, final int expectedInProgress) throws Exception {
int todo = 0;
int inProgress = 0;
final TaskCounter counter = counters.get(workerName);
if (counter != null) {
todo = counter.getTasksTodo();
inProgress = counter.getTasksInProgress();
}
assertEquals(expectedTodo, todo);
assertEquals(expectedInProgress, inProgress);
}
/**
* Assert the scale up counters containing the number of tasks currently processed by worker host.
*
* @param expectedCount
* the expected count for each entry
*/
private void assertScaleUpCounter(final Map<String, Integer> counters, final Integer expectedCount)
throws Exception {
assertEquals("expecting entry for each worker host", NO_OF_WORKER_HOSTS, counters.size());
for (final Integer count : counters.values()) {
assertEquals(expectedCount, count);
}
}
/**
* @author stuc07
*/
private class TaskCommand implements Callable<Task> {
/** Maximum time a worker simulates to do some work. */
public static final int MAX_WORKING_TIME = 3000; // in milliseconds
/** Maximum time for threads to wait to receive a task. */
public static final int MAX_WAIT_TO_GET_TASK = 30000; // in milliseconds
/** Maximum time for threads to wait to commit a task. */
public static final int MAX_WAIT_TO_COMMIT_TASK = 10000; // in milliseconds
/** The TaskStorage to send to / receive from. */
private final TaskStorage _taskStorage;
/** The name of the task pipe. */
private final String _workerName;
/** The host where the worker is running. */
private final String _workerHost;
/**
* TaskCommand Constructor.
*/
public TaskCommand(final TaskStorage taskStorage, final String workerName, final String workerHost) {
_taskStorage = taskStorage;
_workerName = workerName;
_workerHost = workerHost;
}
/**
* check assertions for scale up control.
*/
private void assertScaleUpControl(final Task task) throws Exception {
assertEquals(_workerHost, task.getProperties().get(Task.PROPERTY_WORKER_HOST));
if (_workerHost != null) {
assertTrue(_taskStorage.getScaleUpCounters().get(_workerHost) <= MAX_NO_OF_TASKS_PER_WORKER_HOST);
}
}
/**
* {@inheritDoc}
*/
@Override
public Task call() throws Exception {
final Task newTask = TestZkTaskQueue.createTask(_taskIdGenerator.incrementAndGet(), _workerName);
_taskStorage.storeTask(newTask);
Task task = null;
long maxRuntime = System.currentTimeMillis() + MAX_WAIT_TO_GET_TASK;
while (task == null && System.currentTimeMillis() < maxRuntime) {
task = _taskStorage.getTask(_workerName, _workerHost, null);
if (task == null) {
Thread.sleep(RANDOMIZER.nextInt(1000)); // sleep some time before trying again
}
}
if (task != null) {
assertScaleUpControl(task);
// simulate some work
Thread.sleep(RANDOMIZER.nextInt(MAX_WORKING_TIME));
maxRuntime = System.currentTimeMillis() + MAX_WAIT_TO_COMMIT_TASK;
boolean committed = false;
while (!committed && System.currentTimeMillis() < maxRuntime) {
try {
_taskStorage.getInProgressTask(_workerName, task.getTaskId());
_taskStorage.deleteTask(_workerName, task.getTaskId());
committed = true;
} catch (final TaskmanagerException e) {
; // retry
}
}
}
return task;
}
}
}