blob: 170e198b2e7089b3078de08361aa409d9e902d82 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskList;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
/**
* Queue implementation wrapping a zookeeper client. Tasks are stored in different (virtual) sub queues identified by
* task property. For those sub queues, tasks are delivered round robin (-> fairness).
*
* Does not do ZooKeeper session handling, it must be done from the calling context.
*
* Zookeeper structure: (task-id nodes contain Task as data)
*
* <tt>
* /smila/tasks/&lt;worker-name>/todo/&lt;sub-queue>/&lt;counter>-&lt;task-id>-&lt;host name>
* ........................../todo_qualified/&lt;sub-queue>/&lt;qualifier-string>/&lt;counter>-&lt;task-id>-&lt;host name>
* ........................../inprogress/&lt;task-id>
* ........................../delivery/&lt;sub-queue>/&lt;worker-name>/&lt;delivery-time>
* </tt>
*/
public class ZkTaskQueue {
/** prefix for task directories. */
public static final String ROOT_DIR_PREFIX = "/smila";
/** prefix for task directories. */
public static final String TASKDIR_PREFIX = ROOT_DIR_PREFIX + "/tasks";
/** prefix for tasks-todo directories. */
public static final String TODODIR_SUFFIX = "/todo";
/** prefix for qualified-tasks-todo directories. */
public static final String TODOQUALIFIEDDIR_SUFFIX = "/todo_qualified";
/** prefix for task-in-progress directories. */
public static final String INPROGRESSDIR_SUFFIX = "/inprogress";
/** zk path for delayed task delivery data. */
public static final String DELIVERY_PREFIX = "/delivery";
/** Property identifying owner of this task. */
public static final String OWNER_PROP = "inProgressBy";
/** Property for tasks-todo. */
public static final String SECTION_TODO = "todo";
/** Property for qualified-tasks-todo. */
public static final String SECTION_TODOQUALIFIED = "todo_qualified";
/** Property for task-in-progress. */
public static final String SECTION_INPROGRESS = "inprogress";
/** use this as a default for identifying the right (sub) queue if no property is given in the task. */
public static final String DEFAULT_SUB_QUEUE = "_default";
/** task property for identifiying the right sub queue within our queue. */
private static final String SUB_QUEUE_TASK_PROPERTY = Task.PROPERTY_JOB_NAME;
/** task pipe we are working on. */
private final String _workerName;
/** local host name. */
private final String _localhost;
/** maximum number of tasks that should be delivered for a host. */
private long _maxNoOfTasksPerHost;
/** Number formatter. */
private final NumberFormat _numberFormat;
/** Connection to ZooKeeper server. */
private final ZkConnection _zk;
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** Any<-JSON converter. */
private final IpcAnyReader _anyReader = new IpcAnyReader();
/** Any->BON converter. */
private final IpcAnyWriter _anyWriter = new IpcAnyWriter();
/** sub queues for which we delivered tasks in least recently used order. */
private final Deque<String> _lruSubQueues = new LinkedList<String>();
/**
* Creates a queue without a limit on the number of tasks per host: delegates to the four-argument constructor with
* {@link TaskStorageZk#TASKSPERHOST_UNLIMITED}.
*
* @param service
*          zookeeper service needed to get zookeeper client
* @param workerName
*          name of worker for which we are managing tasks
* @param localhost
*          local host name
*/
public ZkTaskQueue(final ZooKeeperService service, final String workerName, final String localhost) {
this(service, workerName, localhost, TaskStorageZk.TASKSPERHOST_UNLIMITED);
}
/**
 * Creates a queue for one worker and prepares the fixed-width number formatter of this queue.
 *
 * @param service
 *          zookeeper service needed to get zookeeper client
 * @param workerName
 *          name of worker for which we are managing tasks
 * @param localhost
 *          local host name
 * @param maxNoOfTasksPerHost
 *          maximum number of tasks to deliver per host (scale-up limit), or
 *          {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} for no limit
 */
public ZkTaskQueue(final ZooKeeperService service, final String workerName, final String localhost,
  final long maxNoOfTasksPerHost) {
  _zk = new ZkConnection(service);
  _workerName = workerName;
  _localhost = localhost;
  _maxNoOfTasksPerHost = maxNoOfTasksPerHost;
  // Locale.ROOT instead of the JVM default locale: the formatted number must always consist of ASCII digits,
  // some default locales would otherwise produce locale specific digits.
  // NOTE(review): presumably this formatter creates the <counter> part of task node names - confirm.
  _numberFormat = NumberFormat.getIntegerInstance(Locale.ROOT);
  final int counterStringLength = 19; // length of Long.MAX_VALUE
  // minimum == maximum -> zero padded numbers of constant length
  _numberFormat.setMaximumIntegerDigits(counterStringLength);
  _numberFormat.setMinimumIntegerDigits(counterStringLength);
  _numberFormat.setGroupingUsed(false);
}
/**
* Disconnects the ZooKeeper session of the connection used by this queue. For testing only!
*
* @throws TaskmanagerException
*           wraps any exception thrown while disconnecting.
*/
public void disconnectZkSession() throws TaskmanagerException {
try {
_zk.disconnectZkSession();
} catch (final Exception e) {
throw new TaskmanagerException("Error discconnecting zookeeper session.", e);
}
}
/**
* ensure that zk root path for storing the time of the last task delivery for delayed tasks is created.
*
* @return a new map object working on {@link #DELIVERY_PREFIX}.
* @throws KeeperException
*           error ensuring that the path exists.
*/
private ZkConcurrentMap ensureDelayedDeliveryMap() throws KeeperException {
_zk.ensurePathExists(DELIVERY_PREFIX);
return new ZkConcurrentMap(_zk, DELIVERY_PREFIX);
}
/**
 * Builds the key of the delivery map entry for a task: sub queue and worker name of the task, joined by "###".
 *
 * @param task
 *          task to create the key for
 * @return key for zk map under which the delivery time for this task is stored.
 */
private String getDelayedDeliveryKey(final Task task) {
  // "###" is ok as separator, because this char is not allowed in job and worker names
  final StringBuilder key = new StringBuilder();
  key.append(getSubQueue(task));
  key.append("###");
  key.append(task.getWorkerName());
  return key.toString();
}
/**
* @return scale-up limit setting: the maximum number of tasks that should be delivered for a host, as given in the
*         constructor or set by {@link #setMaxNoOfTasksPerHost(long)}.
*/
public long getMaxNoOfTasksPerHost() {
return _maxNoOfTasksPerHost;
}
/**
* set new scale-up limit.
*
* @param maxNoOfTasksPerHost
*          new maximum number of tasks per host; {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} switches the limit
*          off (tasks are then only counted).
*/
public void setMaxNoOfTasksPerHost(final long maxNoOfTasksPerHost) {
_maxNoOfTasksPerHost = maxNoOfTasksPerHost;
}
/** @return name of the worker for which this queue manages tasks, as given in the constructor. */
public String getWorkerName() {
return _workerName;
}
/**
 * Puts a task in our task pipe: the task is written to the todo section (or the qualified todo section, if the
 * task has a qualifier) of the sub queue determined for the task.
 *
 * @param task
 *          the task
 * @throws TaskmanagerException
 *           error. Exceptions that already are TaskmanagerExceptions are passed on unchanged, everything else is
 *           wrapped.
 */
public void put(final Task task) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("put(), task = " + task);
  }
  try {
    final String subQueue = getSubQueue(task);
    final String todoTaskPath = getTodoTaskPath(task.getQualifier(), subQueue);
    putTask(task, todoTaskPath);
  } catch (final TaskmanagerException ex) {
    // already the declared exception type (e.g. task conversion failed): do not hide it in a second wrapper
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * write task to in-progress section of the queue. The owner id (local host name + name of current thread) is
 * stored in the written data as {@link #OWNER_PROP}, but removed from the given task object again before this
 * method returns. If the in-progress node already exists, it is left untouched; the returned id can then be
 * compared with the owner stored in the node to find out who created it.
 *
 * @param task
 *          task. Its "task started" properties are set by this method.
 * @return value of {@link #OWNER_PROP} property.
 * @throws TaskmanagerException
 *           error. Exceptions that already are TaskmanagerExceptions are passed on unchanged, everything else is
 *           wrapped.
 */
public String putInProgress(final Task task) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("putInProgress(), task = " + task);
  }
  try {
    final String myId = _localhost + "-" + Thread.currentThread().getName(); // unique identifier
    task.getProperties().put(OWNER_PROP, myId);
    task.setTaskStartedProperties();
    final byte[] inProgressData = task2Message(task);
    final String inProgressNodeName = getInProgressTaskPath(task.getTaskId());
    try {
      _zk.createPath(inProgressNodeName, inProgressData);
    } catch (final KeeperException.NodeExistsException e) {
      // ignore, cause this can happen if multiple threads try to get a task
      if (_log.isTraceEnabled()) {
        _log.trace("Node exists: " + e);
      }
    }
    return myId;
  } catch (final TaskmanagerException ex) {
    // already the declared exception type (e.g. task conversion failed): do not hide it in a second wrapper
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException(ex);
  } finally {
    // the owner is only part of the stored data, the task object itself must not keep it
    task.getProperties().remove(OWNER_PROP);
  }
}
/**
 * Return next task that can be processed. The sub queues are visited in least-recently-used order and the first
 * task that could be fetched wins; its sub queue is then marked as used. This method is not thread-safe, it must
 * be called in a synchronized context!
 *
 * @param qualifiers
 *          optional task qualifiers. If null or empty, unqualified tasks are delivered.
 * @param host
 *          host name where the worker is running that requested the task, to be stored as property in the task
 * @return next task that can be processed, or null if no task could be fetched.
 * @throws TaskmanagerException
 *           error
 */
public Task get(final Collection<String> qualifiers, final String host) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("get(), qualifiers = " + qualifiers);
  }
  final String qualifiedPrefix = getTodoQualifiedRootPath() + "/";
  final boolean unqualified = qualifiers == null || qualifiers.isEmpty();
  final Collection<String> qualifiersToTry;
  if (unqualified) {
    // a single null qualifier selects the unqualified todo directory
    qualifiersToTry = Collections.singleton((String) null);
  } else {
    qualifiersToTry = qualifiers;
  }
  try {
    for (final String subQueue : getSubQueuesForNextTask()) {
      if (!unqualified && !isNotEmpty(qualifiedPrefix + subQueue)) {
        continue; // no qualified tasks in this sub queue
      }
      for (final String qualifier : qualifiersToTry) {
        final Task candidate = getTask(getTodoTaskPath(qualifier, subQueue), host);
        if (candidate != null) {
          markSubQueueAsUsed(subQueue);
          return candidate;
        }
      }
    }
    return null;
  } catch (final KeeperException | RuntimeException ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * Collects the sub queue nodes below the todo and the qualified todo directory and orders them for fair
 * processing: sub queues not yet known to the LRU list come first, followed by the known ones in LRU order. As a
 * side effect, sub queues without a node are dropped from the LRU list.
 *
 * @return list of sub queues in the correct order (LRU) for a fair processing.
 */
private List<String> getSubQueuesForNextTask() throws KeeperException {
  final List<String> existing = new LinkedList<>();
  final String[] roots = { getTodoRootPath(), getTodoQualifiedRootPath() };
  // we assume that todo sub queue nodes that have no tasks are removed after a while (see cleanEmptyNodes())
  for (final String root : roots) {
    if (isNotEmpty(root)) {
      existing.addAll(_zk.getChildrenSorted(root));
    }
  }
  final List<String> ordered = new LinkedList<>();
  if (existing.isEmpty()) {
    return ordered;
  }
  // We could also copy the LRU list to the result first and do the retainAll there,
  // but the current impl. helps for cleaning up our LRU list from old unused job entries.
  // It can theoretically cause unfairness, but this should have no practical relevance.
  synchronized (_lruSubQueues) {
    _lruSubQueues.retainAll(existing);
    ordered.addAll(_lruSubQueues);
  }
  // sub queues with tasks that are not in the LRU list yet go to the front
  for (final String subQueue : existing) {
    if (!ordered.contains(subQueue)) {
      ordered.add(0, subQueue);
    }
  }
  return ordered;
}
/**
* Marks the given sub queue as most recently used: it is moved to (or added at) the end of the LRU list, so other
* sub queues are preferred when the next task is requested.
*
* @param subQueue
*          sub queue from which a task was just delivered
*/
private void markSubQueueAsUsed(final String subQueue) {
synchronized (_lruSubQueues) {
_lruSubQueues.remove(subQueue); // remove from its current position (if it existed)
_lruSubQueues.addLast(subQueue); // ... and append to the end of our LRU queue
}
}
/**
 * Returns a map with node names and tasks matching to given path. Child nodes that vanished meanwhile or that
 * have no data are left out. If the path itself does not exist, the tasks read so far (usually none) are returned.
 *
 * @param path
 *          The path
 * @return map from node name to task (unordered)
 * @throws KeeperException
 *           error accessing zookeeper, except for missing nodes
 * @throws TaskmanagerException
 *           error converting node data to a task
 */
private Map<String, Task> getTasks(final String path) throws KeeperException, TaskmanagerException {
  final Map<String, Task> tasksByNode = new HashMap<String, Task>();
  try {
    final List<String> children = _zk.getChildrenSorted(path);
    if (children != null) {
      for (final String child : children) {
        byte[] data = null;
        try {
          data = _zk.getData(path + "/" + child);
        } catch (final KeeperException.NoNodeException e) {
          // ignore, cause this can happen if multiple threads try to get a task
          _log.trace("Node to get data doesn't exist anymore: " + child);
        }
        if (data == null || data.length == 0) {
          continue;
        }
        tasksByNode.put(child, message2Task(data));
      }
    }
  } catch (final KeeperException.NoNodeException ex) {
    if (_log.isDebugEnabled()) {
      _log.debug("Todo directory " + path + " does not exist yet.");
    }
  }
  return tasksByNode;
}
/**
 * Tries to fetch the first available task from the given todo directory. The host counter is incremented before
 * the nodes are tried, and decremented again if no task could be fetched afterwards.
 *
 * @param todoTaskPath
 *          todo directory to read from
 * @param host
 *          host of the requesting worker, may be null
 * @return task to process. Null if none exists or no more tasks are allowed for the worker's host.
 */
private Task getTask(final String todoTaskPath, final String host) throws KeeperException, TaskmanagerException {
  Task fetched = null;
  boolean hostCounterIncremented = false;
  try {
    if (isNotEmpty(todoTaskPath)) {
      // try the nodes in sorted order to maintain queue FIFO character
      final List<String> candidates = _zk.getChildrenSorted(todoTaskPath);
      if (candidates != null && !candidates.isEmpty()) {
        hostCounterIncremented = checkAndIncHostCounter(host);
        if (hostCounterIncremented) { // scale up control allows us to deliver another task for this host
          for (final String candidate : candidates) {
            fetched = tryTask(todoTaskPath, candidate, host);
            if (fetched != null) {
              break;
            }
          }
        }
      }
    }
  } catch (final KeeperException.NoNodeException ex) {
    if (_log.isDebugEnabled()) {
      _log.debug("Todo directory " + todoTaskPath + " does not exist yet.");
    }
  } finally {
    if (fetched == null && hostCounterIncremented) {
      try {
        decHostCounter(host); // inc'd the host counter but could not get a task -> reset host counter
      } catch (final Exception e) {
        _log.warn("Error while decrementing host task counter for host '" + host + "'", e);
      }
    }
  }
  return fetched;
}
/**
 * Check if we can deliver another task to this host (scale up control) and count the delivery. Without a limit
 * the counter is just incremented, otherwise only if it is below the limit.
 *
 * @param host
 *          host name is the key of the counter. If null, nothing is counted and the check succeeds.
 * @return 'true' if the host counter could be successfully incremented, otherwise 'false'.
 */
private boolean checkAndIncHostCounter(final String host) throws KeeperException {
  if (host == null) {
    return true; // no host given -> no limit
  }
  _zk.ensurePathExists(TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH);
  final ZkConcurrentMap hostCounters = new ZkConcurrentMap(_zk, TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH);
  final Integer updatedCounter;
  if (_maxNoOfTasksPerHost != TaskStorageZk.TASKSPERHOST_UNLIMITED) {
    updatedCounter = hostCounters.incIfLessThan(host, _maxNoOfTasksPerHost);
  } else {
    updatedCounter = hostCounters.add(host, 1); // no limit -> just count
  }
  if (updatedCounter == null) {
    _log.warn("Could not update host task counter for host '" + host + "', probably due to temporary overload.");
    return false;
  }
  // -1: inc host counter failed because max is reached
  return updatedCounter != -1;
}
/**
* try to read task from "todoTaskPath/nodeName" for worker host "host" and to claim it for the caller. The steps
* are: read the todo node, check the delivery delay, write the task to the in-progress section, verify that the
* in-progress node is owned by us, check (and update) the delivery delay again, delete the todo node.
*
* @param todoTaskPath
*          todo directory containing the node
* @param nodeName
*          name of the todo node to try
* @param host
*          host of the requesting worker, stored in the task as {@link Task#PROPERTY_WORKER_HOST} if not null
* @return task if we managed to read it without interference. Else null.
* @throws KeeperException
*           error accessing zookeeper
* @throws TaskmanagerException
*           error converting the task or writing it to the in-progress section
*/
private Task tryTask(final String todoTaskPath, final String nodeName, final String host) throws KeeperException,
TaskmanagerException {
final String todoNodeName = todoTaskPath + "/" + nodeName;
byte[] nodeData = null;
try {
nodeData = _zk.getData(todoNodeName);
} catch (final KeeperException.NoNodeException e) {
// ignore, cause this can happen if multiple threads try to get a task
_log.trace("Node to get data doesn't exist anymore: " + todoNodeName);
}
// nodes without data are skipped (putTask() creates nodes empty before attaching the data)
if (nodeData != null && nodeData.length > 0) {
final Task task = message2Task(nodeData);
if (host != null) { // must be set before task is put in progress queue
task.getProperties().put(Task.PROPERTY_WORKER_HOST, host);
}
// check if there is a delivery delay resp. if we have to wait
// (first check only reads the last delivery time, nothing is changed in zookeeper yet)
boolean canGoOn = checkDeliveryDelay(task, false);
if (!canGoOn) {
return null; // delivery delay is configured and not reached
}
final String inProgressNodeName = getInProgressTaskPath(task.getTaskId());
final String myId = putInProgress(task);
// check if I am really the one that got the task first
// (putInProgress() ignores an already existing node, so the stored owner decides)
if (checkTaskProperty(inProgressNodeName, OWNER_PROP, myId)) {
try {
// check delivery delay again, someone else could have gotten a task meanwhile
// (this time the last delivery time is updated, too)
canGoOn = checkDeliveryDelay(task, true);
if (!canGoOn) {
if (_log.isDebugEnabled()) {
_log.debug("Delay check not successful, removing task " + task.getTaskId()
+ " from in-progress queue");
}
delete(task.getTaskId()); // someone else was faster, reset in-progress task
return null;
}
// the task is ours: remove it from the todo section
_zk.deleteNode(todoNodeName);
task.getProperties().remove(OWNER_PROP);
return task;
} catch (final KeeperException.NoNodeException e) {
// this can happen if another thread already finished
if (_log.isTraceEnabled()) {
_log.trace("Node to delete doesn't exist anymore: " + todoNodeName);
}
// we have to clean up the node we created before
// NOTE(review): a NoNodeException thrown by this delete is not handled here and goes to the caller
_zk.deleteNode(inProgressNodeName);
}
}
}
return null;
}
/**
 * Checks the delivery delay of a task. Tasks without a {@link Task#PROPERTY_DELIVERY_DELAY} property always pass.
 *
 * @param task
 *          task to check
 * @param changeDeliveryTime
 *          if true (task is already in-progress), the last delivery time in zk is set to the current time
 * @return 'false' if we were not successful in getting a delayed task. If task is already in-progress then we also
 *         try to change the delivery time in zk. If that wasn't successful, we also return 'false'.
 */
private boolean checkDeliveryDelay(final Task task, final boolean changeDeliveryTime)
  throws TaskmanagerException, KeeperException {
  final String deliveryDelay = task.getProperties().get(Task.PROPERTY_DELIVERY_DELAY);
  if (deliveryDelay == null) {
    return true;
  }
  if (_log.isTraceEnabled()) {
    _log.trace("delivery delay configured: " + deliveryDelay);
  }
  final ZkConcurrentMap deliveryTimes = ensureDelayedDeliveryMap();
  final String key = getDelayedDeliveryKey(task);
  final String lastDelivery = deliveryTimes.getString(key);
  final long now = System.currentTimeMillis();
  if (mustWaitForDeliveryDelay(task, deliveryTimes, deliveryDelay, lastDelivery, now)) {
    return false;
  }
  if (!changeDeliveryTime) {
    return true;
  }
  // change the last delivery time (make sure I'm the one to do it)
  final String newDelivery = String.valueOf(now);
  if (_log.isTraceEnabled()) {
    _log.trace("trying to change last delivery time from " + lastDelivery + " to " + newDelivery);
  }
  if (lastDelivery == null) {
    return newDelivery.equals(deliveryTimes.putIfAbsent(key, newDelivery));
  }
  return deliveryTimes.replace(key, lastDelivery, newDelivery);
}
/**
 * Compares the time since the last delivery with the configured delay (both given as strings containing
 * milliseconds). Parameters 'task' and 'deliveryMap' are not used.
 *
 * @return 'true' if task has a delivery delay and it's not reached/exceeded.
 */
private boolean mustWaitForDeliveryDelay(final Task task, final ZkConcurrentMap deliveryMap,
  final String deliveryDelay, final String lastDelivery, final long now) {
  if (deliveryDelay == null || lastDelivery == null) {
    return false; // no delay configured or nothing delivered yet
  }
  final long sinceLastDelivery = now - Long.parseLong(lastDelivery);
  if (sinceLastDelivery >= Long.parseLong(deliveryDelay)) {
    return false;
  }
  if (_log.isDebugEnabled()) {
    _log.debug("configured delivery delay " + deliveryDelay + " not reached yet: " + sinceLastDelivery);
  }
  return true;
}
/**
 * read the task content of an in-progress task. The in-progress node is not deleted by this method. The owner and
 * worker host properties are removed from the returned task, and the task counter of the worker host is
 * decremented.
 *
 * @param taskId
 *          task Id
 * @return task contents or null if no such task exists or is already finishing
 * @throws TaskmanagerException
 *           other errors.
 */
public Task getInProgressTask(final String taskId) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("getInProgresTask(), taskId = " + taskId);
  }
  final String nodePath = getInProgressTaskPath(taskId);
  try {
    final Task inProgressTask = message2Task(_zk.getData(nodePath));
    inProgressTask.getProperties().remove(OWNER_PROP);
    final String workerHost = inProgressTask.getProperties().remove(Task.PROPERTY_WORKER_HOST);
    decHostCounter(workerHost);
    return inProgressTask;
  } catch (final KeeperException.NodeExistsException | KeeperException.NoNodeException ex) {
    // NodeExists: somebody else just created the finishing mark.
    // NoNode: task node does not exist in inprogress dir.
    return null;
  } catch (final Exception ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * Decrements the task counter of a host, if it is greater than 0. Problems are only logged.
 *
 * @param host
 *          the host name for which to decrement the task counter. If null, nothing happens.
 */
private void decHostCounter(final String host) throws KeeperException {
  if (host == null) {
    return;
  }
  _zk.ensurePathExists(TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH);
  final ZkConcurrentMap hostCounters = new ZkConcurrentMap(_zk, TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH);
  final Integer updatedCounter = hostCounters.decIfGreaterThan(host, 0);
  if (updatedCounter == null) {
    _log.warn("Could not decrement host task counter for host '" + host
      + "', probably due to temporary overload.");
    return;
  }
  if (updatedCounter < 0) {
    _log.error("Negative host counter (" + updatedCounter + ") for host '" + host + "'!");
  }
}
/**
 * Deletes the node of the given task from the inprogress section.
 *
 * @param taskId
 *          identifies the task
 * @throws TaskmanagerException
 *           error; BadParameterTaskmanagerException if taskId isn't found
 */
public void delete(final String taskId) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("delete(), taskId = " + taskId);
  }
  try {
    final String nodePath = getInProgressTaskPath(taskId);
    try {
      _zk.deleteNode(nodePath);
    } catch (final NoNodeException missing) {
      final String message = "Could not find taskId '" + taskId + "' for worker '" + _workerName + "'.";
      throw new BadParameterTaskmanagerException(message, BadParameterTaskmanagerException.Cause.taskId);
    }
  } catch (final KeeperException | RuntimeException ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * Process isAlive call for given task: the data of the in-progress node is read and written back unchanged, which
 * refreshes the modification time of the node.
 *
 * @param taskId
 *          identifies the task
 *
 * @throws TaskmanagerException
 *           error; BadParameterTaskmanagerException if taskId isn't found
 */
public void keepAlive(final String taskId) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("keepAlive(), taskId = " + taskId);
  }
  try {
    final String nodePath = getInProgressTaskPath(taskId);
    // rewrite the same data -> update node's last modified timestamp
    _zk.setData(nodePath, _zk.getData(nodePath));
  } catch (final NoNodeException missing) {
    final String message = "Could not find taskId '" + taskId + "' for worker '" + _workerName + "'.";
    throw new BadParameterTaskmanagerException(message, BadParameterTaskmanagerException.Cause.taskId);
  } catch (final Exception ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * Checks the sub queue nodes below the qualified todo directory for children.
 *
 * @return true if this queue has a directory for qualified tasks and it contains at least one sub-queue node with
 *         qualifier(s).
 */
public boolean hasQualifiedTasks() throws KeeperException {
  final String qualifiedRoot = getTodoQualifiedRootPath();
  if (!isNotEmpty(qualifiedRoot)) {
    return false;
  }
  for (final String subQueue : _zk.getChildrenSorted(qualifiedRoot)) {
    if (isNotEmpty(qualifiedRoot + "/" + subQueue)) {
      return true;
    }
  }
  return false;
}
/**
 * @param path
 *          zk path to check
 * @return true if path exists and contains sub-nodes.
 */
private boolean isNotEmpty(final String path) throws KeeperException {
  final Stat nodeStat = _zk.exists(path);
  if (nodeStat == null) {
    return false; // no such node
  }
  return nodeStat.getNumChildren() > 0;
}
/**
 * Counts the tasks of this queue: the number of todo tasks per sub queue (unqualified and qualified tasks are
 * added up) and the number of in-progress tasks. The numbers are the child counts of the respective nodes.
 *
 * @return task counter
 *
 * @throws TaskmanagerException
 *           error
 */
public TaskCounter getTaskCounter() throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("getTaskCounter()");
  }
  try {
    final String todoRoot = getTodoRootPath();
    final String qualifiedRoot = getTodoQualifiedRootPath();
    final String inProgressRoot = getInProgressPath();
    final List<String> plainSubQueues = new ArrayList<>();
    final List<String> qualifiedSubQueues = new ArrayList<>();
    if (_zk.exists(todoRoot) != null) {
      plainSubQueues.addAll(_zk.getChildrenSorted(todoRoot));
    }
    if (_zk.exists(qualifiedRoot) != null) {
      qualifiedSubQueues.addAll(_zk.getChildrenSorted(qualifiedRoot));
    }
    final Map<String, Integer> todoPerJob = new HashMap<String, Integer>();
    // unqualified tasks: one directory per sub queue
    for (final String subQueue : plainSubQueues) {
      final Stat subQueueStat = _zk.exists(getTodoTaskPath(null, subQueue));
      if (subQueueStat != null) {
        todoPerJob.put(subQueue, subQueueStat.getNumChildren());
      }
    }
    // qualified tasks: one directory per sub queue and qualifier, added to the count of the sub queue
    for (final String subQueue : qualifiedSubQueues) {
      final String subQueueDir = qualifiedRoot + "/" + subQueue;
      if (_zk.exists(subQueueDir) == null) {
        continue;
      }
      for (final String qualifier : _zk.getChildrenSorted(subQueueDir)) {
        final Stat qualifierStat = _zk.exists(getTodoTaskPath(qualifier, subQueue));
        if (qualifierStat == null) {
          continue;
        }
        final Integer countedSoFar = todoPerJob.get(subQueue);
        final int total =
          countedSoFar == null ? qualifierStat.getNumChildren() : countedSoFar + qualifierStat.getNumChildren();
        todoPerJob.put(subQueue, total);
      }
    }
    final Stat inProgressStat = _zk.exists(inProgressRoot);
    final int inProgressCount = inProgressStat == null ? 0 : inProgressStat.getNumChildren();
    return new TaskCounter(_workerName, todoPerJob, inProgressCount);
  } catch (final Exception e) {
    throw new TaskmanagerException(e);
  }
}
/**
 * Determines the in-progress tasks whose node was not modified for longer than the given time. Errors are logged
 * as warning; the tasks found up to the error are returned then.
 *
 * @param timeToLiveMs
 *          the maximum processing time
 * @return task Ids of tasks that have exceeded the timeToLive.
 */
