blob: 57d84bbc223aca640633d2ad8eda59e2b6d25209 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException.Cause;
import org.eclipse.smila.taskmanager.BulkInfo;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskList;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk;
import org.eclipse.smila.taskmanager.persistence.zk.ZkTaskQueue;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
/**
* Tests the basic functionality of {@link ZkTaskQueue}.
*/
public class TestZkTaskQueue extends DeclarativeServiceTestCase {
/** ZooKeeper service; assigned in {@link #setUp()}. */
private ZooKeeperService _zk;
/**
 * Fetches the ZooKeeper service and verifies that a service instance was found.
 *
 * @throws Exception
 *           set-up of the parent class failed or no service found.
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  final ZooKeeperService service = getService(ZooKeeperService.class);
  _zk = service;
  assertNotNull(service);
}
/**
 * Puts a single task, fetches it, reads its in-progress copy and deletes it again.
 *
 * @throws Exception
 *           test fails
 */
public void testBasic() throws Exception {
  final String name = "testBasic";
  final ZkTaskQueue taskQueue = createQueue(name);
  final Task original = createTask(1, name);
  taskQueue.put(original);

  // a request restricted to a qualifier is expected to find nothing
  final Task qualifiedResult = taskQueue.get(Collections.singleton("partition"), null);
  assertNull(qualifiedResult);

  final Task delivered = taskQueue.get(null, null);
  assertNotNull(delivered);
  assertEqualTasks(original, delivered);

  final Task inProgress = taskQueue.getInProgressTask(delivered.getTaskId());
  assertNotNull(inProgress);
  assertEqualTasks(original, inProgress);

  // after deletion the queue must not deliver anything anymore
  taskQueue.delete(delivered.getTaskId());
  final Task afterDelete = taskQueue.get(null, null);
  assertNull(afterDelete);
}
/**
 * Stores a task via {@code putInProgress} and checks that get requests do not return it, while it can be read and
 * deleted as an in-progress task.
 *
 * @throws Exception
 *           test fails
 */
public void testPutInProgress() throws Exception {
  final String name = "testPutInProgress";
  final ZkTaskQueue taskQueue = createQueue(name);
  final Task original = createTask(1, name);
  taskQueue.putInProgress(original);

  // neither a qualified nor an unqualified request is expected to return the task
  assertNull(taskQueue.get(Collections.singleton("partition"), null));
  assertNull(taskQueue.get(null, null));

  final Task inProgress = taskQueue.getInProgressTask(original.getTaskId());
  assertNotNull(inProgress);
  assertEqualTasks(original, inProgress);

  taskQueue.delete(inProgress.getTaskId());
  assertNull(taskQueue.get(null, null));
}
/**
 * Checks the task counter after each step of a put / get / delete cycle.
 *
 * @throws Exception
 *           test fails
 */
public void testCounter() throws Exception {
  final String name = "testCounter";
  final ZkTaskQueue taskQueue = createQueue(name);
  final Task original = createTask(1, name);

  taskQueue.put(original);
  assertCounter(taskQueue.getTaskCounter(), name, 1, 0);

  final Task delivered = taskQueue.get(null, null);
  assertNotNull(delivered);
  assertEqualTasks(original, delivered);
  assertCounter(taskQueue.getTaskCounter(), name, 0, 1);

  taskQueue.getInProgressTask(delivered.getTaskId());
  taskQueue.delete(delivered.getTaskId());
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);

  assertNull(taskQueue.get(null, null));
}
/**
 * Checks that {@code keepAlive} works for a task that has been fetched and fails with a
 * {@link TaskmanagerException} once the task has been deleted.
 *
 * @throws Exception
 *           test fails
 */
public void testKeepAlive() throws Exception {
  final String queueName = "testKeepAlive";
  final ZkTaskQueue queue = createQueue(queueName);
  final Task putTask = createTask(1, queueName);
  queue.put(putTask);
  assertNull(queue.get(Collections.singleton("partition"), null));
  final Task getTask = queue.get(null, null);
  assertNotNull(getTask);
  assertEqualTasks(putTask, getTask);
  final String taskId = getTask.getTaskId();
  // keep-alive on a fetched task must succeed
  queue.keepAlive(taskId);
  queue.getInProgressTask(taskId);
  queue.delete(taskId);
  assertNull(queue.get(null, null));
  try {
    queue.keepAlive(taskId);
    fail("should not work");
  } catch (final TaskmanagerException ex) {
    // expected: the task was deleted above. Any other exception type propagates and fails the test,
    // which replaces the former "catch Exception + instanceof" check.
  }
}
/**
 * Checks that a task fetched before {@code disconnectZkSession()} is still available as in-progress task afterwards
 * and can be deleted then.
 *
 * @throws Exception
 *           test fails
 */
public void testRetryTaskFinishing() throws Exception {
  final String name = "testDeleteWithoutFinishingError";
  final ZkTaskQueue taskQueue = createQueue(name);
  final Task original = createTask(1, name);
  taskQueue.put(original);

  final Task delivered = taskQueue.get(null, null);
  assertNotNull(delivered);
  assertEqualTasks(original, delivered);
  assertNotNull(taskQueue.getInProgressTask(original.getTaskId()));

  taskQueue.disconnectZkSession();

  // the task must not be delivered again, but must still be readable and deletable
  assertNull(taskQueue.get(null, null));
  assertNotNull(taskQueue.getInProgressTask(original.getTaskId()));
  taskQueue.delete(original.getTaskId());
  assertNull(taskQueue.get(null, null));
}
/**
 * Puts several tasks and checks that they are delivered in the order in which they were put.
 *
 * @throws Exception
 *           test fails
 */
public void testTaskOrdering() throws Exception {
  final String name = "testTaskOrdering";
  final ZkTaskQueue taskQueue = createQueue(name);
  final int taskCount = 16;
  final Task[] queued = new Task[taskCount];
  for (int n = 0; n < taskCount; n++) {
    final Task task = createTask(n, name);
    queued[n] = task;
    taskQueue.put(task);
    assertCounter(taskQueue.getTaskCounter(), name, n + 1, 0);
  }

  int remaining = taskCount;
  for (final Task expected : queued) {
    assertNull(taskQueue.get(Collections.singleton("partition"), null));
    final Task delivered = taskQueue.get(null, null);
    assertNotNull(delivered);
    assertEqualTasks(expected, delivered);
    remaining--;
    assertCounter(taskQueue.getTaskCounter(), name, remaining, 1);
    taskQueue.getInProgressTask(delivered.getTaskId());
    taskQueue.delete(delivered.getTaskId());
    assertCounter(taskQueue.getTaskCounter(), name, remaining, 0);
  }
  assertNull(taskQueue.get(null, null));
}
/**
 * Puts tasks with qualifiers "parts/0" .. "parts/3" and checks that each one is only delivered to a request naming
 * its qualifier.
 *
 * @throws Exception
 *           test fails
 */
