blob: 5007fb1d4d7e0730225e3de89d678edc9141c9ed [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk;
import org.eclipse.smila.taskmanager.persistence.zk.ZkTaskQueue;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Test case for TaskStorageZk.
*/
public class TestTaskStorageZk extends DeclarativeServiceTestCase {
/** pipename for tests. */
private static final String PIPE_NAME = "testPipe";
/** storage under test. */
private TaskStorageZk _service;
/**
 * Prepares a test run: looks up the {@link TaskStorage} service, checks that it is a {@link TaskStorageZk}, and
 * purges the test pipe.
 *
 * @throws Exception
 *           if the superclass setup, the service lookup or the cleanup fails
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  final TaskStorage storage = getService(TaskStorage.class);
  assertNotNull(storage);
  assertTrue(TaskStorageZk.class.isInstance(storage));
  _service = TaskStorageZk.class.cast(storage);
  // start every test with an empty test pipe
  _service.purge(PIPE_NAME);
  // NOTE(review): presumably removes every qualifier lock regardless of its age - confirm against TaskStorageZk
  _service.checkQualifierLockAge(Long.MIN_VALUE);
}
/**
 * Clears the storage under test and then runs the superclass teardown.
 *
 * @throws Exception
 *           if clearing the storage or the superclass teardown fails
 */
@Override
protected void tearDown() throws Exception {
  try {
    // the service is only assigned in setUp(); guard against a missing reference
    if (_service != null) {
      _service.clear();
    }
  } finally {
    // always run the superclass teardown, even when clearing the storage failed
    super.tearDown();
  }
}
/**
 * Sanity check: the service reference obtained in {@code setUp()} is a {@link TaskStorageZk} instance.
 *
 * @throws Exception
 *           unexpected error
 */
public void testService() throws Exception {
  final boolean isZkStorage = TaskStorageZk.class.isInstance(_service);
  assertTrue(isZkStorage);
}
/**
 * Stores a task, fetches it and commits it, checking the todo/in-progress counters after every step. Also checks
 * that committing an unknown task id is rejected with a {@link BadParameterTaskmanagerException} carrying the cause
 * code {@code taskId} and that the counters stay unchanged in that case.
 *
 * @throws Exception
 *           unexpected error
 */
public void testSendAndReceive() throws Exception {
  // send task
  final Task task = TestZkTaskQueue.createTask(1, PIPE_NAME);
  _service.storeTask(task);
  assertTaskCounts(_service, PIPE_NAME, 1, 0);
  // receive task
  final Task newTask = _service.getTask(PIPE_NAME, null, null);
  TestZkTaskQueue.assertEqualTasks(task, newTask);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  final Task ipTask = _service.getInProgressTask(PIPE_NAME, newTask.getTaskId());
  TestZkTaskQueue.assertEqualTasks(newTask, ipTask);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  // receive no further task
  final Task nullTask = _service.getTask(PIPE_NAME, null, null);
  assertNull(nullTask);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  // commit invalid taskId
  // expect BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.taskId);
  BadParameterTaskmanagerException taskIdEx = null;
  try {
    _service.deleteTask(PIPE_NAME, "<invalid taskId>");
  } catch (final BadParameterTaskmanagerException ex) {
    taskIdEx = ex;
  }
  // the failed commit must not have changed any counter
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  assertNotNull("BadParameterTaskmanagerException expected for deleteTask(..., <invalid taskId>)", taskIdEx);
  // expected value first, actual value second, so that a failure message reads correctly
  assertEquals("Code Cause.taskId expected for deleteTask(..., <invalid taskId>)",
    BadParameterTaskmanagerException.Cause.taskId, taskIdEx.getCauseCode());
  // commit
  _service.deleteTask(PIPE_NAME, newTask.getTaskId());
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
}
/**
 * Checks the handling of qualified tasks: an unqualified task is only returned for requests without qualifiers, and
 * a qualified task is only returned for requests whose qualifier list contains the task's qualifier.
 *
 * @throws Exception
 *           unexpected error
 */
public void testQualifiedTasks() throws Exception {
  final List<String> abcQualifiers = Arrays.asList("a", "b", "c");
  // nothing stored yet
  assertNull(_service.getTask(PIPE_NAME, null, abcQualifiers));

  // unqualified task: not delivered to qualified requests
  final Task plainTask = TestZkTaskQueue.createTask(1, PIPE_NAME);
  _service.storeTask(plainTask);
  assertNull(_service.getTask(PIPE_NAME, null, abcQualifiers));
  final Task fetchedPlain = _service.getTask(PIPE_NAME, null, null);
  TestZkTaskQueue.assertEqualTasks(plainTask, fetchedPlain);
  _service.deleteTask(PIPE_NAME, fetchedPlain.getTaskId());
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
  assertNull(_service.getTask(PIPE_NAME, null, abcQualifiers));

  // task qualified with "a": only delivered when "a" is requested
  final Task taskA = TestZkTaskQueue.createTask(2, PIPE_NAME);
  taskA.setQualifier("a");
  _service.storeTask(taskA);
  assertNull(_service.getTask(PIPE_NAME, null, null));
  final Task fetchedA = _service.getTask(PIPE_NAME, null, abcQualifiers);
  TestZkTaskQueue.assertEqualTasks(taskA, fetchedA);
  _service.deleteTask(PIPE_NAME, fetchedA.getTaskId());
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
  assertNull(_service.getTask(PIPE_NAME, null, abcQualifiers));

  // task qualified with "d": invisible for a/b/c, visible once "d" is part of the request
  final Task taskD = TestZkTaskQueue.createTask(3, PIPE_NAME);
  taskD.setQualifier("d");
  _service.storeTask(taskD);
  assertNull(_service.getTask(PIPE_NAME, null, null));
  assertNull(_service.getTask(PIPE_NAME, null, abcQualifiers));
  final List<String> cdeQualifiers = Arrays.asList("c", "d", "e");
  final Task fetchedD = _service.getTask(PIPE_NAME, null, cdeQualifiers);
  TestZkTaskQueue.assertEqualTasks(taskD, fetchedD);
  _service.deleteTask(PIPE_NAME, fetchedD.getTaskId());
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
}
/**
 * Stores a task directly as "in progress" and checks that it is not handed out again, can be read back as
 * in-progress task, and can be committed. Also checks that committing an unknown task id is rejected with a
 * {@link BadParameterTaskmanagerException} carrying the cause code {@code taskId}.
 *
 * @throws Exception
 *           unexpected error
 */
public void testInitialTask() throws Exception {
  // send task
  final Task task = TestZkTaskQueue.createTask(1, PIPE_NAME);
  _service.storeInProgressTask(task);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  // receive no further task
  assertNull(_service.getTask(PIPE_NAME, null, null));
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  final Task ipTask = _service.getInProgressTask(PIPE_NAME, task.getTaskId());
  TestZkTaskQueue.assertEqualTasks(task, ipTask);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  // commit invalid taskId
  // expect BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.taskId);
  BadParameterTaskmanagerException taskIdEx = null;
  try {
    _service.deleteTask(PIPE_NAME, "<invalid taskId>");
  } catch (final BadParameterTaskmanagerException ex) {
    taskIdEx = ex;
  }
  // the failed commit must not have changed any counter
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  assertNotNull("BadParameterTaskmanagerException expected for deleteTask(..., <invalid taskId>)", taskIdEx);
  // expected value first, actual value second, so that a failure message reads correctly
  assertEquals("Code Cause.taskId expected for deleteTask(..., <invalid taskId>)",
    BadParameterTaskmanagerException.Cause.taskId, taskIdEx.getCauseCode());
  // commit
  _service.deleteTask(PIPE_NAME, task.getTaskId());
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
}
/**
 * Tests task delivery with scale up control: a host gets no more tasks once its number of tasks in progress reaches
 * the configured per-host maximum, other hosts are not affected, and finishing a task frees a slot again.
 *
 * @throws Exception
 *           unexpected error
 */
public void testSendAndReceiveWithScaleUpControl() throws Exception {
  final int hostLimit = 3;
  final String firstHost = "host-1";
  final String secondHost = "host-2";
  // NOTE(review): the limit is not reset at the end of this test - confirm that other tests do not depend on it
  _service.setMaxNoOfTasksPerHost(hostLimit);

  // queue five tasks, numbered 1..5
  for (int taskNo = 1; taskNo <= 5; taskNo++) {
    _service.storeTask(TestZkTaskQueue.createTask(taskNo, PIPE_NAME));
  }

  // host-1 can fetch tasks until the limit is reached
  final List<Task> firstHostTasks = new ArrayList<>();
  for (int fetchNo = 0; fetchNo < hostLimit; fetchNo++) {
    final Task fetched = _service.getTask(PIPE_NAME, firstHost, null);
    assertNotNull(fetched);
    assertEquals(firstHost, fetched.getProperties().get(Task.PROPERTY_WORKER_HOST));
    firstHostTasks.add(fetched);
  }
  // limit reached: no further task for host-1
  assertNull(_service.getTask(PIPE_NAME, firstHost, null));
  Map<String, Integer> scaleUpCounters = _service.getScaleUpCounters();
  assertEquals(3, scaleUpCounters.get(firstHost).intValue());

  // host-2 is counted separately and still gets a task
  final Task secondHostTask = _service.getTask(PIPE_NAME, secondHost, null);
  assertNotNull(secondHostTask);
  assertEquals(secondHost, secondHostTask.getProperties().get(Task.PROPERTY_WORKER_HOST));
  scaleUpCounters = _service.getScaleUpCounters();
  assertEquals(3, scaleUpCounters.get(firstHost).intValue());
  assertEquals(1, scaleUpCounters.get(secondHost).intValue());

  // finish the first task of host-1
  final Task firstTask = firstHostTasks.get(0);
  final Task inProgressTask = _service.getInProgressTask(PIPE_NAME, firstTask.getTaskId());
  // the in-progress copy is expected to come without the workerHost property ...
  assertNull(inProgressTask.getProperties().get(Task.PROPERTY_WORKER_HOST));
  // ... so drop it from the fetched task before comparing both
  firstTask.getProperties().remove(Task.PROPERTY_WORKER_HOST);
  TestZkTaskQueue.assertEqualTasks(firstTask, inProgressTask);
  _service.deleteTask(PIPE_NAME, inProgressTask.getTaskId());
  scaleUpCounters = _service.getScaleUpCounters();
  assertEquals(2, scaleUpCounters.get(firstHost).intValue());
  assertEquals(1, scaleUpCounters.get(secondHost).intValue());

  // a slot is free again, so host-1 gets the last queued task
  final Task lastTask = _service.getTask(PIPE_NAME, firstHost, null);
  assertNotNull(lastTask);
  assertEquals(firstHost, lastTask.getProperties().get(Task.PROPERTY_WORKER_HOST));
  scaleUpCounters = _service.getScaleUpCounters();
  assertEquals(3, scaleUpCounters.get(firstHost).intValue());
  assertEquals(1, scaleUpCounters.get(secondHost).intValue());
}
/**
 * Tests {@code removeTasks()} with a filter on task id and worker: tasks are removed from the todo section, from
 * the in-progress section, and from qualified todo tasks.
 *
 * @throws Exception
 *           unexpected error
 */
public void testRemoveTasks() throws Exception {
  // store three unqualified tasks, numbered 1..3
  for (int taskNo = 1; taskNo <= 3; taskNo++) {
    _service.storeTask(TestZkTaskQueue.createTask(taskNo, PIPE_NAME));
  }
  assertTaskCounts(_service, PIPE_NAME, 3, 0);

  // remove task 1 while it is still in the todo section
  final AnyMap removeFilter = DataFactory.DEFAULT.createAnyMap();
  removeFilter.put("taskId", "1");
  removeFilter.put("worker", PIPE_NAME);
  _service.removeTasks(removeFilter);
  assertTaskCounts(_service, PIPE_NAME, 2, 0);

  // move the two remaining tasks to in-progress and remove them there
  _service.getTask(PIPE_NAME, null, null);
  assertTaskCounts(_service, PIPE_NAME, 1, 1);
  _service.getTask(PIPE_NAME, null, null);
  assertTaskCounts(_service, PIPE_NAME, 0, 2);
  removeFilter.put("taskId", "2");
  _service.removeTasks(removeFilter);
  assertTaskCounts(_service, PIPE_NAME, 0, 1);
  removeFilter.put("taskId", "3");
  _service.removeTasks(removeFilter);
  assertTaskCounts(_service, PIPE_NAME, 0, 0);

  // remove qualified tasks
  final Task qualifiedFour = TestZkTaskQueue.createTask(4, PIPE_NAME);
  qualifiedFour.setQualifier("test");
  _service.storeTask(qualifiedFour);
  assertTaskCounts(_service, PIPE_NAME, 1, 0);
  final Task qualifiedFive = TestZkTaskQueue.createTask(5, PIPE_NAME);
  qualifiedFive.setQualifier("test");
  _service.storeTask(qualifiedFive);
  assertTaskCounts(_service, PIPE_NAME, 2, 0);
  removeFilter.put("taskId", "4");
  _service.removeTasks(removeFilter);
  assertTaskCounts(_service, PIPE_NAME, 1, 0);

  // normally two tasks with the same id (here 5) are not possible; both must be removed by the filter
  final Task duplicateFive = TestZkTaskQueue.createTask(5, PIPE_NAME);
  duplicateFive.setQualifier("testNew");
  _service.storeTask(duplicateFive);
  assertTaskCounts(_service, PIPE_NAME, 2, 0);
  removeFilter.put("taskId", "5");
  _service.removeTasks(removeFilter);
  assertTaskCounts(_service, PIPE_NAME, 0, 0);
}
/**
 * Tests qualifier lock handling: locks are kept per worker and qualifier, and {@code checkQualifierLockAge()}
 * releases the locks that are older than the given age (presumably in milliseconds).
 *
 * @throws Exception
 *           unexpected error
 */
public void testQualifierLocks() throws Exception {
  final String[] workers = { "worker1", "worker2" };
  final String[] qualifiers = { "qualifier1", "qualifier2" };
  // expected states are listed as worker1/qualifier1, worker1/qualifier2, worker2/qualifier1, worker2/qualifier2
  assertQualifierLockStates(workers, qualifiers, false, false, false, false);

  _service.lockQualifiers(workers[0], Arrays.asList(qualifiers[0]));
  assertQualifierLockStates(workers, qualifiers, true, false, false, false);

  _service.lockQualifiers(workers[0], Arrays.asList(qualifiers[1]));
  assertQualifierLockStates(workers, qualifiers, true, true, false, false);

  _service.lockQualifiers(workers[1], Arrays.asList(qualifiers[1]));
  assertQualifierLockStates(workers, qualifiers, true, true, false, true);

  // let the first three locks age before the fourth one is created
  Thread.sleep(100);
  _service.lockQualifiers(workers[1], Arrays.asList(qualifiers[0]));
  assertQualifierLockStates(workers, qualifiers, true, true, true, true);

  // only the youngest lock survives the first age check
  _service.checkQualifierLockAge(90);
  assertQualifierLockStates(workers, qualifiers, false, false, true, false);

  // after another wait the last lock is too old as well
  Thread.sleep(100);
  _service.checkQualifierLockAge(10);
  assertQualifierLockStates(workers, qualifiers, false, false, false, false);
}

/**
 * Asserts the lock state of every worker/qualifier combination, iterating workers in the outer and qualifiers in the
 * inner loop.
 *
 * @param workers
 *          worker names to check
 * @param qualifiers
 *          qualifiers to check for each worker
 * @param expectedLocked
 *          expected lock states, one per combination in iteration order
 * @throws Exception
 *           if the lock state cannot be read
 */
private void assertQualifierLockStates(final String[] workers, final String[] qualifiers,
  final boolean... expectedLocked) throws Exception {
  int position = 0;
  for (final String worker : workers) {
    for (final String qualifier : qualifiers) {
      final boolean locked = _service.isLockedQualifier(worker, qualifier);
      if (expectedLocked[position]) {
        assertTrue(locked);
      } else {
        assertFalse(locked);
      }
      position++;
    }
  }
}
/**
 * Tests {@code filterDuplicates()} with many threads working in parallel and ensures that no unique task is lost:
 * every worker thread creates tasks with the same set of uniqueness tags, stores whatever the filter lets pass, and
 * afterwards each tag must be found on at least one stored task.
 *
 * @throws Exception
 *           unexpected error
 */
public void testFilterDuplicatesMassiveParallelCheck() throws Exception {
  final String workerName = "parallelDuplicateWorker";
  final int noOfTasks = 10;
  final int noOfParallelChecks = 40;
  final AtomicInteger noOfExceptions = new AtomicInteger(0);
  final ExecutorService exec = Executors.newFixedThreadPool(noOfParallelChecks);
  // all workers wait on this latch so that they start at the same time
  final CountDownLatch latch = new CountDownLatch(1);
  for (int checkNo = 0; checkNo < noOfParallelChecks; checkNo++) {
    final int offset = checkNo;
    exec.submit(new Runnable() {
      @Override
      public void run() {
        final Random rand = new Random(System.nanoTime());
        try {
          latch.await();
          // create and put some delayed tasks in the queues
          final List<Task> taskList = new ArrayList<>();
          for (int i = 0; i < noOfTasks; i++) {
            Thread.sleep(rand.nextInt(50));
            final Task t1 = TestZkTaskQueue.createTask(offset * noOfTasks + i, workerName);
            t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i);
            // this adds a bit more pressure, since the tasks stay in that list and will be checked again
            taskList.add(t1);
            for (final Task uniqueTask : _service.filterDuplicates(taskList)) {
              _service.storeTask(workerName, uniqueTask);
            }
          }
        } catch (final Exception e) {
          // Catch broadly at the thread boundary: an unchecked exception would otherwise vanish inside the
          // Future returned by submit() and the test would not notice the failed worker.
          if (e instanceof InterruptedException) {
            // keep the interrupt status for the pool thread
            Thread.currentThread().interrupt();
          }
          e.printStackTrace();
          noOfExceptions.getAndIncrement();
        }
      }
    });
  }
  latch.countDown();
  exec.shutdown();
  final boolean finished = exec.awaitTermination(1, TimeUnit.MINUTES);
  if (!finished) {
    // do not leave workers running that would still modify the storage while it is checked or cleaned up
    exec.shutdownNow();
  }
  assertTrue("parallel duplicate checks did not finish within one minute", finished);
  // no exceptions
  assertEquals(0, noOfExceptions.get());
  // now check that each unique id is present
  final Set<String> taskUniqueIds = new HashSet<>();
  Task task = _service.getTask(workerName, null, null);
  while (task != null) {
    taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
    task = _service.getTask(workerName, null, null);
  }
  assertEquals(noOfTasks, taskUniqueIds.size());
}
/**
 * Tests {@code filterDuplicates()}: tasks with the same ten uniqueness tags are offered three times in a row, but
 * only one task per tag may end up in the todo section.
 *
 * @throws Exception
 *           unexpected error
 */
public void testFilterDuplicates() throws Exception {
  final String worker = "duplicateWorker";
  final int tagCount = 10;
  final int rounds = 3;
  final int offeredTasks = rounds * tagCount;
  // task numbers run from 1 to offeredTasks, the tags cycle through tag1..tag10 in every round
  for (int index = 0; index < offeredTasks; index++) {
    final int tagNo = index % tagCount + 1;
    final Task candidate = TestZkTaskQueue.createTask(index + 1, worker);
    candidate.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + tagNo);
    for (final Task accepted : _service.filterDuplicates(Arrays.asList(candidate))) {
      _service.storeTask(worker, accepted);
    }
  }
  // only one set of tasks may have been stored
  assertEquals(tagCount, _service.getTaskList(worker, "todo", offeredTasks).getSize());
  // fetch all tasks and collect their tags: each tag has to show up
  final Set<String> seenTags = new HashSet<>();
  for (Task fetched = _service.getTask(worker, null, null); fetched != null; fetched =
    _service.getTask(worker, null, null)) {
    seenTags.add(fetched.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
  }
  assertEquals(tagCount, seenTags.size());
}
/**
 * Tests that {@code filterDuplicates()} only filters against "todo" tasks: once the tasks of one round have been
 * moved to in-progress, tasks with the same uniqueness tags are accepted again in the next round.
 *
 * @throws Exception
 *           unexpected error
 */
public void testFilterDuplicatesOnlyToDo() throws Exception {
  final String worker = "duplicateWorker";
  final int tagCount = 10;
  final int rounds = 3;
  final int totalTasks = rounds * tagCount;
  for (int round = 0; round < rounds; round++) {
    // offer one task per tag and store what the filter lets pass
    final int numberBase = round * tagCount;
    for (int tagNo = 1; tagNo <= tagCount; tagNo++) {
      final Task candidate = TestZkTaskQueue.createTask(numberBase + tagNo, worker);
      candidate.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + tagNo);
      for (final Task accepted : _service.filterDuplicates(Arrays.asList(candidate))) {
        _service.storeTask(worker, accepted);
      }
    }
    // fetch every task, store it as in-progress, and check that each tag was present in this round
    final Set<String> seenTags = new HashSet<>();
    for (Task fetched = _service.getTask(worker, null, null); fetched != null; fetched =
      _service.getTask(worker, null, null)) {
      seenTags.add(fetched.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
      _service.storeInProgressTask(fetched);
    }
    assertEquals(tagCount, seenTags.size());
  }
  // one full set of tasks per round has to be in progress ...
  assertEquals(totalTasks, _service.getTaskList(worker, ZkTaskQueue.SECTION_INPROGRESS, totalTasks).getSize());
  // ... and nothing may be left in todo
  assertEquals(0, _service.getTaskList(worker, ZkTaskQueue.SECTION_TODO, totalTasks).getSize());
}
/**
 * Checks that the task counter of a worker reports the expected number of todo and in-progress tasks.
 *
 * @param service
 *          the TaskStorage to read the counters from
 * @param workerName
 *          name of the worker whose counter is checked
 * @param expectedTodo
 *          expected number of tasks in todo
 * @param expectedInProgress
 *          expected number of tasks in progress
 * @throws Exception
 *           if the counters cannot be read
 */
private void assertTaskCounts(final TaskStorage service, final String workerName, final int expectedTodo,
  final int expectedInProgress) throws Exception {
  final Map<String, TaskCounter> allCounters = service.getTaskCounters();
  assertNotNull(allCounters);
  assertTrue(allCounters.containsKey(workerName));
  // look the counter up once and check all of its values
  final TaskCounter workerCounter = allCounters.get(workerName);
  assertEquals(workerName, workerCounter.getWorkerName());
  assertEquals(expectedTodo, workerCounter.getTasksTodo());
  assertEquals(expectedInProgress, workerCounter.getTasksInProgress());
}
/**
 * Checks writing and reading of the task storage configuration: a stored configuration map must be readable again
 * with its {@code maxScaleUp} value and its additional keys.
 *
 * @throws Exception
 *           unexpected error
 */
public void testConfigurationHandling() throws Exception {
  final AnyMap written = DataFactory.DEFAULT.createAnyMap();
  written.put("configKey0", 0);
  written.put("configKey1", 1);
  written.put("maxScaleUp", 8);
  _service.storeConfiguration(written);

  final AnyMap stored = _service.readConfiguration();
  assertEquals(8, stored.getLongValue("maxScaleUp").longValue());
  for (final String extraKey : new String[] { "configKey0", "configKey1" }) {
    assertTrue(stored.containsKey(extraKey));
  }
}
}