public Collection<String> getTimedOutTasks(final long timeToLiveMs) {
  if (_log.isTraceEnabled()) {
    // name this method in the trace output (a different method name was logged here before)
    _log.trace("getTimedOutTasks(), timeToLiveMs = " + timeToLiveMs);
  }
  final Collection<String> timedOutTasks = new ArrayList<String>();
  try {
    final String inProgressPath = getInProgressPath();
    if (isNotEmpty(inProgressPath)) {
      // the names of the in-progress nodes are the task ids
      for (final String taskId : _zk.getChildrenSorted(inProgressPath)) {
        if (isTaskTimedOut(inProgressPath, taskId, timeToLiveMs)) {
          timedOutTasks.add(taskId);
        }
      }
    }
  } catch (final Exception ex) {
    _log.warn("Error checking inprogress tasks of worker " + _workerName, ex);
  }
  return timedOutTasks;
}
/**
 * Check if maximum processing time of given task exceeded: the last modification time of the task node is
 * compared with the local system time. Errors are logged and yield 'false'.
 *
 * @param rootPath
 *          root path for tasks
 * @param taskId
 *          identifies the task
 * @param timeToLiveMs
 *          the maximum processing time
 * @return true if the task node exists, was not modified for more than timeToLiveMs and has no "finishing" child
 *         node.
 */