public void testQualifiedQueue() throws Exception {
  final String name = "testQualifiedQueue";
  final ZkTaskQueue taskQueue = createQueue(name);
  assertFalse(taskQueue.hasQualifiedTasks());

  final int partitionCount = 4;
  final int tasksPerPartition = 4;
  final int taskCount = partitionCount * tasksPerPartition;
  final Task[] queued = new Task[taskCount];
  for (int n = 0; n < taskCount; n++) {
    final Task task = createTask(n, name);
    queued[n] = task;
    task.setQualifier("parts/" + (n % partitionCount));
    taskQueue.put(task);
    assertCounter(taskQueue.getTaskCounter(), name, n + 1, 0);
  }
  assertTrue(taskQueue.hasQualifiedTasks());

  for (int n = 0; n < taskCount; n++) {
    // unqualified requests and requests for an unused qualifier must come back empty
    assertNull(taskQueue.get(null, null));
    assertNull(taskQueue.get(Collections.singleton("parts/partition"), null));

    final String qualifier = "parts/" + (n % partitionCount);
    final Task delivered = taskQueue.get(Collections.singleton(qualifier), null);
    assertNotNull(delivered);
    assertEqualTasks(queued[n], delivered);

    final Task inProgress = taskQueue.getInProgressTask(delivered.getTaskId());
    assertNotNull(inProgress);
    assertEqualTasks(queued[n], inProgress);

    final int remaining = taskCount - (n + 1);
    assertCounter(taskQueue.getTaskCounter(), name, remaining, 1);
    taskQueue.delete(delivered.getTaskId());
    assertCounter(taskQueue.getTaskCounter(), name, remaining, 0);
  }
  assertTrue(taskQueue.hasQualifiedTasks()); // no tasks, but qualifiers are still there.
  assertNull(taskQueue.get(null, null));
}
/**
 * Fetches several tasks, waits longer than the time-to-live, keeps one of them alive and checks that exactly the
 * other ones are reported as timed out.
 *
 * @throws Exception
 *           test fails
 */
public void testGetTimedOutTasks() throws Exception {
  final String name = "testGetTimedOutTasks";
  final ZkTaskQueue taskQueue = createQueue(name);
  final int timeToLive = 1000;
  final int taskCount = 4;

  final Task[] queued = new Task[taskCount];
  for (int n = 0; n < taskCount; n++) {
    queued[n] = createTask(n, name);
    taskQueue.put(queued[n]);
    assertCounter(taskQueue.getTaskCounter(), name, n + 1, 0);
  }

  final Task[] fetched = new Task[taskCount];
  for (int n = 0; n < taskCount; n++) {
    fetched[n] = taskQueue.get(null, null);
    assertNotNull(fetched[n]);
    assertCounter(taskQueue.getTaskCounter(), name, taskCount - (n + 1), n + 1);
  }

  // let all fetched tasks exceed the time-to-live, then refresh only the first one
  Thread.sleep(2 * timeToLive);
  taskQueue.keepAlive(fetched[0].getTaskId());

  final Collection<String> timedOut = taskQueue.getTimedOutTasks(timeToLive);
  assertEquals(taskCount - 1, timedOut.size());
  assertFalse(timedOut.contains(fetched[0].getTaskId()));
  for (final String timedOutId : timedOut) {
    taskQueue.getInProgressTask(timedOutId);
    taskQueue.delete(timedOutId);
  }
  assertCounter(taskQueue.getTaskCounter(), name, 0, 1);

  taskQueue.getInProgressTask(fetched[0].getTaskId());
  taskQueue.delete(fetched[0].getTaskId());
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);

  final Collection<String> timedOutAfterCleanup = taskQueue.getTimedOutTasks(timeToLive);
  assertTrue(timedOutAfterCleanup.isEmpty());
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);
}
/**
 * Fetches and deletes the tasks of one queue from several threads in parallel.
 * <p>
 * Each worker records any {@link Throwable} it runs into, including assertion failures, which are errors and would
 * not be caught by a {@code catch (Exception)} clause. All workers are joined before the recorded failures are
 * checked, so no worker is still running when the test fails.
 *
 * @throws Exception
 *           test fails.
 */
public void testParallelGet() throws Exception {
  final String queueName = "testParallelGet";
  final ZkTaskQueue queue = createQueue(queueName);
  final int noOfTasks = 100;
  final int noOfThreads = 10;
  final Task[] putTasks = new Task[noOfTasks];
  for (int i = 0; i < noOfTasks; ++i) {
    putTasks[i] = createTask(i, queueName);
    queue.put(putTasks[i]);
  }
  assertCounter(queue.getTaskCounter(), queueName, noOfTasks, 0);
  final Thread[] getThreads = new Thread[noOfThreads];
  // one slot per thread; read by the test thread only after join()
  final Throwable[] getErrors = new Throwable[noOfThreads];
  for (int i = 0; i < noOfThreads; i++) {
    final int threadNo = i;
    getThreads[i] = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (true) {
            final Task getTask = queue.get(null, null);
            if (getTask == null) {
              return;
            }
            // the task id is parsed as the index of the task in putTasks
            final int taskNo = Integer.parseInt(getTask.getTaskId());
            assertEqualTasks(putTasks[taskNo], getTask);
            queue.getInProgressTask(getTask.getTaskId());
            queue.delete(getTask.getTaskId());
            putTasks[taskNo] = null;
          }
        } catch (final Throwable ex) {
          // Throwable on purpose: assertion failures are errors, not exceptions
          getErrors[threadNo] = ex;
        }
      }
    }, "testParallelGet-Thread #" + threadNo);
  }
  for (int i = 0; i < noOfThreads; i++) {
    getThreads[i].start();
  }
  for (int i = 0; i < noOfThreads; i++) {
    getThreads[i].join();
  }
  for (int i = 0; i < noOfThreads; i++) {
    assertNull("Error in thread " + (i + 1) + ": " + getErrors[i], getErrors[i]);
  }
  assertCounter(queue.getTaskCounter(), queueName, 0, 0);
}
/**
 * Puts tasks into one queue from several threads in parallel and afterwards fetches and deletes all of them from
 * the test thread.
 *
 * @throws Exception
 *           test fails.
 */
