/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
 **********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk.test; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletionService; | |
import java.util.concurrent.ExecutorCompletionService; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.TaskCounter; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
/** | |
* Test case to check multithreaded access to the TaskStorageJmsImpl. | |
*/ | |
public class TestTaskStorageZkMultithreaded extends DeclarativeServiceTestCase { | |
/** number of worker hosts needed for scale up control. */ | |
private static final int NO_OF_WORKER_HOSTS = 10; | |
/** Randomizer for random sleep values. */ | |
private static final Random RANDOMIZER = new Random(); | |
/** scale up control. */ | |
private static final long MAX_NO_OF_TASKS_PER_WORKER_HOST = 3; | |
/** pipename for tests. */ | |
private static final String PIPE_NAME = "testPipe"; | |
/** storage under test. */ | |
private TaskStorage _service; | |
private final AtomicInteger _taskIdGenerator = new AtomicInteger(); | |
/** | |
* clean up test pipe. | |
* | |
* @throws Exception | |
* getting service | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_service = getService(TaskStorage.class); | |
assertNotNull(_service); | |
assertTrue(_service instanceof TaskStorageZk); | |
_service.clear(); | |
((TaskStorageZk) _service).checkQualifierLockAge(0); | |
} | |
/** | |
* Test multithreaded send and receive. | |
*/ | |
public void testSendAndReceive() throws Exception { | |
final String workerName = PIPE_NAME; | |
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0); | |
final int maxNumberOfParallellThreads = 100; | |
final ExecutorService executor = Executors.newFixedThreadPool(maxNumberOfParallellThreads); | |
try { | |
final CompletionService<Task> completionService = new ExecutorCompletionService<Task>(executor); | |
// for all instances: execute 'command' | |
for (int i = 0; i < maxNumberOfParallellThreads; i++) { | |
completionService.submit(new TaskCommand(_service, workerName, null)); | |
} | |
// wait for termination of submitted parallel threads | |
for (int i = 0; i < maxNumberOfParallellThreads; i++) { | |
final Future<Task> f = completionService.take(); | |
final Task task = f.get(); | |
assertNotNull(task); | |
} | |
} finally { | |
executor.shutdownNow(); | |
} | |
Thread.sleep(TaskCommand.MAX_WORKING_TIME + 1000); // to be sure that all workers have finished | |
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0); | |
final Task task = _service.getTask(workerName, null, null); | |
assertNull(task); | |
} | |
/** | |
* Test multithreaded send and receive with scale up control. | |
*/ | |
public void testSendAndReceiveWithScaleUpControl() throws Exception { | |
final String workerName = PIPE_NAME; | |
assertTaskCounter(_service.getTaskCounters(), workerName, 0, 0); | |
_service.setMaxNoOfTasksPerHost(MAX_NO_OF_TASKS_PER_WORKER_HOST); | |
assertTrue("scale up counters should be empty but was: " + _service.getScaleUpCounters(), _service | |
.getScaleUpCounters().isEmpty()); | |
final int maxNumberOfParallellThreads = 100; | |
final ExecutorService executor = Executors.newFixedThreadPool(maxNumberOfParallellThreads); | |
try { | |
final CompletionService<Task> completionService = new ExecutorCompletionService<Task>(executor); | |
// for all instances: execute 'command' | |
for (int i = 0; i < maxNumberOfParallellThreads; i++) { | |
// simulate 10 worker hosts | |
completionService.submit(new TaskCommand(_service, workerName, "host-" + i % NO_OF_WORKER_HOSTS)); | |
} | |
// wait for termination of submitted parallel threads | |
for (int i = 0; i < maxNumberOfParallellThreads; i++) { | |
final Future<Task> f = completionService.take(); | |
f.get(); | |
} | |
} finally { | |
executor.shutdownNow(); | |
} | |
Thread.sleep(TaskCommand.MAX_WORKING_TIME + 1000); // to be sure that all workers have finished | |
assertScaleUpCounter(_service.getScaleUpCounters(), 0); | |
} | |
/** | |
* Assert the task counters for a specific workerName. | |
* | |
* @param counters | |
* the map of TaskCounters | |
* @param workerName | |
* the name of the workerName | |
* @param expectedTodo | |
* the expected number of todo tasks | |
* @param expectedInProgress | |
* the expected number of in progress tasks | |
* @throws Exception | |
* if any error occurs | |
*/ | |
private void assertTaskCounter(final Map<String, TaskCounter> counters, final String workerName, | |
final int expectedTodo, final int expectedInProgress) throws Exception { | |
int todo = 0; | |
int inProgress = 0; | |
final TaskCounter counter = counters.get(workerName); | |
if (counter != null) { | |
todo = counter.getTasksTodo(); | |
inProgress = counter.getTasksInProgress(); | |
} | |
assertEquals(expectedTodo, todo); | |
assertEquals(expectedInProgress, inProgress); | |
} | |
/** | |
* Assert the scale up counters containing the number of tasks currently processed by worker host. | |
* | |
* @param expectedCount | |
* the expected count for each entry | |
*/ | |
private void assertScaleUpCounter(final Map<String, Integer> counters, final Integer expectedCount) | |
throws Exception { | |
assertEquals("expecting entry for each worker host", NO_OF_WORKER_HOSTS, counters.size()); | |
for (final Integer count : counters.values()) { | |
assertEquals(expectedCount, count); | |
} | |
} | |
/** | |
* @author stuc07 | |
*/ | |
private class TaskCommand implements Callable<Task> { | |
/** Maximum time a worker simulates to do some work. */ | |
public static final int MAX_WORKING_TIME = 3000; // in milliseconds | |
/** Maximum time for threads to wait to receive a task. */ | |
public static final int MAX_WAIT_TO_GET_TASK = 30000; // in milliseconds | |
/** Maximum time for threads to wait to commit a task. */ | |
public static final int MAX_WAIT_TO_COMMIT_TASK = 10000; // in milliseconds | |
/** The TaskStorage to send to / receive from. */ | |
private final TaskStorage _taskStorage; | |
/** The name of the task pipe. */ | |
private final String _workerName; | |
/** The host where the worker is running. */ | |
private final String _workerHost; | |
/** | |
* TaskCommand Constructor. | |
*/ | |
public TaskCommand(final TaskStorage taskStorage, final String workerName, final String workerHost) { | |
_taskStorage = taskStorage; | |
_workerName = workerName; | |
_workerHost = workerHost; | |
} | |
/** | |
* check assertions for scale up control. | |
*/ | |
private void assertScaleUpControl(final Task task) throws Exception { | |
assertEquals(_workerHost, task.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
if (_workerHost != null) { | |
assertTrue(_taskStorage.getScaleUpCounters().get(_workerHost) <= MAX_NO_OF_TASKS_PER_WORKER_HOST); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public Task call() throws Exception { | |
final Task newTask = TestZkTaskQueue.createTask(_taskIdGenerator.incrementAndGet(), _workerName); | |
_taskStorage.storeTask(newTask); | |
Task task = null; | |
long maxRuntime = System.currentTimeMillis() + MAX_WAIT_TO_GET_TASK; | |
while (task == null && System.currentTimeMillis() < maxRuntime) { | |
task = _taskStorage.getTask(_workerName, _workerHost, null); | |
if (task == null) { | |
Thread.sleep(RANDOMIZER.nextInt(1000)); // sleep some time before trying again | |
} | |
} | |
if (task != null) { | |
assertScaleUpControl(task); | |
// simulate some work | |
Thread.sleep(RANDOMIZER.nextInt(MAX_WORKING_TIME)); | |
maxRuntime = System.currentTimeMillis() + MAX_WAIT_TO_COMMIT_TASK; | |
boolean committed = false; | |
while (!committed && System.currentTimeMillis() < maxRuntime) { | |
try { | |
_taskStorage.getInProgressTask(_workerName, task.getTaskId()); | |
_taskStorage.deleteTask(_workerName, task.getTaskId()); | |
committed = true; | |
} catch (final TaskmanagerException e) { | |
; // retry | |
} | |
} | |
} | |
return task; | |
} | |
} | |
} |