private boolean isTaskTimedOut(final String rootPath, final String taskId, final long timeToLiveMs) {
  try {
    final String taskNode = rootPath + "/" + taskId;
    final Stat taskStat = _zk.exists(taskNode);
    if (taskStat == null) {
      return false;
    }
    final long lastModified = taskStat.getMtime();
    final long currentTime = System.currentTimeMillis();
    if (currentTime - lastModified <= timeToLiveMs) {
      return false;
    }
    if (_log.isInfoEnabled()) {
      _log.info("reached maximum processing time (" + timeToLiveMs + " ms) for task " + taskId);
    }
    // check if node is not already being finished.
    return _zk.exists(taskNode + "/finishing") == null;
  } catch (final Exception ex) {
    _log.error("Error checking task " + taskId + " for worker " + _workerName, ex);
    return false;
  }
}
/**
 * Remove all task (and todo sub queue) nodes of this queue: the tasks and sub queues of the todo section, the
 * tasks, qualifier nodes and sub queues of the qualified todo section and the in-progress tasks. Finally the LRU
 * list of sub queues is cleared. Errors are logged as warning, but the purge tries to continue.
 */
public void purge() {
  if (_log.isTraceEnabled()) {
    _log.trace("purge()");
  }
  try {
    final String todoDir = getTodoRootPath();
    final String todoQualifiedDir = getTodoQualifiedRootPath();
    final String inProgressDir = getInProgressPath();
    final List<String> todoSubQueues = new ArrayList<>();
    final List<String> todoQualifiedSubQueues = new ArrayList<>();
    if (_zk.exists(todoDir) != null) {
      todoSubQueues.addAll(_zk.getChildrenSorted(todoDir));
    }
    if (_zk.exists(todoQualifiedDir) != null) {
      todoQualifiedSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedDir));
    }
    // purge(String) only removes immediate children, so the tree is removed bottom-up
    for (final String subQueue : todoSubQueues) {
      purge(getTodoTaskPath(null, subQueue));
    }
    purge(todoDir); // remove sub-queues too
    for (final String subQueue : todoQualifiedSubQueues) {
      final String todoQualifiedSubQueueDir = todoQualifiedDir + "/" + subQueue;
      if (_zk.exists(todoQualifiedSubQueueDir) != null) {
        try {
          final List<String> qualifiers = _zk.getChildrenSorted(todoQualifiedSubQueueDir);
          for (final String qualifier : qualifiers) {
            purge(getTodoTaskPath(qualifier, subQueue));
          }
          purge(todoQualifiedSubQueueDir); // remove sub-queues too
        } catch (final Exception ex) {
          _log.warn("Error determining qualifier node of queue " + _workerName + " during purge", ex);
        }
      }
    }
    purge(inProgressDir);
    // all other accesses to the LRU list hold its monitor, so it must be cleared under the same lock
    synchronized (_lruSubQueues) {
      _lruSubQueues.clear();
    }
  } catch (final Exception ex) {
    _log.warn("Error purging queue for worker " + _workerName, ex);
  }
}
/**
 * Remove all immediate children of the node. A child that cannot be deleted is logged as warning and skipped.
 *
 * @param node
 *          some node.
 */