public void testParallelPut() throws Exception {
  final String name = "testParallelPut";
  final ZkTaskQueue taskQueue = createQueue(name);
  final int tasksPerThread = 10;
  final int threadCount = 10;
  final int taskCount = tasksPerThread * threadCount;
  final Task[] queued = new Task[taskCount];
  final Thread[] workers = new Thread[threadCount];
  final Exception[] failures = new Exception[threadCount];

  for (int t = 0; t < threadCount; t++) {
    final int workerNo = t;
    final Runnable putJob = new Runnable() {
      @Override
      public void run() {
        try {
          // every worker fills its own, non-overlapping slice of the task array
          final int firstTaskNo = workerNo * tasksPerThread;
          for (int offset = 0; offset < tasksPerThread; offset++) {
            final int taskNo = firstTaskNo + offset;
            queued[taskNo] = createTask(taskNo, name);
            taskQueue.put(queued[taskNo]);
          }
        } catch (final Exception ex) {
          failures[workerNo] = ex;
        }
      }
    };
    workers[t] = new Thread(putJob, "testParallelPut-Thread #" + workerNo);
  }
  for (final Thread worker : workers) {
    worker.start();
  }
  for (int t = 0; t < threadCount; t++) {
    workers[t].join();
    assertNull("Error in thread " + (t + 1) + ": " + failures[t], failures[t]);
  }
  assertCounter(taskQueue.getTaskCounter(), name, taskCount, 0);

  for (int n = 0; n < taskCount; n++) {
    final Task delivered = taskQueue.get(null, null);
    assertNotNull(delivered);
    final String taskId = delivered.getTaskId();
    assertEqualTasks(queued[Integer.parseInt(taskId)], delivered);
    taskQueue.getInProgressTask(delivered.getTaskId());
    taskQueue.delete(delivered.getTaskId());
  }
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);
}
/**
 * Checks that {@code purge()} leaves nothing to fetch and that deleting a task fetched before the purge fails
 * with cause code {@code taskId}.
 *
 * @throws Exception
 *           test fails
 */
