/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.persistence.zk.test; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.Set; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk; | |
import org.eclipse.smila.taskmanager.persistence.zk.ZkTaskQueue; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
/** | |
* Test case for TaskStorageZk. | |
*/ | |
public class TestTaskStorageZk extends DeclarativeServiceTestCase { | |
/** pipename for tests. */ | |
private static final String PIPE_NAME = "testPipe"; | |
/** storage under test. */ | |
private TaskStorageZk _service; | |
/** | |
* clean up test pipe. | |
* | |
* @throws Exception | |
* getting service | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
final TaskStorage serviceRef = getService(TaskStorage.class); | |
assertNotNull(serviceRef); | |
assertTrue(serviceRef instanceof TaskStorageZk); | |
_service = (TaskStorageZk) serviceRef; | |
_service.purge(PIPE_NAME); | |
_service.checkQualifierLockAge(Long.MIN_VALUE); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
protected void tearDown() throws Exception { | |
_service.clear(); | |
super.tearDown(); | |
} | |
/** | |
* @throws Exception | |
* unexpected error | |
*/ | |
public void testService() throws Exception { | |
assertTrue(_service instanceof TaskStorageZk); | |
} | |
/** | |
* @throws Exception | |
* unexpected error | |
*/ | |
public void testSendAndReceive() throws Exception { | |
// send task | |
final Task task = TestZkTaskQueue.createTask(1, PIPE_NAME); | |
_service.storeTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 1, 0); | |
// receive task | |
final Task newTask = _service.getTask(PIPE_NAME, null, null); | |
TestZkTaskQueue.assertEqualTasks(task, newTask); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
final Task ipTask = _service.getInProgressTask(PIPE_NAME, newTask.getTaskId()); | |
TestZkTaskQueue.assertEqualTasks(newTask, ipTask); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
// receive no further task | |
final Task nullTask = _service.getTask(PIPE_NAME, null, null); | |
assertNull(nullTask); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
// commit invalid taskId | |
// expect BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.taskId); | |
BadParameterTaskmanagerException taskIdEx = null; | |
try { | |
_service.deleteTask(PIPE_NAME, "<invalid taskId>"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
taskIdEx = ex; | |
} | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
assertNotNull("BadParameterTaskmanagerException expected for deleteTask(..., <invalid taskId>)", taskIdEx); | |
assertEquals("Code Cause.taskId expected for deleteTask(..., <invalid taskId>)", taskIdEx.getCauseCode(), | |
BadParameterTaskmanagerException.Cause.taskId); | |
// commit | |
_service.deleteTask(PIPE_NAME, newTask.getTaskId()); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
} | |
/** check handling of qualified tasks. */ | |
public void testQualifiedTasks() throws Exception { | |
List<String> qualifiers = Arrays.asList(new String[] { "a", "b", "c" }); | |
assertNull(_service.getTask(PIPE_NAME, null, qualifiers)); | |
Task task = TestZkTaskQueue.createTask(1, PIPE_NAME); | |
_service.storeTask(task); | |
assertNull(_service.getTask(PIPE_NAME, null, qualifiers)); | |
Task getTask = _service.getTask(PIPE_NAME, null, null); | |
TestZkTaskQueue.assertEqualTasks(task, getTask); | |
_service.deleteTask(PIPE_NAME, getTask.getTaskId()); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
assertNull(_service.getTask(PIPE_NAME, null, qualifiers)); | |
task = TestZkTaskQueue.createTask(2, PIPE_NAME); | |
task.setQualifier("a"); | |
_service.storeTask(task); | |
assertNull(_service.getTask(PIPE_NAME, null, null)); | |
getTask = _service.getTask(PIPE_NAME, null, qualifiers); | |
TestZkTaskQueue.assertEqualTasks(task, getTask); | |
_service.deleteTask(PIPE_NAME, getTask.getTaskId()); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
assertNull(_service.getTask(PIPE_NAME, null, qualifiers)); | |
task = TestZkTaskQueue.createTask(3, PIPE_NAME); | |
task.setQualifier("d"); | |
_service.storeTask(task); | |
assertNull(_service.getTask(PIPE_NAME, null, null)); | |
assertNull(_service.getTask(PIPE_NAME, null, qualifiers)); | |
qualifiers = Arrays.asList(new String[] { "c", "d", "e" }); | |
getTask = _service.getTask(PIPE_NAME, null, qualifiers); | |
TestZkTaskQueue.assertEqualTasks(task, getTask); | |
_service.deleteTask(PIPE_NAME, getTask.getTaskId()); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
} | |
/** | |
* @throws Exception | |
* unexpected error | |
*/ | |
public void testInitialTask() throws Exception { | |
// send task | |
final Task task = TestZkTaskQueue.createTask(1, PIPE_NAME); | |
_service.storeInProgressTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
// receive no further task | |
assertNull(_service.getTask(PIPE_NAME, null, null)); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
final Task ipTask = _service.getInProgressTask(PIPE_NAME, task.getTaskId()); | |
TestZkTaskQueue.assertEqualTasks(task, ipTask); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
// commit invalid taskId | |
// expect BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.taskId); | |
BadParameterTaskmanagerException taskIdEx = null; | |
try { | |
_service.deleteTask(PIPE_NAME, "<invalid taskId>"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
taskIdEx = ex; | |
} | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
assertNotNull("BadParameterTaskmanagerException expected for deleteTask(..., <invalid taskId>)", taskIdEx); | |
assertEquals("Code Cause.taskId expected for deleteTask(..., <invalid taskId>)", taskIdEx.getCauseCode(), | |
BadParameterTaskmanagerException.Cause.taskId); | |
// commit | |
_service.deleteTask(PIPE_NAME, task.getTaskId()); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
} | |
/** | |
* test with scale up control. | |
*/ | |
public void testSendAndReceiveWithScaleUpControl() throws Exception { | |
final int maxNoOfTasks = 3; | |
final String host1 = "host-1"; | |
final String host2 = "host-2"; | |
_service.setMaxNoOfTasksPerHost(maxNoOfTasks); | |
// create some tasks | |
Task task = TestZkTaskQueue.createTask(1, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(2, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(3, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(4, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(5, PIPE_NAME); | |
_service.storeTask(task); | |
// get task until limit (maxNoOfTasks) is reached | |
final Task getTask1 = _service.getTask(PIPE_NAME, host1, null); | |
assertNotNull(getTask1); | |
assertEquals(host1, getTask1.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
final Task getTask2 = _service.getTask(PIPE_NAME, host1, null); | |
assertNotNull(getTask2); | |
assertEquals(host1, getTask2.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
final Task getTask3 = _service.getTask(PIPE_NAME, host1, null); | |
assertNotNull(getTask3); | |
assertEquals(host1, getTask3.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
Task getTask4 = _service.getTask(PIPE_NAME, host1, null); | |
// reached limit, no more tasks for host-1 | |
assertNull(getTask4); | |
Map<String, Integer> counters = _service.getScaleUpCounters(); | |
assertEquals(3, counters.get(host1).intValue()); | |
// but host-2 should be able to get a task | |
getTask4 = _service.getTask(PIPE_NAME, host2, null); | |
assertNotNull(getTask4); | |
assertEquals(host2, getTask4.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
counters = _service.getScaleUpCounters(); | |
assertEquals(3, counters.get(host1).intValue()); | |
assertEquals(1, counters.get(host2).intValue()); | |
// finish one task | |
final Task markTask1 = _service.getInProgressTask(PIPE_NAME, getTask1.getTaskId()); | |
// workerHost property should be removed again ... | |
assertNull(markTask1.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
// ... so we adapt the task to compare | |
getTask1.getProperties().remove(Task.PROPERTY_WORKER_HOST); | |
TestZkTaskQueue.assertEqualTasks(getTask1, markTask1); | |
_service.deleteTask(PIPE_NAME, markTask1.getTaskId()); | |
counters = _service.getScaleUpCounters(); | |
assertEquals(2, counters.get(host1).intValue()); | |
assertEquals(1, counters.get(host2).intValue()); | |
// ... so now we should get another one for host-1 | |
final Task getTask5 = _service.getTask(PIPE_NAME, host1, null); | |
assertNotNull(getTask5); | |
assertEquals(host1, getTask5.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
counters = _service.getScaleUpCounters(); | |
assertEquals(3, counters.get(host1).intValue()); | |
assertEquals(1, counters.get(host2).intValue()); | |
} | |
/** | |
* Tests for remove operations. | |
*/ | |
public void testRemoveTasks() throws Exception { | |
// send task | |
Task task = TestZkTaskQueue.createTask(1, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(2, PIPE_NAME); | |
_service.storeTask(task); | |
task = TestZkTaskQueue.createTask(3, PIPE_NAME); | |
_service.storeTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 3, 0); | |
// delete from todo list | |
final AnyMap matchingFilterMap = DataFactory.DEFAULT.createAnyMap(); | |
matchingFilterMap.put("taskId", "1"); | |
matchingFilterMap.put("worker", PIPE_NAME); | |
_service.removeTasks(matchingFilterMap); | |
assertTaskCounts(_service, PIPE_NAME, 2, 0); | |
// delete from in progress | |
_service.getTask(PIPE_NAME, null, null); | |
assertTaskCounts(_service, PIPE_NAME, 1, 1); | |
_service.getTask(PIPE_NAME, null, null); | |
assertTaskCounts(_service, PIPE_NAME, 0, 2); | |
matchingFilterMap.put("taskId", "2"); | |
_service.removeTasks(matchingFilterMap); | |
assertTaskCounts(_service, PIPE_NAME, 0, 1); | |
matchingFilterMap.put("taskId", "3"); | |
_service.removeTasks(matchingFilterMap); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
// delete from qualified | |
task = TestZkTaskQueue.createTask(4, PIPE_NAME); | |
task.setQualifier("test"); | |
_service.storeTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 1, 0); | |
task = TestZkTaskQueue.createTask(5, PIPE_NAME); | |
task.setQualifier("test"); | |
_service.storeTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 2, 0); | |
matchingFilterMap.put("taskId", "4"); | |
_service.removeTasks(matchingFilterMap); | |
assertTaskCounts(_service, PIPE_NAME, 1, 0); | |
// normally no task with the same id (hre 5) are not possible | |
task = TestZkTaskQueue.createTask(5, PIPE_NAME); | |
task.setQualifier("testNew"); | |
_service.storeTask(task); | |
assertTaskCounts(_service, PIPE_NAME, 2, 0); | |
matchingFilterMap.put("taskId", "5"); | |
_service.removeTasks(matchingFilterMap); | |
assertTaskCounts(_service, PIPE_NAME, 0, 0); | |
} | |
/** test qualifier lock handling. */ | |
public void testQualifierLocks() throws Exception { | |
final String worker1 = "worker1"; | |
final String worker2 = "worker2"; | |
final String qualifier1 = "qualifier1"; | |
final String qualifier2 = "qualifier2"; | |
assertFalse(_service.isLockedQualifier(worker1, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier2)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier2)); | |
_service.lockQualifiers(worker1, Arrays.asList(qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier2)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier2)); | |
_service.lockQualifiers(worker1, Arrays.asList(qualifier2)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier2)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier2)); | |
_service.lockQualifiers(worker2, Arrays.asList(qualifier2)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier2)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker2, qualifier2)); | |
Thread.sleep(100); | |
_service.lockQualifiers(worker2, Arrays.asList(qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker1, qualifier2)); | |
assertTrue(_service.isLockedQualifier(worker2, qualifier1)); | |
assertTrue(_service.isLockedQualifier(worker2, qualifier2)); | |
_service.checkQualifierLockAge(90); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier2)); | |
assertTrue(_service.isLockedQualifier(worker2, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier2)); | |
Thread.sleep(100); | |
_service.checkQualifierLockAge(10); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker1, qualifier2)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier1)); | |
assertFalse(_service.isLockedQualifier(worker2, qualifier2)); | |
} | |
/** testing filterDuplicates() with more threads, ensure no unique task is lost. */ | |
public void testFilterDuplicatesMassiveParallelCheck() throws Exception { | |
final String workerName = "parallelDuplicateWorker"; | |
final int noOfTasks = 10; | |
final int noOfParallelChecks = 40; | |
final AtomicInteger noOfExceptions = new AtomicInteger(0); | |
final ExecutorService exec = Executors.newFixedThreadPool(noOfParallelChecks); | |
final CountDownLatch latch = new CountDownLatch(1); | |
for (int checkNo = 0; checkNo < noOfParallelChecks; checkNo++) { | |
final int offset = checkNo; | |
exec.submit(new Runnable() { | |
@Override | |
public void run() { | |
final Random rand = new Random(System.nanoTime()); | |
try { | |
latch.await(); | |
// create and put some delayed tasks in the queues | |
final List<Task> taskList = new ArrayList<>(); | |
for (int i = 0; i < noOfTasks; i++) { | |
Thread.sleep(rand.nextInt(50)); | |
final Task t1 = TestZkTaskQueue.createTask(offset * noOfTasks + i, workerName); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
// this adds a bit more pressure, since the tasks stay in that list and will be checked again | |
taskList.add(t1); | |
for (final Task uniqueTask : _service.filterDuplicates(taskList)) { | |
_service.storeTask(workerName, uniqueTask); | |
} | |
} | |
} catch (final InterruptedException | TaskmanagerException e) { | |
e.printStackTrace(); | |
noOfExceptions.getAndIncrement(); | |
} | |
} | |
}); | |
} | |
latch.countDown(); | |
exec.shutdown(); | |
exec.awaitTermination(1, TimeUnit.MINUTES); | |
// no exceptions | |
assertEquals(0, noOfExceptions.get()); | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = _service.getTask(workerName, null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
task = _service.getTask(workerName, null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
} | |
/** testing filterDuplicates(). */ | |
public void testFilterDuplicates() throws Exception { | |
final String workerName = "duplicateWorker"; | |
final int noOfTasks = 10; | |
final int noOfIterations = 3; | |
for (int iteration = 0; iteration < noOfIterations; iteration++) { | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = TestZkTaskQueue.createTask(iteration * noOfTasks + i, workerName); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
for (final Task uniqueTask : _service.filterDuplicates(Arrays.asList(t1))) { | |
_service.storeTask(workerName, uniqueTask); | |
} | |
} | |
} | |
// check that only one set of tasks has been created... | |
assertEquals(noOfTasks, _service.getTaskList(workerName, "todo", noOfIterations * noOfTasks).getSize()); | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = _service.getTask(workerName, null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
task = _service.getTask(workerName, null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
} | |
/** testing filterDuplicates() that only duplicates will be filtered for "todo" tasks. */ | |
public void testFilterDuplicatesOnlyToDo() throws Exception { | |
final String workerName = "duplicateWorker"; | |
final int noOfTasks = 10; | |
final int noOfIterations = 3; | |
for (int iteration = 0; iteration < noOfIterations; iteration++) { | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = TestZkTaskQueue.createTask(iteration * noOfTasks + i, workerName); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
for (final Task uniqueTask : _service.filterDuplicates(Arrays.asList(t1))) { | |
_service.storeTask(workerName, uniqueTask); | |
} | |
} | |
// put each task in progress | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = _service.getTask(workerName, null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
_service.storeInProgressTask(task); | |
task = _service.getTask(workerName, null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
} | |
// check that only one set of tasks has been created for each run... | |
assertEquals(noOfIterations * noOfTasks, | |
_service.getTaskList(workerName, ZkTaskQueue.SECTION_INPROGRESS, noOfIterations * noOfTasks).getSize()); | |
// no tasks in todo | |
assertEquals(0, _service.getTaskList(workerName, ZkTaskQueue.SECTION_TODO, noOfIterations * noOfTasks) | |
.getSize()); | |
} | |
/** | |
* Asserts the correct task counts in todo and in progress. | |
* | |
* @param service | |
* the TaskStorage | |
* @param workerName | |
* name of the worker | |
* @param expectedTodo | |
* expected todo count | |
* @param expectedInProgress | |
* expected in progress count | |
* @throws Exception | |
* if any error occurs | |
*/ | |
private void assertTaskCounts(final TaskStorage service, final String workerName, final int expectedTodo, | |
final int expectedInProgress) throws Exception { | |
final Map<String, TaskCounter> counters = service.getTaskCounters(); | |
assertNotNull(counters); | |
assertTrue(counters.containsKey(workerName)); | |
assertEquals(workerName, counters.get(workerName).getWorkerName()); | |
assertEquals(expectedTodo, counters.get(workerName).getTasksTodo()); | |
assertEquals(expectedInProgress, counters.get(workerName).getTasksInProgress()); | |
} | |
/** check writing and reading of TS configuration. */ | |
public void testConfigurationHandling() throws Exception { | |
AnyMap config = DataFactory.DEFAULT.createAnyMap(); | |
config.put("configKey0", 0); | |
config.put("configKey1", 1); | |
config.put("maxScaleUp", 8); | |
_service.storeConfiguration(config); | |
final AnyMap readConfig = _service.readConfiguration(); | |
assertEquals(8, readConfig.getLongValue("maxScaleUp").longValue()); | |
assertTrue(readConfig.containsKey("configKey0")); | |
assertTrue(readConfig.containsKey("configKey1")); | |
} | |
} |