private void purge(final String node) {
  try {
    final Stat nodeStat = _zk.exists(node);
    if (nodeStat == null || nodeStat.getNumChildren() <= 0) {
      return; // nothing to remove
    }
    for (final String child : _zk.getChildrenSorted(node)) {
      final String childPath = node + "/" + child;
      try {
        _zk.deleteNode(childPath);
      } catch (final Exception ex) {
        _log.warn("Failed to delete node " + childPath + " during purge", ex);
      }
    }
  } catch (final Exception ex) {
    _log.warn("Failed to purge path " + node, ex);
  }
}
/**
 * Remove nodes below the todo and the todo_qualified directory that have neither data nor children and were not
 * modified for the given time, e.g.
 * <ul>
 * <li>empty task nodes in the todo(_qualified) directories (leftovers from failed create operations)
 * <li>empty todo_qualified directories that seem not to be in use anymore.
 * </ul>
 * The two root directories themselves are never removed. Errors are logged as warning, but the clean tries to
 * continue.
 *
 * @param timeToLiveMs
 *          time in milliseconds after which to clean up stale tasks or empty todo_qualified dirs.
 *
 */
public void cleanEmptyNodes(final long timeToLiveMs) {
  if (_log.isDebugEnabled()) {
    _log.debug("cleanEmptyNodes() for " + _workerName);
  }
  try {
    // remove stale tasks in todo dir.
    cleanEmptyNodes(getTodoRootPath(), false, timeToLiveMs);
    // remove stale tasks and unused qualifier dirs in todo_qualified dir.
    cleanEmptyNodes(getTodoQualifiedRootPath(), false, timeToLiveMs);
  } catch (final Exception ex) {
    _log.warn("Error during cleaning todo queue for worker " + _workerName, ex);
  }
}
/**
 * Recursively removes all empty children of the node, if they are untouched after a long time. "Empty" means that
 * a node has no children and no data; the age is taken from the modification time of the node. Errors are logged
 * as warning.
 *
 * @param node
 *          some node.
 * @param removeEmpty
 *          if true the given node is removed if it is empty and untouched after a long time. Otherwise only its
 *          children are examined and removed if they fit the conditions.
 * @param timeToLiveMs
 *          time to live for empty z-nodes in milliseconds
 */