public void testPurge() throws Exception {
  final String name = "testPurge";
  final String qualifier = "parts/partition";
  final ZkTaskQueue taskQueue = createQueue(name);

  // two unqualified and two qualified tasks
  final Task plainA = createTask(1, name);
  taskQueue.put(plainA);
  final Task plainB = createTask(2, name);
  taskQueue.put(plainB);
  final Task qualifiedA = createTask(3, name);
  qualifiedA.setQualifier(qualifier);
  taskQueue.put(qualifiedA);
  final Task qualifiedB = createTask(4, name);
  qualifiedB.setQualifier(qualifier);
  taskQueue.put(qualifiedB);
  assertCounter(taskQueue.getTaskCounter(), name, 4, 0);

  // fetch one task of each kind
  final Task fetchedPlain = taskQueue.get(null, null);
  assertNotNull(fetchedPlain);
  final Task fetchedQualified = taskQueue.get(Collections.singleton(qualifier), null);
  assertNotNull(fetchedQualified);
  assertCounter(taskQueue.getTaskCounter(), name, 2, 2);

  taskQueue.purge();

  assertNull(taskQueue.get(null, null));
  assertNull(taskQueue.get(Collections.singleton(qualifier), null));
  try {
    taskQueue.delete(fetchedPlain.getTaskId());
    fail("should not work");
  } catch (final BadParameterTaskmanagerException expected) {
    assertEquals(Cause.taskId, expected.getCauseCode());
  }
  try {
    taskQueue.delete(fetchedQualified.getTaskId());
    fail("should not work");
  } catch (final BadParameterTaskmanagerException expected) {
    assertEquals(Cause.taskId, expected.getCauseCode());
  }
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);
}
/**
* Checks the cleanup of the todo directories of unqualified tasks.
* <p>
* Two tasks with different "jobName" properties are put; the test expects each of them below a todo sub-directory
* named after its "jobName". It then verifies via a direct ZooKeeper connection that {@code cleanEmptyNodes(0)}
* leaves non-empty sub-directories alone and removes those whose task has been fetched.
*
* @throws Exception
* on error
*/
public void testCleanUnqualifiedQueues() throws Exception {
final String queueName = "testCleanUnqualifiedQueues";
final ZkTaskQueue queue = createQueue(queueName);
final ZooKeeperService zkService = getService(ZooKeeperService.class);
assertNotNull(zkService);
// separate connection used to inspect the node structure directly
final ZkConnection zk = new ZkConnection(zkService);
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODODIR_SUFFIX;
final List<String> subqueueNames = new ArrayList<String>(Arrays.asList("subQueue1", "subQueue2"));
final List<String> subqueueTodoDirs =
new ArrayList<String>(Arrays.asList(todoPartRoot + "/subQueue1", todoPartRoot + "/subQueue2"));
assertFalse(queue.hasQualifiedTasks());
final Task task1 = createTask(1, queueName);
task1.getProperties().put("jobName", subqueueNames.get(0));
final Task task2 = createTask(2, queueName);
task2.getProperties().put("jobName", subqueueNames.get(1));
queue.put(task1);
queue.put(task2);
assertFalse(queue.hasQualifiedTasks());
// expected: one task node in each sub-directory, two sub-directories below the root
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1);
getChildrenChecked(zk, subqueueTodoDirs.get(1), 1);
getChildrenChecked(zk, todoPartRoot, 2);
queue.cleanEmptyNodes(0); // should not cleanup anything
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1);
getChildrenChecked(zk, subqueueTodoDirs.get(1), 1);
getChildrenChecked(zk, todoPartRoot, 2);
// fetch the first task; the test accepts either of the two and tracks which one it got
final List<String> gotSubqueueNames = new ArrayList<String>(subqueueNames);
Task gotTask = queue.get(null, null);
assertNotNull(gotTask);
String subqueueName = gotTask.getProperties().get("jobName");
assertTrue(gotSubqueueNames.contains(subqueueName));
gotSubqueueNames.remove(subqueueName);
// i: index of the sub-queue that was just emptied, 1 - i: index of the other one
int i = subqueueNames.indexOf(subqueueName);
getChildrenChecked(zk, subqueueTodoDirs.get(i), 0);
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 1);
getChildrenChecked(zk, todoPartRoot, 2);
// cleanup is expected to remove only the emptied sub-directory
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 1);
getChildrenChecked(zk, todoPartRoot, 1);
// fetch the second task, which must belong to the remaining sub-queue
gotTask = queue.get(null, null);
assertNotNull(gotTask);
subqueueName = gotTask.getProperties().get("jobName");
gotSubqueueNames.remove(subqueueName);
assertTrue(gotSubqueueNames.isEmpty());
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 0);
getChildrenChecked(zk, todoPartRoot, 1);
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, todoPartRoot, 0);
assertFalse(queue.hasQualifiedTasks());
// second round: a new task for the first sub-queue after everything was cleaned up
final Task task3 = createTask(3, queueName);
task3.getProperties().put("jobName", subqueueNames.get(0));
queue.put(task3);
assertFalse(queue.hasQualifiedTasks());
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1);
getChildrenChecked(zk, todoPartRoot, 1);
// the sub-directory holds a task, so cleanup must keep it
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1);
getChildrenChecked(zk, todoPartRoot, 1);
gotTask = queue.get(null, null);
assertNotNull(gotTask);
assertEquals(subqueueNames.get(0), gotTask.getProperties().get("jobName"));
getChildrenChecked(zk, subqueueTodoDirs.get(0), 0);
getChildrenChecked(zk, todoPartRoot, 1);
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, todoPartRoot, 0);
assertFalse(queue.hasQualifiedTasks());
}
/**
* Checks the cleanup of unused partition (qualifier) directories.
* <p>
* Two tasks with qualifiers "parts/partition1" and "parts/partition2" are put and fetched, a third task for
* "parts/partition1" follows after a pause. The test then calls {@code cleanEmptyNodes} with different age
* arguments and finally expects the qualified todo root to have no children left.
*
* @throws Exception
* test fails
*/
public void testCleanQualifiedQueues() throws Exception {
final String queueName = "testCleanQualifiedQueues";
final ZkTaskQueue queue = createQueue(queueName);
assertFalse(queue.hasQualifiedTasks());
final ZooKeeperService zkService = getService(ZooKeeperService.class);
assertNotNull(zkService);
// separate connection used to inspect the node structure directly
final ZkConnection zk = new ZkConnection(zkService);
final Task task1 = createTask(1, queueName);
task1.setQualifier("parts/partition1");
final Task task2 = createTask(2, queueName);
task2.setQualifier("parts/partition2");
queue.put(task1);
queue.put(task2);
assertTrue(queue.hasQualifiedTasks());
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODOQUALIFIEDDIR_SUFFIX;
final String todoPartDir = todoPartRoot + '/' + ZkTaskQueue.DEFAULT_SUB_QUEUE;
// expected: two children (one per qualifier) below the default sub-queue
getChildrenChecked(zk, todoPartDir, 2);
queue.cleanEmptyNodes(0); // should not cleanup anything, because there are still tasks.
getChildrenChecked(zk, todoPartDir, 2);
assertNotNull(queue.get(Collections.singleton("parts/partition1"), null));
assertNotNull(queue.get(Collections.singleton("parts/partition2"), null));
// wait before using partition1 again, so both partitions were used at clearly different times
Thread.sleep(1000);
final Task task3 = createTask(3, queueName);
task3.setQualifier("parts/partition1");
queue.put(task3);
assertNotNull(queue.get(Collections.singleton("parts/partition1"), null));
final List<String> partitionDirs = getChildrenChecked(zk, todoPartDir, 2);
queue.cleanEmptyNodes(500); // should remove partitions
// NOTE(review): partitionDirs was read BEFORE cleanEmptyNodes(500), so this assertion does not look at the
// state after the cleanup (and depends on the order of the returned children) - confirm whether the children
// were meant to be re-read here.
assertEquals("parts_partition1", partitionDirs.get(0));
assertTrue(queue.hasQualifiedTasks());
Thread.sleep(50);
queue.cleanEmptyNodes(1); // should remove partition1, too
getChildrenChecked(zk, todoPartRoot, 0);
assertFalse(queue.hasQualifiedTasks());
}
/**
* Checks the cleanup of qualified todo directories when tasks carry both a "jobName" property and a qualifier.
* <p>
* The test expects a task with jobName "subqueueN" and qualifier "parts/partitionN" below
* {@code <qualified todo root>/subqueueN/parts_partitionN} and verifies via a direct ZooKeeper connection which
* directories exist before and after each {@code cleanEmptyNodes(0)} call.
*
* @throws Exception
* on error
*/
public void testCleanMultipleQualifiedSubqueues() throws Exception {
final String queueName = "testCleanMultipleQualifiedSubqueues";
final ZkTaskQueue queue = createQueue(queueName);
assertFalse(queue.hasQualifiedTasks());
final ZooKeeperService zkService = getService(ZooKeeperService.class);
assertNotNull(zkService);
// separate connection used to inspect the node structure directly
final ZkConnection zk = new ZkConnection(zkService);
// qualifiers as used in the requests ...
final List<String> part1 = Arrays.asList("parts/partition1");
final List<String> part2 = Arrays.asList("parts/partition2");
// ... and the node names the test expects for them ('/' replaced by '_')
final String expected_part1 = "parts_partition1";
final String expected_part2 = "parts_partition2";
final Task task1 = createTask(1, queueName);
task1.getProperties().put("jobName", "subqueue1");
task1.setQualifier(part1.get(0));
final Task task2 = createTask(2, queueName);
task2.getProperties().put("jobName", "subqueue2");
task2.setQualifier(part2.get(0));
queue.put(task1);
queue.put(task2);
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODOQUALIFIEDDIR_SUFFIX;
final String subqueue1TodoDir = todoPartRoot + "/subqueue1";
final String subqueue2TodoDir = todoPartRoot + "/subqueue2";
assertTrue(queue.hasQualifiedTasks());
// expected: each sub-queue directory contains exactly its own partition directory
List<String> partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1);
assertEquals(expected_part2, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 2);
queue.cleanEmptyNodes(0); // should not cleanup anything
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1);
assertEquals(expected_part2, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 2);
// fetch the task of subqueue1; its partition directory is expected to stay until the cleanup runs
Task gotTask = queue.get(part1, null);
assertNotNull(gotTask);
assertEquals("subqueue1", gotTask.getProperties().get("jobName"));
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 2);
// cleanup is expected to remove subqueue1 only
queue.cleanEmptyNodes(0);
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1);
assertEquals(expected_part2, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 1);
// fetch the task of subqueue2, then cleanup is expected to leave nothing below the root
gotTask = queue.get(part2, null);
assertNotNull(gotTask);
assertEquals("subqueue2", gotTask.getProperties().get("jobName"));
getChildrenChecked(zk, todoPartRoot, 1);
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, todoPartRoot, 0);
assertFalse(queue.hasQualifiedTasks());
// second round: a new task for subqueue1 / partition1 after everything was cleaned up
final Task task3 = createTask(3, queueName);
task3.getProperties().put("jobName", "subqueue1");
task3.setQualifier(part1.get(0));
queue.put(task3);
assertTrue(queue.hasQualifiedTasks());
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 1);
// the partition still holds a task, so cleanup must keep it
queue.cleanEmptyNodes(0);
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 1);
gotTask = queue.get(part1, null);
assertNotNull(gotTask);
assertEquals("subqueue1", gotTask.getProperties().get("jobName"));
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1);
assertEquals(expected_part1, partDirs.get(0));
getChildrenChecked(zk, todoPartRoot, 1);
queue.cleanEmptyNodes(0);
getChildrenChecked(zk, todoPartRoot, 0);
assertFalse(queue.hasQualifiedTasks());
}
/**
 * Checks the removal of stale tasks: a node without data is created next to a real task in the todo directory and
 * must be removed by {@code cleanEmptyNodes}, while the real task stays available.
 *
 * @throws Exception
 *           test fails.
 */
