/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.persistence.zk.test; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.Set; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException.Cause; | |
import org.eclipse.smila.taskmanager.BulkInfo; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskList; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.zk.TaskStorageZk; | |
import org.eclipse.smila.taskmanager.persistence.zk.ZkTaskQueue; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
import org.eclipse.smila.zookeeper.ZkConnection; | |
import org.eclipse.smila.zookeeper.ZooKeeperService; | |
/** | |
* test basic functionality of ZkTaskQueue. | |
*/ | |
public class TestZkTaskQueue extends DeclarativeServiceTestCase { | |
/** ZooKeeper service. */ | |
private ZooKeeperService _zk; | |
/** | |
* test if ZooKeeper service was successfully started and registered. | |
* | |
* @throws Exception | |
* no service found. | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_zk = getService(ZooKeeperService.class); | |
assertNotNull(_zk); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testBasic() throws Exception { | |
final String queueName = "testBasic"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
assertNull(queue.get(Collections.singleton("partition"), null)); | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTask, getTask); | |
final Task ipTask = queue.getInProgressTask(getTask.getTaskId()); | |
assertNotNull(ipTask); | |
assertEqualTasks(putTask, ipTask); | |
queue.delete(getTask.getTaskId()); | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testPutInProgress() throws Exception { | |
final String queueName = "testPutInProgress"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task putTask = createTask(1, queueName); | |
queue.putInProgress(putTask); | |
assertNull(queue.get(Collections.singleton("partition"), null)); | |
assertNull(queue.get(null, null)); | |
final Task ipTask = queue.getInProgressTask(putTask.getTaskId()); | |
assertNotNull(ipTask); | |
assertEqualTasks(putTask, ipTask); | |
queue.delete(ipTask.getTaskId()); | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testCounter() throws Exception { | |
final String queueName = "testCounter"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 1, 0); | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTask, getTask); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 1); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testKeepAlive() throws Exception { | |
final String queueName = "testKeepAlive"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
assertNull(queue.get(Collections.singleton("partition"), null)); | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTask, getTask); | |
queue.keepAlive(getTask.getTaskId()); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
assertNull(queue.get(null, null)); | |
try { | |
queue.keepAlive(getTask.getTaskId()); | |
fail("should not work"); | |
} catch (final Exception ex) { | |
assertTrue("wrong exception", ex instanceof TaskmanagerException); | |
} | |
} | |
/** | |
* check that task finishing can be retried in a new session. | |
* | |
* @throws Exception | |
*/ | |
public void testRetryTaskFinishing() throws Exception { | |
final String queueName = "testDeleteWithoutFinishingError"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTask, getTask); | |
assertNotNull(queue.getInProgressTask(putTask.getTaskId())); | |
queue.disconnectZkSession(); | |
assertNull(queue.get(null, null)); | |
assertNotNull(queue.getInProgressTask(putTask.getTaskId())); | |
queue.delete(putTask.getTaskId()); | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testTaskOrdering() throws Exception { | |
final String queueName = "testTaskOrdering"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final int noOfTasks = 16; | |
final Task[] putTasks = new Task[noOfTasks]; | |
for (int i = 0; i < noOfTasks; ++i) { | |
putTasks[i] = createTask(i, queueName); | |
queue.put(putTasks[i]); | |
final TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, i + 1, 0); | |
} | |
for (int i = 0; i < noOfTasks; ++i) { | |
assertNull(queue.get(Collections.singleton("partition"), null)); | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTasks[i], getTask); | |
TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks - (i + 1), 1); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks - (i + 1), 0); | |
} | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* @throws Exception | |
* test fails | |
*/ | |
public void testQualifiedQueue() throws Exception { | |
final String queueName = "testQualifiedQueue"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
assertFalse(queue.hasQualifiedTasks()); | |
final int noOfPartitions = 4; | |
final int noOfTasksPerPartition = 4; | |
final int noOfTasks = noOfPartitions * noOfTasksPerPartition; | |
final Task[] putTasks = new Task[noOfTasks]; | |
for (int i = 0; i < noOfTasks; ++i) { | |
putTasks[i] = createTask(i, queueName); | |
final int partition = i % noOfPartitions; | |
putTasks[i].setQualifier("parts/" + Integer.toString(partition)); | |
queue.put(putTasks[i]); | |
final TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, i + 1, 0); | |
} | |
assertTrue(queue.hasQualifiedTasks()); | |
for (int i = 0; i < noOfTasks; ++i) { | |
assertNull(queue.get(null, null)); | |
assertNull(queue.get(Collections.singleton("parts/partition"), null)); | |
final int partition = i % noOfPartitions; | |
final Task getTask = queue.get(Collections.singleton("parts/" + Integer.toString(partition)), null); | |
assertNotNull(getTask); | |
assertEqualTasks(putTasks[i], getTask); | |
final Task ipTask = queue.getInProgressTask(getTask.getTaskId()); | |
assertNotNull(ipTask); | |
assertEqualTasks(putTasks[i], ipTask); | |
TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks - (i + 1), 1); | |
queue.delete(getTask.getTaskId()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks - (i + 1), 0); | |
} | |
assertTrue(queue.hasQualifiedTasks()); // no tasks, but qualifiers are still there. | |
assertNull(queue.get(null, null)); | |
} | |
/** | |
* test rollback after exceeded time-to-live. | |
* | |
* @throws Exception | |
* test fails | |
*/ | |
public void testGetTimedOutTasks() throws Exception { | |
final String queueName = "testGetTimedOutTasks"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
TaskCounter counter = null; | |
final int timeToLive = 1000; | |
final int noOfTasks = 4; | |
final Task[] putTasks = new Task[noOfTasks]; | |
for (int i = 0; i < noOfTasks; ++i) { | |
putTasks[i] = createTask(i, queueName); | |
queue.put(putTasks[i]); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, i + 1, 0); | |
} | |
final Task[] getTasks = new Task[noOfTasks]; | |
for (int i = 0; i < noOfTasks; ++i) { | |
getTasks[i] = queue.get(null, null); | |
assertNotNull(getTasks[i]); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks - (i + 1), i + 1); | |
} | |
Thread.sleep(2 * timeToLive); | |
queue.keepAlive(getTasks[0].getTaskId()); | |
Collection<String> taskIds = queue.getTimedOutTasks(timeToLive); | |
assertEquals(noOfTasks - 1, taskIds.size()); | |
assertFalse(taskIds.contains(getTasks[0].getTaskId())); | |
for (final String taskId : taskIds) { | |
queue.getInProgressTask(taskId); | |
queue.delete(taskId); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 1); | |
queue.getInProgressTask(getTasks[0].getTaskId()); | |
queue.delete(getTasks[0].getTaskId()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
taskIds = queue.getTimedOutTasks(timeToLive); | |
assertTrue(taskIds.isEmpty()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
} | |
/** | |
* try to test parallel get requests. | |
* | |
* @throws Exception | |
* test fails. | |
*/ | |
public void testParallelGet() throws Exception { | |
final String queueName = "testParallelGet"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
TaskCounter counter = null; | |
final int noOfTasks = 100; | |
final int noOfThreads = 10; | |
final Task[] putTasks = new Task[noOfTasks]; | |
for (int i = 0; i < noOfTasks; ++i) { | |
putTasks[i] = createTask(i, queueName); | |
queue.put(putTasks[i]); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks, 0); | |
final Thread[] getThreads = new Thread[noOfThreads]; | |
final Exception[] getExceptions = new Exception[noOfThreads]; | |
for (int i = 0; i < noOfThreads; i++) { | |
final int threadNo = i; | |
getThreads[i] = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
while (true) { | |
final Task getTask = queue.get(null, null); | |
if (getTask == null) { | |
return; | |
} | |
final int taskNo = Integer.parseInt(getTask.getTaskId()); | |
assertEqualTasks(putTasks[taskNo], getTask); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
putTasks[taskNo] = null; | |
} | |
} catch (final Exception ex) { | |
getExceptions[threadNo] = ex; | |
} | |
} | |
}, "testParallelGet-Thread #" + threadNo); | |
} | |
for (int i = 0; i < noOfThreads; i++) { | |
getThreads[i].start(); | |
} | |
for (int i = 0; i < noOfThreads; i++) { | |
getThreads[i].join(); | |
assertNull("Error in thread " + (i + 1) + ": " + getExceptions[i], getExceptions[i]); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
} | |
/** | |
* try to test parallel put requests. | |
* | |
* @throws Exception | |
* test fails. | |
*/ | |
public void testParallelPut() throws Exception { | |
final String queueName = "testParallelPut"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
TaskCounter counter = null; | |
final int noOfTasksPerThread = 10; | |
final int noOfThreads = 10; | |
final int noOfTasks = noOfTasksPerThread * noOfThreads; | |
final Task[] putTasks = new Task[noOfTasks]; | |
final Thread[] putThreads = new Thread[noOfThreads]; | |
final Exception[] putExceptions = new Exception[noOfThreads]; | |
for (int i = 0; i < noOfThreads; i++) { | |
final int threadNo = i; | |
putThreads[i] = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
for (int j = 0; j < noOfTasksPerThread; j++) { | |
final int taskNo = threadNo * noOfTasksPerThread + j; | |
putTasks[taskNo] = createTask(taskNo, queueName); | |
queue.put(putTasks[taskNo]); | |
} | |
} catch (final Exception ex) { | |
putExceptions[threadNo] = ex; | |
} | |
} | |
}, "testParallelPut-Thread #" + threadNo); | |
} | |
for (int i = 0; i < noOfThreads; i++) { | |
putThreads[i].start(); | |
} | |
for (int i = 0; i < noOfThreads; i++) { | |
putThreads[i].join(); | |
assertNull("Error in thread " + (i + 1) + ": " + putExceptions[i], putExceptions[i]); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, noOfTasks, 0); | |
for (int i = 0; i < noOfTasks; i++) { | |
final Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
final int taskNo = Integer.parseInt(getTask.getTaskId()); | |
assertEqualTasks(putTasks[taskNo], getTask); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
} | |
/** | |
* | |
* test purge. | |
* | |
* @throws Exception | |
* test fails | |
*/ | |
public void testPurge() throws Exception { | |
final String queueName = "testPurge"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final Task task1 = createTask(1, queueName); | |
queue.put(task1); | |
final Task task2 = createTask(2, queueName); | |
queue.put(task2); | |
final Task task3 = createTask(3, queueName); | |
task3.setQualifier("parts/partition"); | |
queue.put(task3); | |
final Task task4 = createTask(4, queueName); | |
task4.setQualifier("parts/partition"); | |
queue.put(task4); | |
TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 4, 0); | |
final Task getTask1 = queue.get(null, null); | |
assertNotNull(getTask1); | |
final Task getTask3 = queue.get(Collections.singleton("parts/partition"), null); | |
assertNotNull(getTask3); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 2, 2); | |
queue.purge(); | |
assertNull(queue.get(null, null)); | |
assertNull(queue.get(Collections.singleton("parts/partition"), null)); | |
try { | |
queue.delete(getTask1.getTaskId()); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(Cause.taskId, ex.getCauseCode()); | |
} | |
try { | |
queue.delete(getTask3.getTaskId()); | |
fail("should not work"); | |
} catch (final BadParameterTaskmanagerException ex) { | |
assertEquals(Cause.taskId, ex.getCauseCode()); | |
} | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
} | |
/** | |
* checks cleanup of unqualified queues. | |
* | |
* @throws Exception | |
* on error | |
*/ | |
public void testCleanUnqualifiedQueues() throws Exception { | |
final String queueName = "testCleanUnqualifiedQueues"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final ZooKeeperService zkService = getService(ZooKeeperService.class); | |
assertNotNull(zkService); | |
final ZkConnection zk = new ZkConnection(zkService); | |
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODODIR_SUFFIX; | |
final List<String> subqueueNames = new ArrayList<String>(Arrays.asList("subQueue1", "subQueue2")); | |
final List<String> subqueueTodoDirs = | |
new ArrayList<String>(Arrays.asList(todoPartRoot + "/subQueue1", todoPartRoot + "/subQueue2")); | |
assertFalse(queue.hasQualifiedTasks()); | |
final Task task1 = createTask(1, queueName); | |
task1.getProperties().put("jobName", subqueueNames.get(0)); | |
final Task task2 = createTask(2, queueName); | |
task2.getProperties().put("jobName", subqueueNames.get(1)); | |
queue.put(task1); | |
queue.put(task2); | |
assertFalse(queue.hasQualifiedTasks()); | |
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1); | |
getChildrenChecked(zk, subqueueTodoDirs.get(1), 1); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
queue.cleanEmptyNodes(0); // should not cleanup anything | |
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1); | |
getChildrenChecked(zk, subqueueTodoDirs.get(1), 1); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
final List<String> gotSubqueueNames = new ArrayList<String>(subqueueNames); | |
Task gotTask = queue.get(null, null); | |
assertNotNull(gotTask); | |
String subqueueName = gotTask.getProperties().get("jobName"); | |
assertTrue(gotSubqueueNames.contains(subqueueName)); | |
gotSubqueueNames.remove(subqueueName); | |
int i = subqueueNames.indexOf(subqueueName); | |
getChildrenChecked(zk, subqueueTodoDirs.get(i), 0); | |
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 1); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 1); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
gotTask = queue.get(null, null); | |
assertNotNull(gotTask); | |
subqueueName = gotTask.getProperties().get("jobName"); | |
gotSubqueueNames.remove(subqueueName); | |
assertTrue(gotSubqueueNames.isEmpty()); | |
getChildrenChecked(zk, subqueueTodoDirs.get(1 - i), 0); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, todoPartRoot, 0); | |
assertFalse(queue.hasQualifiedTasks()); | |
final Task task3 = createTask(3, queueName); | |
task3.getProperties().put("jobName", subqueueNames.get(0)); | |
queue.put(task3); | |
assertFalse(queue.hasQualifiedTasks()); | |
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, subqueueTodoDirs.get(0), 1); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
gotTask = queue.get(null, null); | |
assertNotNull(gotTask); | |
assertEquals(subqueueNames.get(0), gotTask.getProperties().get("jobName")); | |
getChildrenChecked(zk, subqueueTodoDirs.get(0), 0); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, todoPartRoot, 0); | |
assertFalse(queue.hasQualifiedTasks()); | |
} | |
/** | |
* check cleanup of unused partition queues. | |
* | |
* @throws Exception | |
* test fails | |
*/ | |
public void testCleanQualifiedQueues() throws Exception { | |
final String queueName = "testCleanQualifiedQueues"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
assertFalse(queue.hasQualifiedTasks()); | |
final ZooKeeperService zkService = getService(ZooKeeperService.class); | |
assertNotNull(zkService); | |
final ZkConnection zk = new ZkConnection(zkService); | |
final Task task1 = createTask(1, queueName); | |
task1.setQualifier("parts/partition1"); | |
final Task task2 = createTask(2, queueName); | |
task2.setQualifier("parts/partition2"); | |
queue.put(task1); | |
queue.put(task2); | |
assertTrue(queue.hasQualifiedTasks()); | |
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODOQUALIFIEDDIR_SUFFIX; | |
final String todoPartDir = todoPartRoot + '/' + ZkTaskQueue.DEFAULT_SUB_QUEUE; | |
getChildrenChecked(zk, todoPartDir, 2); | |
queue.cleanEmptyNodes(0); // should not cleanup anything, because there are still tasks. | |
getChildrenChecked(zk, todoPartDir, 2); | |
assertNotNull(queue.get(Collections.singleton("parts/partition1"), null)); | |
assertNotNull(queue.get(Collections.singleton("parts/partition2"), null)); | |
Thread.sleep(1000); | |
final Task task3 = createTask(3, queueName); | |
task3.setQualifier("parts/partition1"); | |
queue.put(task3); | |
assertNotNull(queue.get(Collections.singleton("parts/partition1"), null)); | |
final List<String> partitionDirs = getChildrenChecked(zk, todoPartDir, 2); | |
queue.cleanEmptyNodes(500); // should remove partitions | |
assertEquals("parts_partition1", partitionDirs.get(0)); | |
assertTrue(queue.hasQualifiedTasks()); | |
Thread.sleep(50); | |
queue.cleanEmptyNodes(1); // should remove partition1, too | |
getChildrenChecked(zk, todoPartRoot, 0); | |
assertFalse(queue.hasQualifiedTasks()); | |
} | |
/** | |
* checks cleanup of multiple qualified subqueues. | |
* | |
* @throws Exception | |
* on error | |
*/ | |
public void testCleanMultipleQualifiedSubqueues() throws Exception { | |
final String queueName = "testCleanMultipleQualifiedSubqueues"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
assertFalse(queue.hasQualifiedTasks()); | |
final ZooKeeperService zkService = getService(ZooKeeperService.class); | |
assertNotNull(zkService); | |
final ZkConnection zk = new ZkConnection(zkService); | |
final List<String> part1 = Arrays.asList("parts/partition1"); | |
final List<String> part2 = Arrays.asList("parts/partition2"); | |
final String expected_part1 = "parts_partition1"; | |
final String expected_part2 = "parts_partition2"; | |
final Task task1 = createTask(1, queueName); | |
task1.getProperties().put("jobName", "subqueue1"); | |
task1.setQualifier(part1.get(0)); | |
final Task task2 = createTask(2, queueName); | |
task2.getProperties().put("jobName", "subqueue2"); | |
task2.setQualifier(part2.get(0)); | |
queue.put(task1); | |
queue.put(task2); | |
final String todoPartRoot = ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODOQUALIFIEDDIR_SUFFIX; | |
final String subqueue1TodoDir = todoPartRoot + "/subqueue1"; | |
final String subqueue2TodoDir = todoPartRoot + "/subqueue2"; | |
assertTrue(queue.hasQualifiedTasks()); | |
List<String> partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1); | |
assertEquals(expected_part2, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
queue.cleanEmptyNodes(0); // should not cleanup anything | |
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1); | |
assertEquals(expected_part2, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
Task gotTask = queue.get(part1, null); | |
assertNotNull(gotTask); | |
assertEquals("subqueue1", gotTask.getProperties().get("jobName")); | |
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 2); | |
queue.cleanEmptyNodes(0); | |
partDirs = getChildrenChecked(zk, subqueue2TodoDir, 1); | |
assertEquals(expected_part2, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
gotTask = queue.get(part2, null); | |
assertNotNull(gotTask); | |
assertEquals("subqueue2", gotTask.getProperties().get("jobName")); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, todoPartRoot, 0); | |
assertFalse(queue.hasQualifiedTasks()); | |
final Task task3 = createTask(3, queueName); | |
task3.getProperties().put("jobName", "subqueue1"); | |
task3.setQualifier(part1.get(0)); | |
queue.put(task3); | |
assertTrue(queue.hasQualifiedTasks()); | |
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
gotTask = queue.get(part1, null); | |
assertNotNull(gotTask); | |
assertEquals("subqueue1", gotTask.getProperties().get("jobName")); | |
partDirs = getChildrenChecked(zk, subqueue1TodoDir, 1); | |
assertEquals(expected_part1, partDirs.get(0)); | |
getChildrenChecked(zk, todoPartRoot, 1); | |
queue.cleanEmptyNodes(0); | |
getChildrenChecked(zk, todoPartRoot, 0); | |
assertFalse(queue.hasQualifiedTasks()); | |
} | |
/** | |
* check removal of state tasks. | |
* | |
* @throws Exception | |
* test fails. | |
*/ | |
public void testStaleTasks() throws Exception { | |
final String queueName = "testStaleTasks"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final ZooKeeperService zkService = getService(ZooKeeperService.class); | |
assertNotNull(zkService); | |
final ZkConnection zk = new ZkConnection(zkService); | |
final Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
final String todoDir = | |
ZkTaskQueue.TASKDIR_PREFIX + '/' + queueName + ZkTaskQueue.TODODIR_SUFFIX + '/' | |
+ ZkTaskQueue.DEFAULT_SUB_QUEUE; | |
final String staleTask = todoDir + "/staletask"; | |
zk.createNode(staleTask, ZkConnection.NO_DATA); | |
TaskCounter counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 2, 0); | |
queue.cleanEmptyNodes(1000); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 2, 0); | |
Thread.sleep(1000); | |
queue.cleanEmptyNodes(1); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 1, 0); | |
final Task getTask = queue.get(null, null); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 1); | |
assertEqualTasks(putTask, getTask); | |
queue.getInProgressTask(getTask.getTaskId()); | |
queue.delete(getTask.getTaskId()); | |
counter = queue.getTaskCounter(); | |
assertCounter(counter, queueName, 0, 0); | |
} | |
/** test task counters. */ | |
public void testTaskInfos() throws Exception { | |
final String queueName = "testTaskInfos"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
final int noOfTasks = 4; | |
final Task[] tasks = new Task[4]; | |
for (int i = 0; i < noOfTasks; i++) { | |
tasks[i] = createTask(i, queueName); | |
if (i >= noOfTasks / 2) { | |
tasks[i].setQualifier("parts/partition"); | |
} | |
queue.put(tasks[i]); | |
} | |
final TaskCounter todoCounter = queue.getTaskCounter(); | |
assertCounter(todoCounter, queueName, noOfTasks, 0); | |
final TaskList startIpList = queue.getTaskList("inprogress", 1); | |
assertEquals(0, startIpList.getSize()); | |
assertTrue(startIpList.getTaskNames().isEmpty()); | |
final TaskList todoTaskList = queue.getTaskList("todo", 10); | |
assertEquals(noOfTasks, todoTaskList.getSize()); | |
final List<String> todoTaskNames = todoTaskList.getTaskNames(); | |
for (int i = 0; i < noOfTasks; i++) { | |
final String taskPath = todoTaskNames.get(i); | |
assertTrue(taskPath.startsWith("/smila/tasks/testTaskInfos/")); | |
final String taskName = taskPath.substring("/smila/tasks/testTaskInfos/".length()); | |
if (i < noOfTasks / 2) { | |
assertTrue(taskName.startsWith("todo/")); | |
} else { | |
assertTrue(taskName.startsWith("todo_qualified/parts_partition")); | |
} | |
final int index = taskName.indexOf('/'); | |
final AnyMap taskInfo = queue.getTaskInfo(taskName.substring(0, index), taskName.substring(index + 1)); | |
assertNotNull(taskInfo); | |
assertTrue(taskInfo.containsKey("created")); | |
assertTrue(taskInfo.containsKey("modified")); | |
assertTrue(taskInfo.containsKey("task")); | |
assertTrue(tasks[i].toAny().equals(taskInfo.get("task"))); | |
if (i < noOfTasks / 2) { | |
queue.get(null, null); | |
} else { | |
queue.get(Collections.singleton("parts/partition"), null); | |
} | |
} | |
final TaskCounter ipCounter = queue.getTaskCounter(); | |
assertCounter(ipCounter, queueName, 0, noOfTasks); | |
final TaskList endTodoList = queue.getTaskList("todo", 1); | |
assertEquals(0, endTodoList.getSize()); | |
assertTrue(endTodoList.getTaskNames().isEmpty()); | |
final TaskList ipTaskList = queue.getTaskList("inprogress", 10); | |
assertEquals(noOfTasks, ipTaskList.getSize()); | |
final List<String> ipTaskNames = ipTaskList.getTaskNames(); | |
for (int i = 0; i < noOfTasks; i++) { | |
final String taskPath = ipTaskNames.get(i); | |
assertTrue(taskPath.startsWith("/smila/tasks/testTaskInfos/")); | |
final String taskName = taskPath.substring("/smila/tasks/testTaskInfos/".length()); | |
assertTrue(taskName.startsWith("inprogress/")); | |
final int index = taskName.indexOf('/'); | |
final AnyMap taskInfo = queue.getTaskInfo(taskName.substring(0, index), taskName.substring(index + 1)); | |
assertNotNull(taskInfo); | |
assertTrue(taskInfo.containsKey("created")); | |
assertTrue(taskInfo.containsKey("modified")); | |
assertTrue(taskInfo.containsKey("task")); | |
final Task infoTask = Task.fromAny(taskInfo.get("task")); | |
infoTask.getProperties().remove(ZkTaskQueue.OWNER_PROP); | |
// remove start and end time | |
infoTask.getProperties().remove(Task.PROPERTY_END_TIME); | |
infoTask.getProperties().remove(Task.PROPERTY_START_TIME); | |
infoTask.getProperties().remove(Task.PROPERY_TASK_AGE); | |
assertTrue(tasks[i].toAny().equals(infoTask.toAny())); | |
queue.getInProgressTask(infoTask.getTaskId()); | |
queue.delete(infoTask.getTaskId()); | |
} | |
final TaskCounter endCounter = queue.getTaskCounter(); | |
assertCounter(endCounter, queueName, 0, 0); | |
final TaskList endIpList = queue.getTaskList("inprogress", 1); | |
assertEquals(0, endIpList.getSize()); | |
assertTrue(endIpList.getTaskNames().isEmpty()); | |
} | |
/** | |
* test that 'host' property is set. | |
*/ | |
public void testHostProperty() throws Exception { | |
final String queueName = "testHost"; | |
final ZkTaskQueue queue = createQueue(queueName); | |
// without host | |
Task putTask = createTask(1, queueName); | |
queue.put(putTask); | |
Task getTask = queue.get(null, null); | |
assertNotNull(getTask); | |
assertNull(getTask.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
Task ipTask = queue.getInProgressTask(getTask.getTaskId()); | |
assertNotNull(ipTask); | |
assertNull(ipTask.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
assertEqualTasks(getTask, ipTask); | |
// with host | |
putTask = createTask(2, queueName); | |
queue.put(putTask); | |
getTask = queue.get(null, "host"); | |
assertNotNull(getTask); | |
assertEquals("host", getTask.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
ipTask = queue.getInProgressTask(getTask.getTaskId()); | |
assertNotNull(ipTask); | |
assertNull(ipTask.getProperties().get(Task.PROPERTY_WORKER_HOST)); | |
getTask.getProperties().remove(Task.PROPERTY_WORKER_HOST); | |
assertEqualTasks(getTask, ipTask); | |
} | |
/** test if task delivery is limited. */ | |
public void testScaleUpControl() throws Exception { | |
final String queueName = "testScaleUpControl"; | |
final String hostName = queueName; | |
final ZkTaskQueue queue = createQueue(queueName, 2); | |
queue.put(createTask(1, queueName)); | |
queue.put(createTask(2, queueName)); | |
queue.put(createTask(3, queueName)); | |
final Task task1 = queue.get(null, hostName); | |
assertNotNull(task1); | |
final Task task2 = queue.get(null, hostName); | |
assertNotNull(task2); | |
Task task3 = queue.get(null, hostName); | |
assertNull(task3); | |
queue.getInProgressTask(task1.getTaskId()); | |
queue.delete(task1.getTaskId()); | |
task3 = queue.get(null, hostName); | |
assertNotNull(task3); | |
queue.getInProgressTask(task2.getTaskId()); | |
queue.delete(task2.getTaskId()); | |
queue.getInProgressTask(task3.getTaskId()); | |
queue.delete(task3.getTaskId()); | |
} | |
/** test if scaleup control does not affect runAlways workers. */ | |
public void testNoLimitForRunAlways() throws Exception { | |
final String queueName = "testLimitForRunAlways"; | |
final String hostName = null; | |
final ZkTaskQueue queue = createQueue(queueName, 2); | |
queue.put(createTask(1, queueName)); | |
queue.put(createTask(2, queueName)); | |
queue.put(createTask(3, queueName)); | |
final Task task1 = queue.get(null, hostName); | |
assertNotNull(task1); | |
final Task task2 = queue.get(null, hostName); | |
assertNotNull(task2); | |
final Task task3 = queue.get(null, hostName); | |
assertNotNull(task3); | |
queue.getInProgressTask(task1.getTaskId()); | |
queue.delete(task1.getTaskId()); | |
queue.getInProgressTask(task2.getTaskId()); | |
queue.delete(task2.getTaskId()); | |
queue.getInProgressTask(task3.getTaskId()); | |
queue.delete(task3.getTaskId()); | |
} | |
/** simple test for getting tasks with a delivery delay. */ | |
public void testDeliveryDelay() throws Exception { | |
final ZkTaskQueue queue = createQueue("workerDeliveryDelay"); | |
final int noOfTasks = 10; | |
final int taskDelayMillis = 1000; | |
final int delta = 50; // test should not fail due to time measure imprecision. | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = createTask(i, queue.getWorkerName()); | |
t1.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis)); | |
queue.put(t1); | |
} | |
// first task should be delivered immediately | |
long start = System.currentTimeMillis(); | |
Task getTask = queue.get(null, null); | |
long deliveryDuration = System.currentTimeMillis() - start; | |
assertNotNull(getTask); | |
assertTrue("first was: " + deliveryDuration, deliveryDuration < 100); | |
// check delivery delay when getting further tasks | |
start = System.currentTimeMillis(); | |
int deliveredTasks = 1; | |
while (deliveredTasks < noOfTasks) { | |
getTask = queue.get(null, null); | |
if (getTask != null) { | |
deliveryDuration = System.currentTimeMillis() - start; | |
assertTrue(queue.getWorkerName() + " delivery too fast, was: " + deliveryDuration, | |
deliveryDuration >= taskDelayMillis - delta); | |
// assert delay not too long | |
assertTrue(queue.getWorkerName() + " delivery too slow, was: " + deliveryDuration, | |
deliveryDuration < taskDelayMillis + delta); | |
start = System.currentTimeMillis(); | |
deliveredTasks++; | |
} | |
} | |
assertEquals(noOfTasks, deliveredTasks); | |
} | |
/** tests getting of tasks with a delivery delay for multiple task queues in parallel . */ | |
public void testDeliveryDelayMultipleQueues() throws Exception { | |
final ZkTaskQueue queue1 = createQueue("worker1"); | |
final ZkTaskQueue queue2 = createQueue("worker2"); | |
final ZkTaskQueue queueNoDelay = createQueue("worker3"); | |
final int noOfTasks = 10; | |
final int taskDelayMillis = 1000; | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = createTask(i, queue1.getWorkerName()); | |
t1.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis)); | |
queue1.put(t1); | |
final Task t2 = createTask(i, queue2.getWorkerName()); | |
t2.getProperties().put(Task.PROPERTY_DELIVERY_DELAY, String.valueOf(taskDelayMillis)); | |
queue2.put(t2); | |
final Task t3 = createTask(i, queueNoDelay.getWorkerName()); | |
queueNoDelay.put(t3); | |
} | |
/** We want parallel threads so this is a Callable which runs our test for a given queue. */ | |
final class TestCallable implements Callable<String> { | |
public static final String SUCCESS = "Success"; | |
/** test should not fail due to time measure imprecision. */ | |
private static final int DELTA = 50; | |
/** ensure that other threads got processing time. */ | |
private static final int SLEEP_WHEN_GETTING_TASK = 10; | |
private final ZkTaskQueue _queue; | |
private final int _expectedDelay; | |
private String _result = SUCCESS; | |
/** params: the queue and the expected delay for task delivery. */ | |
public TestCallable(final ZkTaskQueue queue, final int expectedDelay) { | |
_queue = queue; | |
_expectedDelay = expectedDelay; | |
} | |
@Override | |
public String call() { | |
try { | |
// first task should be delivered immediately | |
long start = System.currentTimeMillis(); | |
Task getTask = _queue.get(null, null); | |
long deliveryDuration = System.currentTimeMillis() - start; | |
assertNotNull(getTask); | |
assertTrue("first was: " + deliveryDuration, deliveryDuration < 100); | |
// check delivery delay when getting further tasks | |
start = System.currentTimeMillis(); | |
int deliveredTasks = 1; | |
while (deliveredTasks < noOfTasks) { | |
getTask = _queue.get(null, null); | |
if (getTask != null) { | |
deliveryDuration = System.currentTimeMillis() - start; | |
assertTrue(_queue.getWorkerName() + " delivery too fast, was: " + deliveryDuration, | |
deliveryDuration >= _expectedDelay - DELTA); | |
// assert delay not too long | |
assertTrue(_queue.getWorkerName() + " delivery too slow, was: " + deliveryDuration, | |
deliveryDuration < _expectedDelay + DELTA); | |
start = System.currentTimeMillis(); | |
deliveredTasks++; | |
Thread.sleep(SLEEP_WHEN_GETTING_TASK); | |
} | |
} | |
assertEquals(noOfTasks, deliveredTasks); | |
} catch (final Throwable t) { | |
_result = t.getMessage(); | |
t.printStackTrace(); | |
} | |
return _result; | |
} | |
} | |
ExecutorService executor = null; | |
try { | |
executor = Executors.newFixedThreadPool(3); | |
final TestCallable testQueue1 = new TestCallable(queue1, taskDelayMillis); | |
final TestCallable testQueue2 = new TestCallable(queue2, taskDelayMillis); | |
final TestCallable testQueue3 = new TestCallable(queueNoDelay, 0); | |
final Future<String> result1 = executor.submit(testQueue1); | |
final Future<String> result2 = executor.submit(testQueue2); | |
final Future<String> result3 = executor.submit(testQueue3); | |
assertEquals(TestCallable.SUCCESS, result1.get(15, TimeUnit.SECONDS)); | |
assertEquals(TestCallable.SUCCESS, result2.get(15, TimeUnit.SECONDS)); | |
assertEquals(TestCallable.SUCCESS, result3.get(15, TimeUnit.SECONDS)); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** testing filterDuplicates() with more threads, ensure no unique task is lost. */ | |
public void testFilterDuplicatesMassiveParallelCheck() throws Exception { | |
final ZkTaskQueue queue = createQueue("workerDuplicatesMassive"); | |
final int noOfTasks = 10; | |
final int noOfParallelChecks = 40; | |
final AtomicInteger noOfExceptions = new AtomicInteger(0); | |
final ExecutorService exec = Executors.newFixedThreadPool(noOfParallelChecks); | |
final CountDownLatch latch = new CountDownLatch(1); | |
for (int checkNo = 0; checkNo < noOfParallelChecks; checkNo++) { | |
final int offset = checkNo; | |
exec.submit(new Runnable() { | |
@Override | |
public void run() { | |
final Random rand = new Random(System.nanoTime()); | |
try { | |
latch.await(); | |
// create and put some delayed tasks in the queues | |
final List<Task> taskList = new ArrayList<>(); | |
for (int i = 0; i < noOfTasks; i++) { | |
Thread.sleep(rand.nextInt(50)); | |
final Task t1 = createTask(offset * noOfTasks + i, queue.getWorkerName()); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
taskList.add(t1); | |
for (final Task uniqueTask : queue.filterDuplicates(taskList)) { | |
queue.put(uniqueTask); | |
} | |
} | |
} catch (final InterruptedException | TaskmanagerException e) { | |
e.printStackTrace(); | |
noOfExceptions.getAndIncrement(); | |
} | |
} | |
}); | |
} | |
latch.countDown(); | |
exec.shutdown(); | |
exec.awaitTermination(1, TimeUnit.MINUTES); | |
// no exceptions | |
assertEquals(0, noOfExceptions.get()); | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = queue.get(null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
task = queue.get(null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
} | |
/** testing filterDuplicates(). */ | |
public void testFilterDuplicates() throws Exception { | |
final ZkTaskQueue queue = createQueue("workerDuplicates"); | |
final int noOfTasks = 10; | |
final int noOfIterations = 3; | |
for (int iteration = 0; iteration < noOfIterations; iteration++) { | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = createTask(iteration * noOfTasks + i, queue.getWorkerName()); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
for (final Task uniqueTask : queue.filterDuplicates(Arrays.asList(t1))) { | |
queue.put(uniqueTask); | |
} | |
} | |
} | |
// check that only one set of tasks has been created... | |
assertEquals(noOfTasks, queue.getTaskList(ZkTaskQueue.SECTION_TODO, noOfIterations * noOfTasks).getSize()); | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = queue.get(null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
task = queue.get(null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
queue.purge(); | |
} | |
/** testing filterDuplicates() that only duplicates will be filtered for "todo" tasks. */ | |
public void testFilterDuplicatesOnlyToDo() throws Exception { | |
final ZkTaskQueue queue = createQueue("workerDuplicates"); | |
final int noOfTasks = 10; | |
final int noOfIterations = 3; | |
for (int iteration = 0; iteration < noOfIterations; iteration++) { | |
// create and put some delayed tasks in the queues | |
for (int i = 1; i <= noOfTasks; i++) { | |
final Task t1 = createTask(iteration * noOfTasks + i, queue.getWorkerName()); | |
t1.getProperties().put(Task.PROPERTY_UNIQUENESS_TAG, "tag" + i); | |
for (final Task uniqueTask : queue.filterDuplicates(Arrays.asList(t1))) { | |
queue.put(uniqueTask); | |
} | |
} | |
// put each task in progress | |
// now check that each unique id is present | |
final Set<String> taskUniqueIds = new HashSet<>(); | |
Task task = queue.get(null, null); | |
while (task != null) { | |
taskUniqueIds.add(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)); | |
queue.putInProgress(task); | |
task = queue.get(null, null); | |
} | |
assertEquals(noOfTasks, taskUniqueIds.size()); | |
} | |
// check that only one set of tasks has been created for each run... | |
assertEquals(noOfIterations * noOfTasks, | |
queue.getTaskList(ZkTaskQueue.SECTION_INPROGRESS, noOfIterations * noOfTasks).getSize()); | |
// no tasks in todo | |
assertEquals(0, queue.getTaskList(ZkTaskQueue.SECTION_TODO, noOfIterations * noOfTasks).getSize()); | |
queue.purge(); | |
} | |
/** | |
* get children of path and check for count. | |
* | |
* @param zk | |
* ZooKeeper connection | |
* @param path | |
* path | |
* @param expectedCount | |
* expected number of children | |
* @return children | |
* @throws Exception | |
* fail | |
*/ | |
private List<String> getChildrenChecked(final ZkConnection zk, final String path, final int expectedCount) | |
throws Exception { | |
final List<String> partitionDirs = zk.getChildrenSorted(path); | |
assertEquals(expectedCount, partitionDirs.size()); | |
return partitionDirs; | |
} | |
/** | |
* create queue. | |
* | |
* @param queueName | |
* name | |
* @return queue | |
*/ | |
private ZkTaskQueue createQueue(final String queueName) { | |
return createQueue(queueName, TaskStorageZk.TASKSPERHOST_UNLIMITED); | |
} | |
/** | |
* create queue with scale-up limit. | |
*/ | |
private ZkTaskQueue createQueue(final String queueName, final long maxScaleUp) { | |
return new ZkTaskQueue(_zk, queueName, "localhost", maxScaleUp); | |
} | |
/** | |
* @return created task | |
*/ | |
protected static Task createTask(final int taskNo, final String workerName) { | |
final String taskId = Integer.toString(taskNo); | |
final Task task = new Task(taskId, workerName); | |
task.getProperties().put("worker", workerName); | |
task.getProperties().put("taskId", taskId); | |
task.getParameters().put("worker", workerName); | |
task.getParameters().put("taskId", taskId); | |
final List<BulkInfo> input = new ArrayList<BulkInfo>(); | |
input.add(new BulkInfo("inputbucket", "inputstore", "input" + taskId)); | |
task.getInputBulks().put("input", input); | |
final List<BulkInfo> output = new ArrayList<BulkInfo>(); | |
output.add(new BulkInfo("outputbucket", "outputstore", "output" + taskId)); | |
task.getInputBulks().put("output", output); | |
return task; | |
} | |
/** | |
* assert that all task members and properties are equal. | |
* | |
* @param putTask | |
* expected task | |
* @param getTask | |
* actual task | |
*/ | |
protected static void assertEqualTasks(final Task putTask, final Task getTask) { | |
assertEquals(putTask.getTaskId(), getTask.getTaskId()); | |
assertEquals(putTask.getWorkerName(), getTask.getWorkerName()); | |
assertEquals(putTask.getQualifier(), getTask.getQualifier()); | |
// remove start and end time | |
getTask.getProperties().remove(Task.PROPERTY_START_TIME); | |
getTask.getProperties().remove(Task.PROPERTY_END_TIME); | |
getTask.getProperties().remove(Task.PROPERY_TASK_AGE); | |
putTask.getProperties().remove(Task.PROPERTY_START_TIME); | |
putTask.getProperties().remove(Task.PROPERTY_END_TIME); | |
putTask.getProperties().remove(Task.PROPERY_TASK_AGE); | |
assertEquals(putTask.getProperties(), getTask.getProperties()); | |
assertEquals(putTask.getParameters(), getTask.getParameters()); | |
assertEquals(putTask.getInputBulks(), getTask.getInputBulks()); | |
assertEquals(putTask.getOutputBulks(), getTask.getOutputBulks()); | |
} | |
/** | |
* assert task counter values. | |
* | |
* @param counter | |
* actual counter | |
* @param name | |
* expected name | |
* @param todo | |
* expected todo count | |
* @param inProgress | |
* expected inprogress count. | |
*/ | |
protected static void assertCounter(final TaskCounter counter, final String name, final int todo, | |
final int inProgress) { | |
assertNotNull(counter); | |
assertEquals("wrong name", name, counter.getWorkerName()); | |
assertEquals("wrong todo counter", todo, counter.getTasksTodo()); | |
assertEquals("wrong inprogress counter", inProgress, counter.getTasksInProgress()); | |
} | |
} |