private void cleanEmptyNodes(final String node, final boolean removeEmpty, final long timeToLiveMs) {
  try {
    final Stat before = _zk.exists(node);
    if (before != null && before.getNumChildren() > 0) {
      for (final String child : _zk.getChildrenSorted(node)) {
        cleanEmptyNodes(node + "/" + child, true, timeToLiveMs);
      }
    }
    if (!removeEmpty) {
      return;
    }
    // read the state again, children may have been removed by the recursion above
    final Stat after = _zk.exists(node);
    if (after == null || after.getNumChildren() != 0 || after.getDataLength() != 0) {
      return;
    }
    final long lastModified = after.getMtime();
    final long currentTime = System.currentTimeMillis();
    if (currentTime - lastModified > timeToLiveMs) {
      if (_log.isDebugEnabled()) {
        _log.debug("remove empty node " + node);
      }
      _zk.deleteNode(node);
    }
  } catch (final Exception ex) {
    _log.warn("Failed to clean node " + node, ex);
  }
}
/** @return root path for tasks waiting for execution: worker path + {@link #TODODIR_SUFFIX}. */
private String getTodoRootPath() {
  return getWorkerPath() + TODODIR_SUFFIX;
}
/** @return base path for qualified tasks waiting for execution: worker path + {@link #TODOQUALIFIEDDIR_SUFFIX}. */
private String getTodoQualifiedRootPath() {
  final StringBuilder qualifiedRoot = new StringBuilder(getWorkerPath());
  qualifiedRoot.append(TODOQUALIFIEDDIR_SUFFIX);
  return qualifiedRoot.toString();
}
/**
 * Builds the path of the directory containing the todo task nodes.
 *
 * @param qualifier
 *          task qualifier. If null, the path is in the todo section, else in the qualified todo section with the
 *          encoded qualifier as last path segment.
 * @param subQueue
 *          sub queue name. If null, no sub queue segment is added.
 * @return queue path with tasks waiting for execution.
 */