public void testStaleTasks() throws Exception {
  final String name = "testStaleTasks";
  final ZkTaskQueue taskQueue = createQueue(name);
  final ZooKeeperService zkService = getService(ZooKeeperService.class);
  assertNotNull(zkService);
  final ZkConnection connection = new ZkConnection(zkService);

  final Task original = createTask(1, name);
  taskQueue.put(original);

  // create the stale node directly in ZooKeeper; it is counted like a task at first
  final String todoDir =
    ZkTaskQueue.TASKDIR_PREFIX + '/' + name + ZkTaskQueue.TODODIR_SUFFIX + '/' + ZkTaskQueue.DEFAULT_SUB_QUEUE;
  connection.createNode(todoDir + "/staletask", ZkConnection.NO_DATA);
  assertCounter(taskQueue.getTaskCounter(), name, 2, 0);

  // first cleanup must not remove it yet
  taskQueue.cleanEmptyNodes(1000);
  assertCounter(taskQueue.getTaskCounter(), name, 2, 0);

  // after waiting, the second cleanup must remove the stale node only
  Thread.sleep(1000);
  taskQueue.cleanEmptyNodes(1);
  assertCounter(taskQueue.getTaskCounter(), name, 1, 0);

  final Task delivered = taskQueue.get(null, null);
  assertCounter(taskQueue.getTaskCounter(), name, 0, 1);
  assertEqualTasks(original, delivered);

  taskQueue.getInProgressTask(delivered.getTaskId());
  taskQueue.delete(delivered.getTaskId());
  assertCounter(taskQueue.getTaskCounter(), name, 0, 0);
}
/**
 * Tests task lists and task infos of the "todo" and "inprogress" sections.
 * <p>
 * Half of the tasks are put without qualifier, the other half with qualifier "parts/partition". For every listed
 * task the info map must contain "created", "modified" and "task", and the contained task must match the task that
 * was put.
 *
 * @throws Exception
 *           test fails
 */
public void testTaskInfos() throws Exception {
  final String queueName = "testTaskInfos";
  final String taskPathPrefix = "/smila/tasks/testTaskInfos/";
  final ZkTaskQueue queue = createQueue(queueName);
  final int noOfTasks = 4;
  // sized by noOfTasks instead of a second hard-coded 4
  final Task[] tasks = new Task[noOfTasks];
  for (int i = 0; i < noOfTasks; i++) {
    tasks[i] = createTask(i, queueName);
    if (i >= noOfTasks / 2) {
      tasks[i].setQualifier("parts/partition");
    }
    queue.put(tasks[i]);
  }
  final TaskCounter todoCounter = queue.getTaskCounter();
  assertCounter(todoCounter, queueName, noOfTasks, 0);
  final TaskList startIpList = queue.getTaskList("inprogress", 1);
  assertEquals(0, startIpList.getSize());
  assertTrue(startIpList.getTaskNames().isEmpty());
  final TaskList todoTaskList = queue.getTaskList("todo", 10);
  assertEquals(noOfTasks, todoTaskList.getSize());
  final List<String> todoTaskNames = todoTaskList.getTaskNames();
  for (int i = 0; i < noOfTasks; i++) {
    final String taskPath = todoTaskNames.get(i);
    assertTrue(taskPath.startsWith(taskPathPrefix));
    final String taskName = taskPath.substring(taskPathPrefix.length());
    if (i < noOfTasks / 2) {
      assertTrue(taskName.startsWith("todo/"));
    } else {
      assertTrue(taskName.startsWith("todo_qualified/parts_partition"));
    }
    // split "<section>/<rest>" at the first slash
    final int index = taskName.indexOf('/');
    final AnyMap taskInfo = queue.getTaskInfo(taskName.substring(0, index), taskName.substring(index + 1));
    assertNotNull(taskInfo);
    assertTrue(taskInfo.containsKey("created"));
    assertTrue(taskInfo.containsKey("modified"));
    assertTrue(taskInfo.containsKey("task"));
    // assertEquals reports both values on failure
    assertEquals(tasks[i].toAny(), taskInfo.get("task"));
    // move the task to in-progress for the second half of the test
    if (i < noOfTasks / 2) {
      queue.get(null, null);
    } else {
      queue.get(Collections.singleton("parts/partition"), null);
    }
  }
  final TaskCounter ipCounter = queue.getTaskCounter();
  assertCounter(ipCounter, queueName, 0, noOfTasks);
  final TaskList endTodoList = queue.getTaskList("todo", 1);
  assertEquals(0, endTodoList.getSize());
  assertTrue(endTodoList.getTaskNames().isEmpty());
  final TaskList ipTaskList = queue.getTaskList("inprogress", 10);
  assertEquals(noOfTasks, ipTaskList.getSize());
  final List<String> ipTaskNames = ipTaskList.getTaskNames();
  for (int i = 0; i < noOfTasks; i++) {
    final String taskPath = ipTaskNames.get(i);
    assertTrue(taskPath.startsWith(taskPathPrefix));
    final String taskName = taskPath.substring(taskPathPrefix.length());
    assertTrue(taskName.startsWith("inprogress/"));
    final int index = taskName.indexOf('/');
    final AnyMap taskInfo = queue.getTaskInfo(taskName.substring(0, index), taskName.substring(index + 1));
    assertNotNull(taskInfo);
    assertTrue(taskInfo.containsKey("created"));
    assertTrue(taskInfo.containsKey("modified"));
    assertTrue(taskInfo.containsKey("task"));
    final Task infoTask = Task.fromAny(taskInfo.get("task"));
    // drop the properties that were not part of the task when it was put, so it can be compared to the original
    infoTask.getProperties().remove(ZkTaskQueue.OWNER_PROP);
    infoTask.getProperties().remove(Task.PROPERTY_END_TIME);
    infoTask.getProperties().remove(Task.PROPERTY_START_TIME);
    infoTask.getProperties().remove(Task.PROPERY_TASK_AGE);
    assertEquals(tasks[i].toAny(), infoTask.toAny());
    queue.getInProgressTask(infoTask.getTaskId());
    queue.delete(infoTask.getTaskId());
  }
  final TaskCounter endCounter = queue.getTaskCounter();
  assertCounter(endCounter, queueName, 0, 0);
  final TaskList endIpList = queue.getTaskList("inprogress", 1);
  assertEquals(0, endIpList.getSize());
  assertTrue(endIpList.getTaskNames().isEmpty());
}
/**
 * Checks the worker host property: it is absent when a task is fetched without host, and set on the fetched task
 * (but not on the in-progress copy) when a host is given.
 *
 * @throws Exception
 *           test fails
 */