private String getTodoTaskPath(final String qualifier, final String subQueue) {
  final boolean qualified = qualifier != null;
  final StringBuilder path = new StringBuilder(getWorkerPath());
  path.append(qualified ? TODOQUALIFIEDDIR_SUFFIX : TODODIR_SUFFIX);
  if (subQueue != null) {
    path.append("/").append(subQueue);
  }
  if (qualified) {
    path.append('/').append(encodeQualifier(qualifier));
  }
  return path.toString();
}
/** @return root path of this queue: {@link #TASKDIR_PREFIX} + "/" + worker name. */
private String getWorkerPath() {
  return TASKDIR_PREFIX + "/" + _workerName;
}
/** @return path of this worker's in-progress queue: the worker path followed by {@code INPROGRESSDIR_SUFFIX}. */
private String getInProgressPath() {
  return new StringBuilder(getWorkerPath()).append(INPROGRESSDIR_SUFFIX).toString();
}
/**
 * @param taskId
 *          id of the task, used as node name below the in-progress queue
 * @return path to in-progress task node.
 */
private String getInProgressTaskPath(final String taskId) {
  final String inProgressDir = getInProgressPath();
  return inProgressDir + "/" + taskId;
}
/**
 * Stores a task as a new, uniquely named child node of the given queue directory.
 *
 * @param task
 *          the task to write
 * @param path
 *          the path where to put the task
 * @throws KeeperException
 *           error from a zookeeper operation
 * @throws TaskmanagerException
 *           the task could not be converted to a binary message
 */
private void putTask(final Task task, final String path) throws KeeperException, TaskmanagerException {
  // name and payload are prepared before the first zookeeper operation.
  final String taskNodePath = path + "/" + createUniqueTaskNodeName(task);
  final byte[] serializedTask = task2Message(task);

  _zk.ensurePathExists(path);
  // writing (empty) data to the directory updates its timestamp.
  _zk.setData(path, ZkConnection.NO_DATA);

  // step 1: create the node without data. Empty nodes are not removed by getTask, so this can be retried until
  // it succeeds without producing duplicates.
  _zk.createPath(taskNodePath, ZkConnection.NO_DATA);

  // step 2: attach the data to the node, this can be retried without problems, too.
  try {
    _zk.setData(taskNodePath, serializedTask);
  } catch (final NoNodeException ex) {
    _log.info("Coudn't set data for task " + task.getTaskId(), ex);
    // This can be ignored, it can only happen if:
    // - this is a retried ZK operation because a previous setData got a ConnectionLoss
    // - the setData operation in which the ConnectionLoss happened was actually successful
    // - meanwhile the task was delivered to a worker -> the todo ZK node is deleted
  }
}
/**
 * Serializes a task: the task is converted to its Any representation which is then written as a binary object.
 *
 * @param task
 *          a task
 * @return a binary message
 * @throws TaskmanagerException
 *           wraps any exception raised during the conversion
 */
private byte[] task2Message(final Task task) throws TaskmanagerException {
  try {
    final AnyMap taskAsAny = task.toAny();
    return _anyWriter.writeBinaryObject(taskAsAny);
  } catch (final Exception ex) {
    throw new TaskmanagerException("Failed to create BON objet from task", ex);
  }
}
/**
 * Deserializes a task: the binary message is read as an Any object from which the task is created.
 *
 * @param message
 *          a binary Message
 * @return a task object
 * @throws TaskmanagerException
 *           wraps any exception raised during the conversion
 */
private Task message2Task(final byte[] message) throws TaskmanagerException {
  try {
    final Any parsedMessage = _anyReader.readBinaryObject(message);
    return Task.fromAny(parsedMessage);
  } catch (final Exception ex) {
    throw new TaskmanagerException("Failed to parse task from BON object", ex);
  }
}
/**
 * Check if the task stored at the given node contains the specified task property value.
 *
 * <p>
 * Never throws: a missing node or any other error is logged as a warning and reported as 'false'.
 *
 * @param nodePath
 *          the path where to find the given task
 * @param taskPropertyName
 *          the property to check
 * @param expectedPropertyValue
 *          the property value to check; for null the result is always 'false'
 * @return 'true' if task exists and contains given property value, 'false' otherwise.
 */
private boolean checkTaskProperty(final String nodePath, final String taskPropertyName,
  final String expectedPropertyValue) {
  try {
    final byte[] nodeData = _zk.getData(nodePath);
    // putTask() creates task nodes empty and attaches the data in a second step, so an empty node holds no task
    // yet. As in getTaskInfo(), only non-empty data is parsed.
    if (nodeData != null && nodeData.length > 0) {
      final Task task = message2Task(nodeData);
      final String value = task.getProperties().get(taskPropertyName);
      return expectedPropertyValue != null && expectedPropertyValue.equals(value);
    }
  } catch (final KeeperException.NoNodeException e) {
    _log.warn("Node doesn't exist: " + nodePath);
  } catch (final Exception e) {
    _log.warn("Error while checking task property ", e);
  }
  return false;
}
/**
 * @param task
 *          task whose {@link Task#PROPERTY_UNIQUENESS_TAG} property is read
 * @return the uniqueness tag property value of the task, enclosed in underscores.
 */
private String createUniqueTagString(final Task task) {
  final String uniquenessTag = task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG);
  return "_" + uniquenessTag + "_";
}
/**
 * Builds the node name for a task: formatted {@link System#nanoTime()} value, the uniqueness tag string (only if
 * the task has the uniqueness tag property), the task id and the local host, the last two separated by "-".
 *
 * @param task
 *          the task
 * @return unique task node name for given task
 */
private String createUniqueTaskNodeName(final Task task) {
  String nodeName = _numberFormat.format(System.nanoTime());
  if (task.getProperties().containsKey(Task.PROPERTY_UNIQUENESS_TAG)) {
    nodeName += createUniqueTagString(task);
  }
  return nodeName + "-" + task.getTaskId() + "-" + _localhost;
}
/**
 * Get task list for current task pipe and sub-type.
 *
 * <p>
 * For each of the two todo sections the tasks of all sub queues below the todo directory and below the qualified
 * todo directory are listed together. For the in-progress section the tasks below the in-progress directory are
 * listed.
 *
 * @param section
 *          task pipe sub-type: "inprogress" or "todo".
 * @param maxCount
 *          max. number of tasks in return list, a negative value means no limit
 * @return task list
 * @throws TaskmanagerException
 *           a {@link BadParameterTaskmanagerException} if the section is unknown (including null), else a
 *           wrapper of the error that occurred while reading from zookeeper
 */