public void testHostProperty() throws Exception {
  final String name = "testHost";
  final ZkTaskQueue taskQueue = createQueue(name);

  // without host
  final Task firstPut = createTask(1, name);
  taskQueue.put(firstPut);
  final Task fetchedWithoutHost = taskQueue.get(null, null);
  assertNotNull(fetchedWithoutHost);
  assertNull(fetchedWithoutHost.getProperties().get(Task.PROPERTY_WORKER_HOST));
  final Task inProgressWithoutHost = taskQueue.getInProgressTask(fetchedWithoutHost.getTaskId());
  assertNotNull(inProgressWithoutHost);
  assertNull(inProgressWithoutHost.getProperties().get(Task.PROPERTY_WORKER_HOST));
  assertEqualTasks(fetchedWithoutHost, inProgressWithoutHost);

  // with host
  final Task secondPut = createTask(2, name);
  taskQueue.put(secondPut);
  final Task fetchedWithHost = taskQueue.get(null, "host");
  assertNotNull(fetchedWithHost);
  assertEquals("host", fetchedWithHost.getProperties().get(Task.PROPERTY_WORKER_HOST));
  final Task inProgressWithHost = taskQueue.getInProgressTask(fetchedWithHost.getTaskId());
  assertNotNull(inProgressWithHost);
  assertNull(inProgressWithHost.getProperties().get(Task.PROPERTY_WORKER_HOST));
  // remove the host from the fetched task so that both tasks can be compared
  fetchedWithHost.getProperties().remove(Task.PROPERTY_WORKER_HOST);
  assertEqualTasks(fetchedWithHost, inProgressWithHost);
}
/**
 * Tests that task delivery is limited: the queue is created with a second argument of 2, and a third task is only
 * delivered to the same host after one of the first two has been deleted.
 *
 * @throws Exception
 *           test fails
 */
public void testScaleUpControl() throws Exception {
  final String name = "testScaleUpControl";
  final String host = name;
  final ZkTaskQueue taskQueue = createQueue(name, 2);
  for (int taskNo = 1; taskNo <= 3; taskNo++) {
    taskQueue.put(createTask(taskNo, name));
  }

  final Task first = taskQueue.get(null, host);
  assertNotNull(first);
  final Task second = taskQueue.get(null, host);
  assertNotNull(second);
  // limit reached: nothing more for this host
  assertNull(taskQueue.get(null, host));

  taskQueue.getInProgressTask(first.getTaskId());
  taskQueue.delete(first.getTaskId());

  // one task finished, so the next one may be delivered
  final Task third = taskQueue.get(null, host);
  assertNotNull(third);

  taskQueue.getInProgressTask(second.getTaskId());
  taskQueue.delete(second.getTaskId());
  taskQueue.getInProgressTask(third.getTaskId());
  taskQueue.delete(third.getTaskId());
}
/**
 * Tests that the delivery limit does not apply when tasks are requested without a host name: all three tasks are
 * delivered although the queue is created with a second argument of 2.
 *
 * @throws Exception
 *           test fails
 */
public void testNoLimitForRunAlways() throws Exception {
  final String name = "testLimitForRunAlways";
  final String host = null;
  final ZkTaskQueue taskQueue = createQueue(name, 2);
  for (int taskNo = 1; taskNo <= 3; taskNo++) {
    taskQueue.put(createTask(taskNo, name));
  }

  final Task first = taskQueue.get(null, host);
  assertNotNull(first);
  final Task second = taskQueue.get(null, host);
  assertNotNull(second);
  final Task third = taskQueue.get(null, host);
  assertNotNull(third);

  taskQueue.getInProgressTask(first.getTaskId());
  taskQueue.delete(first.getTaskId());
  taskQueue.getInProgressTask(second.getTaskId());
  taskQueue.delete(second.getTaskId());
  taskQueue.getInProgressTask(third.getTaskId());
  taskQueue.delete(third.getTaskId());
}
/**
 * Simple test for getting tasks with a delivery delay.
 * <p>
 * All tasks carry a delivery delay. The first task must be delivered immediately, each further one about
 * {@code taskDelayMillis} after the previous delivery. The "too slow" limit is checked on every poll, so the test
 * fails instead of polling forever if a task is never delivered.
 *
 * @throws Exception
 *           test fails
 */
public void testDeliveryDelay() throws Exception {
  final ZkTaskQueue queue = createQueue("workerDeliveryDelay");
  final int noOfTasks = 10;
  final int taskDelayMillis = 1000;
  final int delta = 50; // test should not fail due to time measure imprecision.
  // create and put some delayed tasks in the queues
  for (int i = 1; i <= noOfTasks; i++) {
    final Task t1 = createTask(i, queue.getWorkerName());
    t1.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis));
    queue.put(t1);
  }
  // first task should be delivered immediately
  long start = System.currentTimeMillis();
  Task getTask = queue.get(null, null);
  long deliveryDuration = System.currentTimeMillis() - start;
  assertNotNull(getTask);
  assertTrue("first was: " + deliveryDuration, deliveryDuration < 100);
  // check delivery delay when getting further tasks
  start = System.currentTimeMillis();
  int deliveredTasks = 1;
  while (deliveredTasks < noOfTasks) {
    getTask = queue.get(null, null);
    deliveryDuration = System.currentTimeMillis() - start;
    // assert delay not too long; checked whether or not a task was delivered, which bounds the polling loop
    if (deliveryDuration >= taskDelayMillis + delta) {
      fail(queue.getWorkerName() + " delivery too slow, was: " + deliveryDuration);
    }
    if (getTask != null) {
      assertTrue(queue.getWorkerName() + " delivery too fast, was: " + deliveryDuration,
        deliveryDuration >= taskDelayMillis - delta);
      start = System.currentTimeMillis();
      deliveredTasks++;
    }
  }
  assertEquals(noOfTasks, deliveredTasks);
}
/**
 * Tests getting of tasks with a delivery delay for multiple task queues in parallel.
 *
 * Three queues are filled with {@code noOfTasks} tasks each: two queues get tasks carrying
 * {@link Task#PROPERTY_DELIVERY_DELAY}, the third gets tasks without that property. One thread per queue then
 * fetches all tasks and checks that the time between two deliveries matches the expected delay of its queue.
 *
 * @throws Exception
 *           test fails
 */
public void testDeliveryDelayMultipleQueues() throws Exception {
  final ZkTaskQueue queue1 = createQueue("worker1");
  final ZkTaskQueue queue2 = createQueue("worker2");
  final ZkTaskQueue queueNoDelay = createQueue("worker3");
  final int noOfTasks = 10;
  final int taskDelayMillis = 1000;
  // create and put some tasks in the queues, only those for queue1 and queue2 are delayed
  for (int i = 1; i <= noOfTasks; i++) {
    final Task t1 = createTask(i, queue1.getWorkerName());
    t1.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis));
    queue1.put(t1);
    final Task t2 = createTask(i, queue2.getWorkerName());
    t2.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis));
    queue2.put(t2);
    final Task t3 = createTask(i, queueNoDelay.getWorkerName());
    queueNoDelay.put(t3);
  }
  /** We want parallel threads so this is a Callable which runs our test for a given queue. */
  final class TestCallable implements Callable<String> {
    /** result returned by {@link #call()} if no check failed. */
    public static final String SUCCESS = "Success";
    /** test should not fail due to time measure imprecision. */
    private static final int DELTA = 50;
    /** ensure that other threads got processing time. */
    private static final int SLEEP_WHEN_GETTING_TASK = 10;
    private final ZkTaskQueue _queue;
    private final int _expectedDelay;
    private String _result = SUCCESS;
    /** params: the queue and the expected delay for task delivery. */
    public TestCallable(final ZkTaskQueue queue, final int expectedDelay) {
      _queue = queue;
      _expectedDelay = expectedDelay;
    }
    /** @return {@link #SUCCESS} if all checks passed, else a description of the first failure. */
    @Override
    public String call() {
      try {
        // first task should be delivered immediately
        long start = System.currentTimeMillis();
        Task getTask = _queue.get(null, null);
        long deliveryDuration = System.currentTimeMillis() - start;
        assertNotNull(getTask);
        assertTrue("first was: " + deliveryDuration, deliveryDuration < 100);
        // check delivery delay when getting further tasks
        start = System.currentTimeMillis();
        int deliveredTasks = 1;
        while (deliveredTasks < noOfTasks) {
          getTask = _queue.get(null, null);
          deliveryDuration = System.currentTimeMillis() - start;
          if (getTask == null) {
            // Nothing delivered yet. Once the upper bound has passed the "too slow" check can only
            // fail, so stop here instead of spinning on after the caller has given up waiting.
            if (deliveryDuration >= _expectedDelay + DELTA) {
              fail(_queue.getWorkerName() + " delivery too slow, no task after: " + deliveryDuration);
            }
            continue;
          }
          assertTrue(_queue.getWorkerName() + " delivery too fast, was: " + deliveryDuration,
            deliveryDuration >= _expectedDelay - DELTA);
          // assert delay not too long
          assertTrue(_queue.getWorkerName() + " delivery too slow, was: " + deliveryDuration,
            deliveryDuration < _expectedDelay + DELTA);
          // the next delay is measured from this delivery (the sleep below is part of it)
          start = System.currentTimeMillis();
          deliveredTasks++;
          Thread.sleep(SLEEP_WHEN_GETTING_TASK);
        }
        assertEquals(noOfTasks, deliveredTasks);
      } catch (final Throwable t) {
        // toString() instead of getMessage(): the message may be null, which would hide the failure reason
        _result = t.toString();
        t.printStackTrace();
      }
      return _result;
    }
  }
  final ExecutorService executor = Executors.newFixedThreadPool(3);
  try {
    final TestCallable testQueue1 = new TestCallable(queue1, taskDelayMillis);
    final TestCallable testQueue2 = new TestCallable(queue2, taskDelayMillis);
    final TestCallable testQueue3 = new TestCallable(queueNoDelay, 0);
    final Future<String> result1 = executor.submit(testQueue1);
    final Future<String> result2 = executor.submit(testQueue2);
    final Future<String> result3 = executor.submit(testQueue3);
    assertEquals(TestCallable.SUCCESS, result1.get(15, TimeUnit.SECONDS));
    assertEquals(TestCallable.SUCCESS, result2.get(15, TimeUnit.SECONDS));
    assertEquals(TestCallable.SUCCESS, result3.get(15, TimeUnit.SECONDS));
  } finally {
    executor.shutdownNow();
  }
}
/**
 * Tests filterDuplicates() with many threads, ensuring that no uniqueness tag is lost.
 *
 * {@code noOfParallelChecks} threads are released at the same time by a latch. Each thread creates
 * {@code noOfTasks} tasks tagged "tag0".."tag9", passes its growing task list through
 * {@code filterDuplicates()} after every addition and puts whatever is returned into the queue. Afterwards the
 * queue is drained and must have delivered exactly {@code noOfTasks} distinct tags.
 *
 * @throws Exception
 *           test fails
 */
public void testFilterDuplicatesMassiveParallelCheck() throws Exception {
  final ZkTaskQueue queue = createQueue("workerDuplicatesMassive");
  final int noOfTasks = 10;
  final int noOfParallelChecks = 40;
  final AtomicInteger noOfExceptions = new AtomicInteger(0);
  final ExecutorService exec = Executors.newFixedThreadPool(noOfParallelChecks);
  final CountDownLatch latch = new CountDownLatch(1);
  try {
    for (int checkNo = 0; checkNo < noOfParallelChecks; checkNo++) {
      final int offset = checkNo;
      exec.submit(new Runnable() {
        @Override
        public void run() {
          final Random rand = new Random(System.nanoTime());
          try {
            latch.await();
            // create tasks with uniqueness tags and put those that are not filtered as duplicates
            final List<Task> taskList = new ArrayList<>();
            for (int i = 0; i < noOfTasks; i++) {
              Thread.sleep(rand.nextInt(50));
              final Task t1 = createTask(offset * noOfTasks + i, queue.getWorkerName());
              t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i);
              taskList.add(t1);
              for (final Task uniqueTask : queue.filterDuplicates(taskList)) {
                queue.put(uniqueTask);
              }
            }
          } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            noOfExceptions.getAndIncrement();
          } catch (final Exception e) {
            // also count unchecked exceptions, they would otherwise vanish in the unread Future
            e.printStackTrace();
            noOfExceptions.getAndIncrement();
          }
        }
      });
    }
    latch.countDown();
    exec.shutdown();
    // do not inspect the queue while workers are still putting tasks
    assertTrue("worker threads did not finish in time", exec.awaitTermination(1, TimeUnit.MINUTES));
  } finally {
    // no-op after a regular termination, stops leftover workers if the test failed above
    exec.shutdownNow();
  }
  // no exceptions
  assertEquals(0, noOfExceptions.get());
  // now check that each unique id is present
  final Set<String> taskUniqueIds = new HashSet<>();
  Task task = queue.get(null, null);
  while (task != null) {
    taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
    task = queue.get(null, null);
  }
  assertEquals(noOfTasks, taskUniqueIds.size());
}
/**
 * Tests filterDuplicates() sequentially.
 *
 * Thirty tasks (numbers 1..30) are created, reusing the uniqueness tags "tag1".."tag10" three times. Each task
 * is only put into the queue if filterDuplicates() returns it. The test expects ten tasks in the "todo"
 * section afterwards, covering all ten tags.
 *
 * @throws Exception
 *           test fails
 */
public void testFilterDuplicates() throws Exception {
  final int noOfTasks = 10;
  final int noOfIterations = 3;
  final int totalTasks = noOfIterations * noOfTasks;
  final ZkTaskQueue queue = createQueue("workerDuplicates");
  // task numbers run through all iterations, the tag number restarts at 1 for each iteration
  for (int taskNo = 1; taskNo <= totalTasks; taskNo++) {
    final int tagNo = (taskNo - 1) % noOfTasks + 1;
    final Task candidate = createTask(taskNo, queue.getWorkerName());
    candidate.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + tagNo);
    for (final Task accepted : queue.filterDuplicates(Arrays.asList(candidate))) {
      queue.put(accepted);
    }
  }
  // only the tasks of one iteration may have made it into the queue
  assertEquals(noOfTasks, queue.getTaskList(ZkTaskQueue.SECTION_TODO, totalTasks).getSize());
  // drain the queue and collect the delivered tags, each one must show up
  final Set<String> seenTags = new HashSet<>();
  for (Task fetched = queue.get(null, null); fetched != null; fetched = queue.get(null, null)) {
    seenTags.add(fetched.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
  }
  assertEquals(noOfTasks, seenTags.size());
  queue.purge();
}
/**
 * Tests that filterDuplicates() only filters against "todo" tasks.
 *
 * In each of three rounds ten tasks tagged "tag1".."tag10" are offered to filterDuplicates() and put into the
 * queue if returned. All delivered tasks are then moved to "in progress". Every round is expected to deliver
 * all ten tags again, so in the end thirty tasks are in progress and none is left in "todo".
 *
 * @throws Exception
 *           test fails
 */