public TaskList getTaskList(final String section, final int maxCount) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log.trace("getTaskList(): workerName=" + _workerName + " section=" + section + " maxCount=" + maxCount);
  }
  final TaskList taskList = new TaskList(_workerName, section, 0);
  try {
    final String workerDir = getWorkerPath();
    final String todoDir = workerDir + TODODIR_SUFFIX;
    final String todoQualifiedDir = workerDir + TODOQUALIFIEDDIR_SUFFIX;
    final String inProgressDir = workerDir + INPROGRESSDIR_SUFFIX;
    // TODO review implementation: refactor parts shared with getTaskCounter()? Is getTaskList() implemented
    // optimally?
    // constant-first comparison: a null section ends up in the "invalid type" branch instead of causing a
    // NullPointerException.
    if (SECTION_TODO.equals(section) || SECTION_TODOQUALIFIED.equals(section)) {
      final List<String> todoSubQueues = new ArrayList<>();
      final List<String> todoQualifiedSubQueues = new ArrayList<>();
      if (_zk.exists(todoDir) != null) {
        todoSubQueues.addAll(_zk.getChildrenSorted(todoDir));
      }
      if (_zk.exists(todoQualifiedDir) != null) {
        todoQualifiedSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedDir));
      }
      for (final String subQueue : todoSubQueues) {
        final String todoSubQueueDir = getTodoTaskPath(null, subQueue);
        getTaskList(todoSubQueueDir, subQueue, maxCount, taskList);
      }
      for (final String subQueue : todoQualifiedSubQueues) {
        final String todoQualifiedSubQueueDir = todoQualifiedDir + "/" + subQueue;
        if (_zk.exists(todoQualifiedSubQueueDir) != null) {
          try {
            final List<String> qualifiers = _zk.getChildrenSorted(todoQualifiedSubQueueDir);
            for (final String qualifier : qualifiers) {
              final String qualifierDir = getTodoTaskPath(qualifier, subQueue);
              getTaskList(qualifierDir, subQueue, maxCount, taskList);
            }
          } catch (final KeeperException.NoNodeException e) {
            ; // sub queue was removed after the exists() check: nothing to list for it.
          }
        }
      }
    } else if (SECTION_INPROGRESS.equals(section)) {
      getTaskList(inProgressDir, null, maxCount, taskList);
    } else {
      throw new BadParameterTaskmanagerException("Accessing task pipe " + _workerName + " with an invalid type "
        + section + ". ", BadParameterTaskmanagerException.Cause.workerName);
    }
  } catch (final TaskmanagerException tex) {
    // already of the declared type (e.g. the bad parameter error above): do not wrap it again.
    throw tex;
  } catch (final Exception e) {
    throw new TaskmanagerException(e);
  }
  return taskList;
}
/**
 * Add task count and IDs from current zk-node dir to task list.
 *
 * <p>
 * The number of children of the directory is always added to the size of the task list, task names are only added
 * until the list contains {@code maxCount} names. The sub queue name is not part of the reported task names.
 *
 * @param currentDir
 *          e.g. "/smila/tasks/statePipe1/todo/&lt;subqueue&gt;" or
 *          "/smila/tasks/statePipe1/todo_qualified/&lt;subqueue&gt;/qualifier1"
 * @param subQueue
 *          name of the sub queue contained in currentDir, null if the directory has no sub queue segment
 * @param maxCount
 *          max. number of tasks in return list, a negative value means no limit
 * @param taskList
 *          returned task list
 * @throws KeeperException
 *           on zookeeper error
 */
private void getTaskList(final String currentDir, final String subQueue, final int maxCount,
  final TaskList taskList) throws KeeperException {
  if (_log.isTraceEnabled()) {
    _log.trace("getTaskList(): currentDir=" + currentDir + " maxCount=" + maxCount);
  }
  final Stat currentStat = _zk.exists(currentDir);
  if (currentStat == null) {
    return;
  }
  taskList.setSize(taskList.getSize() + currentStat.getNumChildren());
  // we don't want to return the sub queue name in the result list
  // currentDir="/smila/tasks/statePipe1/todo/<subqueue>" => Id="/smila/tasks/statePipe1/todo/1234"
  // currentDir="/smila/tasks/statePipe1/todo_qualified/<subqueue>/qualifier1"
  // => Id="/smila/tasks/statePipe1/todo_qualified/qualifier1/1234"
  // The reported directory is the same for all tasks, so it is computed once.
  final String reportedDir = removeSubQueueSegment(currentDir, subQueue);
  try {
    final List<String> taskIDs = _zk.getChildrenSorted(currentDir);
    for (final String taskID : taskIDs) {
      if (maxCount >= 0 && taskList.getTaskNames().size() >= maxCount) {
        break;
      }
      taskList.addTaskName(reportedDir + "/" + taskID);
    }
  } catch (final KeeperException.NoNodeException e) {
    ; // directory was removed after the exists() check: no task names to add.
  }
}

/**
 * Removes the sub queue segment from a queue directory path. Only a complete path segment at the end of the path or
 * directly before the last segment is removed. A plain text replacement of "&lt;subqueue&gt;/" would also hit
 * other path segments that merely end with the sub queue name.
 *
 * @param dir
 *          queue directory path
 * @param subQueue
 *          sub queue name, may be null
 * @return dir without the sub queue segment, or dir unchanged if subQueue is null or not found at these positions
 */
private String removeSubQueueSegment(final String dir, final String subQueue) {
  if (subQueue == null) {
    return dir;
  }
  final String segment = "/" + subQueue;
  if (dir.endsWith(segment)) {
    // ".../todo/<subqueue>"
    return dir.substring(0, dir.length() - segment.length());
  }
  final int lastSlash = dir.lastIndexOf('/');
  if (lastSlash > 0) {
    final String parentDir = dir.substring(0, lastSlash);
    if (parentDir.endsWith(segment)) {
      // ".../todo_qualified/<subqueue>/<qualifier>"
      return parentDir.substring(0, parentDir.length() - segment.length()) + dir.substring(lastSlash);
    }
  }
  return dir;
}
/**
 * Prepares information on a task stored in the task storage.
 *
 * <p>
 * The result contains the creation and modification timestamps of the task node ("created", "modified") and the
 * task itself ("task"), which is an empty string as long as the node holds no data.
 *
 * @param section
 *          queue section ("todo" or "inprogress")
 * @param taskName
 *          task identifier
 * @return task information
 *
 * @throws TaskmanagerException
 *           a {@link BadParameterTaskmanagerException} if the task does not exist (anymore), the original
 *           exception if the task data cannot be parsed, else a wrapper of the error that occurred
 */
public AnyMap getTaskInfo(final String section, final String taskName) throws TaskmanagerException {
  if (_log.isTraceEnabled()) {
    _log
      .trace("getTaskInfo(): workerName=" + _workerName + " taskQueueType=" + section + " taskName=" + taskName);
  }
  final AnyMap taskInfo = DataFactory.DEFAULT.createAnyMap();
  try {
    final String workerDir = getWorkerPath();
    final String rootDir = workerDir + "/" + section;
    Stat taskStat = null;
    String taskDir = null;
    if (SECTION_TODO.equals(section) || SECTION_TODOQUALIFIED.equals(section)) {
      // here we have sub-queues so we have to search for the task in the sub-queues
      final List<String> subQueues = new ArrayList<>();
      if (_zk.exists(rootDir) != null) {
        subQueues.addAll(_zk.getChildrenSorted(rootDir));
      }
      for (final String subQueue : subQueues) {
        taskDir = rootDir + "/" + subQueue + "/" + taskName;
        taskStat = _zk.exists(taskDir);
        if (taskStat != null) {
          break;
        }
      }
    } else {
      taskDir = rootDir + "/" + taskName;
      taskStat = _zk.exists(taskDir);
    }
    if (taskStat != null) {
      boolean taskVanished = false;
      byte[] nodeData = null;
      try {
        nodeData = _zk.getData(taskDir);
      } catch (final KeeperException.NoNodeException e) {
        // node was deleted after the exists() check: report it as not existing below instead of returning an
        // empty info map.
        taskVanished = true;
      }
      if (!taskVanished) {
        if (nodeData != null) {
          taskInfo.put("created", toTimestampAny(taskStat.getCtime()));
          taskInfo.put("modified", toTimestampAny(taskStat.getMtime()));
          if (nodeData.length > 0) {
            final Task task = message2Task(nodeData);
            taskInfo.put("task", task.toAny());
          } else {
            taskInfo.put("task", "");
          }
        }
        return taskInfo;
      }
    }
  } catch (final TaskmanagerException tex) {
    // already of the declared type (e.g. from message2Task): do not wrap it again.
    throw tex;
  } catch (final Exception e) {
    throw new TaskmanagerException(e);
  }
  throw new BadParameterTaskmanagerException("Task does not exist in " + _workerName + ".",
    BadParameterTaskmanagerException.Cause.taskId);
}
/**
 * @param timestamp
 *          timestamp, used as argument of {@link Date#Date(long)}
 * @return date-time Any value created from the timestamp.
 */