public void testFilterDuplicatesOnlyToDo() throws Exception {
  final int noOfTasks = 10;
  final int noOfIterations = 3;
  final int totalTasks = noOfIterations * noOfTasks;
  final ZkTaskQueue queue = createQueue("workerDuplicates");
  for (int round = 0; round < noOfIterations; round++) {
    // offer one task per tag, keep only what is not filtered as duplicate
    final int firstTaskNo = round * noOfTasks;
    for (int tagNo = 1; tagNo <= noOfTasks; tagNo++) {
      final Task candidate = createTask(firstTaskNo + tagNo, queue.getWorkerName());
      candidate.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + tagNo);
      for (final Task accepted : queue.filterDuplicates(Arrays.asList(candidate))) {
        queue.put(accepted);
      }
    }
    // fetch everything, remember the tags and put each fetched task in progress
    final Set<String> seenTags = new HashSet<>();
    for (Task fetched = queue.get(null, null); fetched != null; fetched = queue.get(null, null)) {
      seenTags.add(fetched.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG));
      queue.putInProgress(fetched);
    }
    // each tag must have been delivered in this round
    assertEquals(noOfTasks, seenTags.size());
  }
  // one full set of tasks per round is in progress now...
  assertEquals(totalTasks, queue.getTaskList(ZkTaskQueue.SECTION_INPROGRESS, totalTasks).getSize());
  // ...and nothing is left in todo
  assertEquals(0, queue.getTaskList(ZkTaskQueue.SECTION_TODO, totalTasks).getSize());
  queue.purge();
}
/**
 * Reads the sorted children of a ZooKeeper node and asserts how many there are.
 *
 * @param zk
 *          ZooKeeper connection
 * @param path
 *          path of the node whose children are read
 * @param expectedCount
 *          expected number of children
 * @return the children as returned by {@code zk.getChildrenSorted(path)}
 * @throws Exception
 *           reading the children failed
 */
private List<String> getChildrenChecked(final ZkConnection zk, final String path, final int expectedCount)
  throws Exception {
  final List<String> children = zk.getChildrenSorted(path);
  final int actualCount = children.size();
  assertEquals(expectedCount, actualCount);
  return children;
}
/**
 * Creates a queue using {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} as scale-up limit.
 *
 * @param queueName
 *          name of the queue
 * @return the new queue
 */
private ZkTaskQueue createQueue(final String queueName) {
  final long unlimitedScaleUp = TaskStorageZk.TASKSPERHOST_UNLIMITED;
  return createQueue(queueName, unlimitedScaleUp);
}
/**
 * Creates a queue with a scale-up limit on the test's ZooKeeper connection.
 *
 * @param queueName
 *          name of the queue
 * @param maxScaleUp
 *          scale-up limit handed to the queue
 * @return the new queue
 */
private ZkTaskQueue createQueue(final String queueName, final long maxScaleUp) {
  // presumably the host name the queue runs on; the tests always use the local host
  final String hostName = "localhost";
  final ZkTaskQueue queue = new ZkTaskQueue(_zk, queueName, hostName, maxScaleUp);
  return queue;
}
/**
 * Creates a task for tests. The task number is used as task id; worker name and task id are stored both as
 * properties and as parameters. The task gets one input bulk in slot "input" and one output bulk in slot
 * "output".
 *
 * @param taskNo
 *          number of the task, its decimal string form becomes the task id
 * @param workerName
 *          name of the worker the task is created for
 * @return created task
 */
protected static Task createTask(final int taskNo, final String workerName) {
  final String taskId = Integer.toString(taskNo);
  final Task task = new Task(taskId, workerName);
  task.getProperties().put("worker", workerName);
  task.getProperties().put("taskId", taskId);
  task.getParameters().put("worker", workerName);
  task.getParameters().put("taskId", taskId);
  final List<BulkInfo> input = new ArrayList<>();
  input.add(new BulkInfo("inputbucket", "inputstore", "input" + taskId));
  task.getInputBulks().put("input", input);
  final List<BulkInfo> output = new ArrayList<>();
  output.add(new BulkInfo("outputbucket", "outputstore", "output" + taskId));
  // the output bulk belongs to the output bulks, otherwise they stay empty and are never compared for real
  task.getOutputBulks().put("output", output);
  return task;
}
/**
 * Asserts that two tasks are equal in id, worker name, qualifier, properties, parameters and bulks.
 *
 * The start time, end time and task age properties are removed from both tasks before the properties are
 * compared, so both task objects are modified by this call.
 *
 * @param putTask
 *          expected task
 * @param getTask
 *          actual task
 */
protected static void assertEqualTasks(final Task putTask, final Task getTask) {
  assertEquals(putTask.getTaskId(), getTask.getTaskId());
  assertEquals(putTask.getWorkerName(), getTask.getWorkerName());
  assertEquals(putTask.getQualifier(), getTask.getQualifier());
  // drop the time related properties, first from the actual, then from the expected task
  for (final Task task : Arrays.asList(getTask, putTask)) {
    task.getProperties().remove(Task.PROPERTY_START_TIME);
    task.getProperties().remove(Task.PROPERTY_END_TIME);
    task.getProperties().remove(Task.PROPERY_TASK_AGE);
  }
  assertEquals(putTask.getProperties(), getTask.getProperties());
  assertEquals(putTask.getParameters(), getTask.getParameters());
  assertEquals(putTask.getInputBulks(), getTask.getInputBulks());
  assertEquals(putTask.getOutputBulks(), getTask.getOutputBulks());
}
/**
 * Asserts that a task counter exists and reports the expected worker name and task counts.
 *
 * @param counter
 *          actual counter, must not be null
 * @param name
 *          expected worker name
 * @param todo
 *          expected number of tasks in "todo"
 * @param inProgress
 *          expected number of tasks in "inprogress"
 */
protected static void assertCounter(final TaskCounter counter, final String name, final int todo,
  final int inProgress) {
  // fail with an assertion instead of a NullPointerException if there is no counter at all
  assertNotNull(counter);

  // identity of the counter
  assertEquals("wrong name", name, counter.getWorkerName());

  // counts per section
  assertEquals("wrong todo counter", todo, counter.getTasksTodo());
  assertEquals("wrong inprogress counter", inProgress, counter.getTasksInProgress());
}
}