private Value toTimestampAny(final long timestamp) {
  final Date date = new Date(timestamp);
  return DataFactory.DEFAULT.createDateTimeValue(date);
}
/**
 * Makes a qualifier usable as zookeeper node name by replacing each '/' with '_'.
 *
 * @param qualifier
 *          the qualifier to encode
 * @return encoded qualifier string
 */
private String encodeQualifier(final String qualifier) {
  return qualifier.replace("/", "_");
}
/**
 * Remove canceled tasks identified by the filter map.
 *
 * <p>
 * Only one sub queue is examined: the one named by the sub queue property of the filter map or, if the property is
 * not set, the default sub queue. Tasks are removed from the todo queue, from all qualified todo queues of that sub
 * queue and from the in-progress queue. Removal is best effort, errors are logged but not propagated.
 *
 * @param filterMap
 *          map to identify tasks to be removed
 */
public void removeTasks(final AnyMap filterMap) {
  String subQueue = filterMap.getStringValue(SUB_QUEUE_TASK_PROPERTY);
  if (subQueue == null) {
    subQueue = DEFAULT_SUB_QUEUE;
  }
  // tasks in todo
  if (subQueue != null) {
    removeTasks(getTodoTaskPath(null, subQueue), filterMap);
  }
  // tasks in qualified todo
  List<String> qualifiers = new ArrayList<>();
  final String todoQualifiedDir = getTodoQualifiedRootPath() + "/" + subQueue;
  try {
    if (_zk.exists(todoQualifiedDir) != null) {
      qualifiers = _zk.getChildrenSorted(todoQualifiedDir);
    }
  } catch (final NoNodeException e) {
    ; // directory was removed after the exists() check: no qualified tasks to remove.
  } catch (final Exception e) {
    // still best effort, but leave a trace: the qualified todo tasks are skipped in this case.
    _log.warn("Caught exception while reading qualifiers from " + todoQualifiedDir, e);
  }
  for (final String qualifier : qualifiers) {
    removeTasks(getTodoTaskPath(qualifier, subQueue), filterMap);
  }
  // tasks in progress
  removeTasks(getInProgressPath(), filterMap);
}
/**
 * Deletes all tasks below the given queue path whose properties match the filter map. For each deleted task that has
 * a worker host property, the counter of that host is decremented. Errors are logged and never propagated.
 *
 * @param path
 *          The queue path containing the task nodes to examine
 * @param filterMap
 *          The filter map to check if task should be removed
 */
private void removeTasks(final String path, final AnyMap filterMap) {
  try {
    for (final Entry<String, Task> candidate : getTasks(path).entrySet()) {
      final Task task = candidate.getValue();
      if (!checkTask(task, filterMap)) {
        continue;
      }
      try {
        _zk.deleteTree(path + "/" + candidate.getKey());
        final String workerHost = task.getProperties().get(Task.PROPERTY_WORKER_HOST);
        if (workerHost != null) {
          decHostCounter(workerHost);
        }
      } catch (final NoNodeException ex) {
        ; // task node is already gone, continue with the next one
      } catch (final Exception ex) {
        _log.warn("Caught exception while removing tasks.", ex);
      }
    }
  } catch (final Exception ex) {
    _log.warn("Caught exception while removing tasks.", ex);
  }
}
/**
 * Tests a task against a filter: for every filter entry the task must have a property with the same key whose
 * value equals the string form of the filter value. An empty filter matches every task.
 *
 * @param task
 *          The task to be checked
 * @param filterMap
 *          the filter map
 * @return true if filter matches task properties, false else
 */
private boolean checkTask(final Task task, final AnyMap filterMap) {
  final Map<String, String> taskProperties = task.getProperties();
  for (final Entry<String, Any> filterEntry : filterMap.entrySet()) {
    final String expected = filterEntry.getValue().toString();
    final String actual = taskProperties.get(filterEntry.getKey());
    final boolean matches = actual != null && actual.equals(expected);
    if (!matches) {
      return false;
    }
  }
  return true;
}
/**
 * Returns a new list with those tasks of the given list
 * <ul>
 * <li>that do not have property {@link Task#PROPERTY_UNIQUENESS_TAG} set or
 * <li>for which no node name in the matching todo queue (selected by the task's qualifier and sub queue) contains
 * the task's uniqueness tag string.
 * </ul>
 * The node names of each todo queue are read at most once per call.
 *
 * @param tasks
 *          to filter for this task queue
 * @return tasks that are not already contained in current task queue's todo list
 * @throws TaskmanagerException
 *           wraps any exception raised while filtering
 */
public List<Task> filterDuplicates(final List<Task> tasks) throws TaskmanagerException {
  final List<Task> accepted = new ArrayList<>();
  try {
    // todo queue path -> node names found there
    final Map<String, List<String>> nodeNamesByPath = new HashMap<>();
    for (final Task task : tasks) {
      String duplicateNode = null;
      if (task.getProperties().containsKey(Task.PROPERTY_UNIQUENESS_TAG)) {
        final String tagToken = createUniqueTagString(task);
        final String subQueue = getSubQueue(task);
        final String todoPath = getTodoTaskPath(task.getQualifier(), subQueue);
        for (final String nodeName : getStoredTodoTasksWithCache(todoPath, nodeNamesByPath)) {
          if (nodeName.contains(tagToken)) {
            duplicateNode = nodeName;
            break;
          }
        }
      }
      if (duplicateNode == null) {
        accepted.add(task);
      } else if (_log.isDebugEnabled()) {
        _log.debug("Filtered task '" + task.getTaskId() + "' for worker '" + _workerName
          + "' with uniqueness tag '" + task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)
          + "' cause todo task exists: " + duplicateNode);
      }
    }
    return accepted;
  } catch (final Exception ex) {
    throw new TaskmanagerException(ex);
  }
}
/**
 * Returns the child node names of a todo queue path. The cache is consulted first; on a miss the names are read
 * from zookeeper (an empty list if the path does not exist) and stored in the cache.
 *
 * @param todoTaskPath
 *          todo queue path, used as cache key
 * @param todoTasksCache
 *          cache to read from and to update
 * @return node names below the path
 * @throws KeeperException
 *           zookeeper error other than a missing node
 */
private List<String> getStoredTodoTasksWithCache(final String todoTaskPath,
  final Map<String, List<String>> todoTasksCache) throws KeeperException {
  final List<String> cached = todoTasksCache.get(todoTaskPath);
  if (cached != null) {
    return cached;
  }
  List<String> loaded;
  try {
    loaded = _zk.getChildrenSorted(todoTaskPath);
  } catch (final KeeperException.NoNodeException ex) {
    loaded = new ArrayList<>();
  }
  todoTasksCache.put(todoTaskPath, loaded);
  return loaded;
}
/**
 * @param task
 *          the task
 * @return value of the task's sub queue property if the property is present, else the default sub queue.
 */
private String getSubQueue(final Task task) {
  final Map<String, String> properties = task.getProperties();
  return properties.containsKey(SUB_QUEUE_TASK_PROPERTY) ? properties.get(SUB_QUEUE_TASK_PROPERTY)
    : DEFAULT_SUB_QUEUE;
}
}