/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.persistence.zk; | |
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskList;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
/** | |
* Queue implementation wrapping a ZooKeeper client. Tasks are stored in different (virtual) sub queues identified by a
* task property (the job name). Tasks are delivered from these sub queues in round-robin fashion to ensure fairness.
*
* Does not handle ZooKeeper sessions; this must be done by the calling context.
* | |
* Zookeeper structure: (task-id nodes contain Task as data) | |
* | |
* <tt> | |
* /smila/tasks/<worker-name>/todo/<sub-queue>/<counter>-<task-id>-<host name> | |
* ........................../todo_qualified/<sub-queue>/<qualifier-string>/<counter>-<task-id>-<host name> | |
* ........................../inprogress/<task-id> | |
* ........................../delivery/<sub-queue>/<worker-name>/<delivery-time> | |
* </tt> | |
*/ | |
public class ZkTaskQueue { | |
/** root path under which the task directories are located. */
public static final String ROOT_DIR_PREFIX = "/smila";
/** zk path of the task directories, located below {@link #ROOT_DIR_PREFIX}. */
public static final String TASKDIR_PREFIX = ROOT_DIR_PREFIX + "/tasks";
/** path suffix for tasks-todo directories. */
public static final String TODODIR_SUFFIX = "/todo";
/** path suffix for qualified-tasks-todo directories. */
public static final String TODOQUALIFIEDDIR_SUFFIX = "/todo_qualified";
/** path suffix for task-in-progress directories, appended to the worker path. */
public static final String INPROGRESSDIR_SUFFIX = "/inprogress";
/** zk path of the map storing the last delivery times used for delayed task delivery. */
public static final String DELIVERY_PREFIX = "/delivery";
/**
 * Task property identifying the owner of an in-progress task: the local host name and the name of the thread that
 * put the task in progress.
 */
public static final String OWNER_PROP = "inProgressBy";
/** name of the tasks-todo section. */
public static final String SECTION_TODO = "todo";
/** name of the qualified-tasks-todo section. */
public static final String SECTION_TODOQUALIFIED = "todo_qualified";
/** name of the task-in-progress section. */
public static final String SECTION_INPROGRESS = "inprogress";
/** default sub queue name, used if the task does not contain the sub queue property. */
public static final String DEFAULT_SUB_QUEUE = "_default";
/** task property identifying the right sub queue within our queue (the job name). */
private static final String SUB_QUEUE_TASK_PROPERTY = Task.PROPERTY_JOB_NAME;
/** name of the worker whose tasks are managed by this queue. */
private final String _workerName;
/** local host name; combined with the thread name to build the {@link #OWNER_PROP} value. */
private final String _localhost;
/**
 * maximum number of tasks that should be delivered for a host (scale-up limit).
 * {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} means no limit. Can be changed via the setter.
 */
private long _maxNoOfTasksPerHost;
/**
 * formats integers as zero-padded 19-digit strings without grouping (configured in the constructor). Note that
 * NumberFormat instances are not synchronized.
 */
private final NumberFormat _numberFormat;
/** Connection to ZooKeeper server. */
private final ZkConnection _zk;
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** reader converting serialized IPC data into Any objects (counterpart of {@link #_anyWriter}). */
private final IpcAnyReader _anyReader = new IpcAnyReader();
/** writer converting Any objects into serialized IPC data. */
private final IpcAnyWriter _anyWriter = new IpcAnyWriter();
/**
 * sub queues for which we delivered tasks, in least recently used order (the most recently used sub queue is at the
 * end). getSubQueuesForNextTask() and markSubQueueAsUsed() synchronize on this deque.
 */
private final Deque<String> _lruSubQueues = new LinkedList<String>();
/**
 * Creates a queue without a scale-up limit, i.e. with {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} as the maximum
 * number of tasks per host.
 *
 * @param service
 *          zookeeper service needed to get zookeeper client
 * @param workerName
 *          name of worker for which we are managing tasks
 * @param localhost
 *          local host name
 */
public ZkTaskQueue(final ZooKeeperService service, final String workerName, final String localhost) {
  this(service, workerName, localhost,
    TaskStorageZk.TASKSPERHOST_UNLIMITED);
}
/**
 * Creates a queue for the given worker.
 *
 * @param service
 *          zookeeper service needed to get zookeeper client
 * @param workerName
 *          name of worker for which we are managing tasks
 * @param localhost
 *          local host name
 * @param maxNoOfTasksPerHost
 *          maximum number of tasks that should be delivered per host (scale-up limit), or
 *          {@link TaskStorageZk#TASKSPERHOST_UNLIMITED} for no limit
 */
public ZkTaskQueue(final ZooKeeperService service, final String workerName, final String localhost,
  final long maxNoOfTasksPerHost) {
  _zk = new ZkConnection(service);
  _workerName = workerName;
  _localhost = localhost;
  _maxNoOfTasksPerHost = maxNoOfTasksPerHost;
  // Locale.ROOT guarantees plain ASCII digits independent of the JVM default locale (some locales format numbers
  // with other digit characters). The formatted counters are presumably part of zk node names, which must not
  // depend on the locale of the host that created them.
  _numberFormat = NumberFormat.getIntegerInstance(Locale.ROOT);
  final int counterStringLength = 19; // length of Long.MAX_VALUE
  _numberFormat.setMaximumIntegerDigits(counterStringLength);
  _numberFormat.setMinimumIntegerDigits(counterStringLength);
  _numberFormat.setGroupingUsed(false);
}
/** for testing only! */ | |
public void disconnectZkSession() throws TaskmanagerException { | |
try { | |
_zk.disconnectZkSession(); | |
} catch (final Exception e) { | |
throw new TaskmanagerException("Error discconnecting zookeeper session.", e); | |
} | |
} | |
/** ensure that zk root path for storing the time of the last task delivery for delayed tasks is created. */ | |
private ZkConcurrentMap ensureDelayedDeliveryMap() throws KeeperException { | |
_zk.ensurePathExists(DELIVERY_PREFIX); | |
return new ZkConcurrentMap(_zk, DELIVERY_PREFIX); | |
} | |
/** @return key for zk map under which the delivery time for this task is stored. */ | |
private String getDelayedDeliveryKey(final Task task) { | |
// "###" is ok as separator, because this char is not allowed in job and worker names | |
return getSubQueue(task) + "###" + task.getWorkerName(); | |
} | |
/** @return scale-up limit setting. */ | |
public long getMaxNoOfTasksPerHost() { | |
return _maxNoOfTasksPerHost; | |
} | |
/** set new scale-up limit. */ | |
public void setMaxNoOfTasksPerHost(final long maxNoOfTasksPerHost) { | |
_maxNoOfTasksPerHost = maxNoOfTasksPerHost; | |
} | |
/** return worker name. */ | |
public String getWorkerName() { | |
return _workerName; | |
} | |
/** | |
* Puts a task in our task pipe. | |
* | |
* @param task | |
* the task | |
* @throws TaskmanagerException | |
* error | |
*/ | |
public void put(final Task task) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("put(), task = " + task); | |
} | |
try { | |
final String subQueue = getSubQueue(task); | |
final String todoTaskPath = getTodoTaskPath(task.getQualifier(), subQueue); | |
putTask(task, todoTaskPath); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException(ex); | |
} | |
} | |
/** | |
* write task to in-progress section of the queue. | |
* | |
* @param task | |
* task | |
* @return value of {@link #OWNER_PROP} property. | |
* @throws TaskmanagerException | |
*/ | |
public String putInProgress(final Task task) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("putInProgress(), task = " + task); | |
} | |
try { | |
final String myId = _localhost + "-" + Thread.currentThread().getName(); // unique identifier | |
task.getProperties().put(OWNER_PROP, myId); | |
task.setTaskStartedProperties(); | |
final byte[] inProgressData = task2Message(task); | |
final String inProgressNodeName = getInProgressTaskPath(task.getTaskId()); | |
try { | |
_zk.createPath(inProgressNodeName, inProgressData); | |
} catch (final KeeperException.NodeExistsException e) { | |
// ignore, cause this can happen if multiple threads try to get a task | |
_log.trace("Node exists: " + e); | |
} | |
return myId; | |
} catch (final Exception ex) { | |
throw new TaskmanagerException(ex); | |
} finally { | |
task.getProperties().remove(OWNER_PROP); | |
} | |
} | |
/** | |
* Return next task that can be processed. This method is not thread-safe, it must be called in a synchronized | |
* context! | |
* | |
* @param qualifiers | |
* optional task qualifiers | |
* @param host | |
* host name where the worker is running that requested the task, to be stored as property in the task | |
* @return next task that can be processed. | |
* @throws TaskmanagerException | |
* error | |
*/ | |
public Task get(final Collection<String> qualifiers, final String host) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("get(), qualifiers = " + qualifiers); | |
} | |
final String qualifiedRootPath = getTodoQualifiedRootPath() + "/"; | |
final boolean getUnqualifiedTasks = qualifiers == null || qualifiers.isEmpty(); | |
final Collection<String> effectiveQualifiers = | |
getUnqualifiedTasks ? Collections.singleton((String) null) : qualifiers; | |
try { | |
final List<String> lruSubQueues = getSubQueuesForNextTask(); | |
for (final String subQueue : lruSubQueues) { | |
if (getUnqualifiedTasks || isNotEmpty(qualifiedRootPath + subQueue)) { | |
for (final String qualifier : effectiveQualifiers) { | |
final String todoTaskPath = getTodoTaskPath(qualifier, subQueue); | |
final Task task = getTask(todoTaskPath, host); | |
if (task != null) { | |
markSubQueueAsUsed(subQueue); | |
return task; | |
} | |
} | |
} | |
} | |
return null; | |
} catch (final KeeperException | RuntimeException ex) { | |
throw new TaskmanagerException(ex); | |
} | |
} | |
/** | |
* @return list of sub queues in the correct order (LRU) for a fair processing. | |
*/ | |
private List<String> getSubQueuesForNextTask() throws KeeperException { | |
final List<String> todoSubQueues = new LinkedList<>(); | |
final String todoRoot = getTodoRootPath(); | |
final String todoQualifiedRoot = getTodoQualifiedRootPath(); | |
// we assume that todo sub queue nodes that have no tasks are removed after a while (see cleanEmptyNodes()) | |
if (isNotEmpty(todoRoot)) { | |
todoSubQueues.addAll(_zk.getChildrenSorted(todoRoot)); | |
} | |
if (isNotEmpty(todoQualifiedRoot)) { | |
todoSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedRoot)); | |
} | |
final List<String> orderedSubQueues = new LinkedList<>(); | |
if (!todoSubQueues.isEmpty()) { | |
// We could also copy the LRU list to orderedSubQueues first and do the retainAll there, | |
// but the current impl. helps for cleaning up our LRU list from old unused job entries. | |
// It can theoretically cause unfairness, but this should have no practical relevance. | |
synchronized (_lruSubQueues) { | |
_lruSubQueues.retainAll(todoSubQueues); | |
// all sub queues from LRU list | |
orderedSubQueues.addAll(_lruSubQueues); | |
} | |
// add sub queues for which we have tasks that are not in LRU-list yet | |
for (final String subQueue : todoSubQueues) { | |
if (!orderedSubQueues.contains(subQueue)) { | |
orderedSubQueues.add(0, subQueue); | |
} | |
} | |
} | |
return orderedSubQueues; | |
} | |
private void markSubQueueAsUsed(final String subQueue) { | |
synchronized (_lruSubQueues) { | |
_lruSubQueues.remove(subQueue); // move from beginning (if it existed) | |
_lruSubQueues.addLast(subQueue); // ... to the end of our LRU queue | |
} | |
} | |
/** | |
* Returns a map with node names and tasks matching to given path. | |
* | |
* @param path | |
* The path | |
* @return list with tasks | |
* @throws Exception | |
* an exception if something went wrong | |
*/ | |
private Map<String, Task> getTasks(final String path) throws KeeperException, TaskmanagerException { | |
final Map<String, Task> tasks = new HashMap<String, Task>(); | |
try { | |
final List<String> nodeNames = _zk.getChildrenSorted(path); | |
if (nodeNames != null && !nodeNames.isEmpty()) { | |
final int noOfNames = nodeNames.size(); | |
for (int i = 0; i < noOfNames; i++) { | |
final String nodeName = nodeNames.get(i); | |
byte[] nodeData = null; | |
try { | |
nodeData = _zk.getData(path + "/" + nodeName); | |
} catch (final KeeperException.NoNodeException e) { | |
// ignore, cause this can happen if multiple threads try to get a task | |
_log.trace("Node to get data doesn't exist anymore: " + nodeName); | |
} | |
if (nodeData != null && nodeData.length > 0) { | |
tasks.put(nodeName, message2Task(nodeData)); | |
} | |
} | |
} | |
} catch (final KeeperException.NoNodeException ex) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Todo directory " + path + " does not exist yet."); | |
} | |
} | |
return tasks; | |
} | |
/** | |
* @return task to process. Null if none exists or no more tasks are allowed for the worker's host. | |
*/ | |
private Task getTask(final String todoTaskPath, final String host) throws KeeperException, TaskmanagerException { | |
Task task = null; | |
boolean hostCheckSuccessful = false; | |
try { | |
// select first from all potential tasks, sorted to maintain queue FIFO character | |
if (isNotEmpty(todoTaskPath)) { | |
final List<String> nodeNames = _zk.getChildrenSorted(todoTaskPath); | |
if (nodeNames != null && !nodeNames.isEmpty()) { | |
hostCheckSuccessful = checkAndIncHostCounter(host); | |
if (hostCheckSuccessful) { // scale up control allows us to deliver another task for this host | |
final int noOfNames = nodeNames.size(); | |
for (int i = 0; task == null && i < noOfNames; i++) { | |
task = tryTask(todoTaskPath, nodeNames.get(i), host); | |
} | |
} | |
} | |
} | |
} catch (final KeeperException.NoNodeException ex) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Todo directory " + todoTaskPath + " does not exist yet."); | |
} | |
} finally { | |
if (hostCheckSuccessful && task == null) { | |
try { | |
decHostCounter(host); // inc'd the host counter but could not get a task -> reset host counter | |
} catch (final Exception e) { | |
_log.warn("Error while decrementing host task counter for host '" + host + "'", e); | |
} | |
} | |
} | |
return task; | |
} | |
/** | |
* Check if we can deliver another task to this host (scale up control). | |
* | |
* @param host | |
* host name is the key of the counter | |
* @return 'true' if the host counter could be successfully incremented, otherwise 'false'. | |
*/ | |
private boolean checkAndIncHostCounter(final String host) throws KeeperException { | |
if (host == null) { | |
return true; // no host given -> no limit | |
} | |
_zk.ensurePathExists(TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH); | |
final ZkConcurrentMap hostsMap = new ZkConcurrentMap(_zk, TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH); | |
Integer newValue = null; | |
if (_maxNoOfTasksPerHost == TaskStorageZk.TASKSPERHOST_UNLIMITED) { // no limit -> just count | |
newValue = hostsMap.add(host, 1); | |
} else { | |
newValue = hostsMap.incIfLessThan(host, _maxNoOfTasksPerHost); | |
} | |
if (newValue == null) { | |
_log.warn("Could not update host task counter for host '" + host + "', probably due to temporary overload."); | |
return false; | |
} else if (newValue == -1) { | |
return false; // inc host counter failed because max is reached | |
} | |
return true; | |
} | |
/**
 * Tries to take the task stored in node "todoTaskPath/nodeName" for worker host "host".
 *
 * Sequence: read the todo node (give up if it vanished or has no data), set the worker host property, check the
 * delivery delay without updating the delivery time, put the task in progress, verify that this thread owns the
 * in-progress node, check the delivery delay again while updating the delivery time, and finally delete the todo
 * node.
 *
 * NOTE(review): if the second delay check or the deletion of the todo node fails with a KeeperException other than
 * NoNodeException, the in-progress node created here is not removed by this method.
 *
 * @param todoTaskPath
 *          todo directory containing the task node.
 * @param nodeName
 *          name of the task node within todoTaskPath.
 * @param host
 *          requesting worker's host; if not null, it is stored as {@link Task#PROPERTY_WORKER_HOST} in the task.
 * @return task if we managed to take it without interference, else null.
 * @throws KeeperException
 *           zookeeper errors that are not handled here.
 * @throws TaskmanagerException
 *           errors converting the task, putting it in progress or removing it from the in-progress section.
 */
private Task tryTask(final String todoTaskPath, final String nodeName, final String host) throws KeeperException,
TaskmanagerException {
final String todoNodeName = todoTaskPath + "/" + nodeName;
byte[] nodeData = null;
try {
nodeData = _zk.getData(todoNodeName);
} catch (final KeeperException.NoNodeException e) {
// ignore, cause this can happen if multiple threads try to get a task
_log.trace("Node to get data doesn't exist anymore: " + todoNodeName);
}
// missing or empty data: nothing to take here
if (nodeData != null && nodeData.length > 0) {
final Task task = message2Task(nodeData);
if (host != null) { // must be set before task is put in progress queue
task.getProperties().put(Task.PROPERTY_WORKER_HOST, host);
}
// check if there is a delivery delay resp. if we have to wait
// (this first check only reads the last delivery time, it does not update it)
boolean canGoOn = checkDeliveryDelay(task, false);
if (!canGoOn) {
return null; // delivery delay is configured and not reached
}
final String inProgressNodeName = getInProgressTaskPath(task.getTaskId());
// putInProgress() ignores an already existing in-progress node, so ownership must be verified afterwards
final String myId = putInProgress(task);
// check if I am really the one that got the task first
if (checkTaskProperty(inProgressNodeName, OWNER_PROP, myId)) {
try {
// check delivery delay again, someone else could have gotten a task meanwhile
// (this time the last delivery time is also updated; the check returns false if that update fails)
canGoOn = checkDeliveryDelay(task, true);
if (!canGoOn) {
if (_log.isDebugEnabled()) {
_log.debug("Delay check not successful, removing task " + task.getTaskId()
+ " from in-progress queue");
}
delete(task.getTaskId()); // someone else was faster, reset in-progress task
return null;
}
// the task is ours: remove it from the todo section
_zk.deleteNode(todoNodeName);
task.getProperties().remove(OWNER_PROP);
return task;
} catch (final KeeperException.NoNodeException e) {
// this can happen if another thread already finished
if (_log.isTraceEnabled()) {
_log.trace("Node to delete doesn't exist anymore: " + todoNodeName);
}
// we have to clean up the node we created before
_zk.deleteNode(inProgressNodeName);
}
}
}
return null;
}
/** | |
* @return 'false' if we were not successful in getting a delayed task. If task is already in-progress then we also | |
* try to change the delivery time in zk. If that wasn't successful, we also return 'false'. | |
*/ | |
private boolean checkDeliveryDelay(final Task task, final boolean changeDeliveryTime) | |
throws TaskmanagerException, KeeperException { | |
final String deliveryDelay = task.getProperties().get(Task.PROPERTY_DELIVERY_DELAY); | |
if (deliveryDelay != null) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("delivery delay configured: " + deliveryDelay); | |
} | |
final ZkConcurrentMap deliveryMap = ensureDelayedDeliveryMap(); | |
final String deliveryDelayKey = getDelayedDeliveryKey(task); | |
final String lastDelivery = deliveryMap.getString(deliveryDelayKey); | |
final long now = System.currentTimeMillis(); | |
if (mustWaitForDeliveryDelay(task, deliveryMap, deliveryDelay, lastDelivery, now)) { | |
return false; | |
} | |
if (changeDeliveryTime) { | |
// change the last delivery time (make sure I'm the one to do it) | |
final String newValue = String.valueOf(now); | |
if (_log.isTraceEnabled()) { | |
_log.trace("trying to change last delivery time from " + lastDelivery + " to " + newValue); | |
} | |
if (lastDelivery != null) { | |
return deliveryMap.replace(deliveryDelayKey, lastDelivery, newValue); | |
} else { | |
return newValue.equals(deliveryMap.putIfAbsent(deliveryDelayKey, newValue)); | |
} | |
} | |
} | |
return true; | |
} | |
/** @return 'true' if task has a delivery delay and it's not reached/exceeded. */ | |
private boolean mustWaitForDeliveryDelay(final Task task, final ZkConcurrentMap deliveryMap, | |
final String deliveryDelay, final String lastDelivery, final long now) { | |
if (deliveryDelay != null) { | |
if (lastDelivery != null && now - Long.valueOf(lastDelivery) < Long.valueOf(deliveryDelay)) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("configured delivery delay " + deliveryDelay + " not reached yet: " | |
+ (now - Long.valueOf(lastDelivery))); | |
} | |
return true; | |
} | |
} | |
return false; | |
} | |
/** | |
* read the task content of an in-progress task. The task is not deleted yet, but kept alive to prevent concurrent | |
* rollback. | |
* | |
* @param taskId | |
* task Id | |
* @return task contents or null if no such task exists or is already finishing | |
* @throws TaskmanagerException | |
* if task is not in-progress or other errors. | |
*/ | |
public Task getInProgressTask(final String taskId) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getInProgresTask(), taskId = " + taskId); | |
} | |
final String inProgressNodeName = getInProgressTaskPath(taskId); | |
try { | |
final byte[] message = _zk.getData(inProgressNodeName); | |
final Task task = message2Task(message); | |
task.getProperties().remove(OWNER_PROP); | |
final String host = task.getProperties().remove(Task.PROPERTY_WORKER_HOST); | |
decHostCounter(host); | |
return task; | |
} catch (final KeeperException.NodeExistsException ex) { | |
// somebody else just created the finishing mark. | |
return null; | |
} catch (final KeeperException.NoNodeException ex) { | |
// task node does not exist in inprogress dir. | |
return null; | |
} catch (final Exception ex) { | |
throw new TaskmanagerException(ex); | |
} | |
} | |
/** | |
* @param host | |
* the host name for which to decrement the task counter | |
*/ | |
private void decHostCounter(final String host) throws KeeperException { | |
if (host != null) { | |
_zk.ensurePathExists(TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH); | |
final ZkConcurrentMap hostsMap = new ZkConcurrentMap(_zk, TaskStorageZk.HOST_COUNTER_MAP_ZK_PATH); | |
final Integer newValue = hostsMap.decIfGreaterThan(host, 0); | |
if (newValue == null) { | |
_log.warn("Could not decrement host task counter for host '" + host | |
+ "', probably due to temporary overload."); | |
} else if (newValue < 0) { | |
_log.error("Negative host counter (" + newValue + ") for host '" + host + "'!"); | |
} | |
} | |
} | |
/** | |
* Deletes the given task from the inprogress section. | |
* | |
* @param taskId | |
* identifies the task | |
* @throws TaskmanagerException | |
* error; BadParameterTaskmanagerException if taskId isn't found | |
*/ | |
public void delete(final String taskId) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("delete(), taskId = " + taskId); | |
} | |
try { | |
final String inProgressNodeName = getInProgressTaskPath(taskId); | |
try { | |
_zk.deleteNode(inProgressNodeName); | |
} catch (final KeeperException.NoNodeException ex) { | |
throw new BadParameterTaskmanagerException("Could not find taskId '" + taskId + "' for worker '" | |
+ _workerName + "'.", BadParameterTaskmanagerException.Cause.taskId); | |
} | |
} catch (final KeeperException | RuntimeException ex) { | |
throw new TaskmanagerException(ex); | |
} | |
} | |
/** | |
* Process isAlive call for given task. | |
* | |
* @param taskId | |
* identifies the task | |
* | |
* @throws TaskmanagerException | |
* error; BadParameterTaskmanagerException if taskId isn't found | |
*/ | |
public void keepAlive(final String taskId) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("keepAlive(), taskId = " + taskId); | |
} | |
try { | |
final String inProgressNodeName = getInProgressTaskPath(taskId); | |
final byte[] nodeData = _zk.getData(inProgressNodeName); | |
_zk.setData(inProgressNodeName, nodeData); // -> update node's last modified timestamp | |
} catch (final KeeperException.NoNodeException ex) { | |
throw new BadParameterTaskmanagerException("Could not find taskId '" + taskId + "' for worker '" | |
+ _workerName + "'.", BadParameterTaskmanagerException.Cause.taskId); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException(ex); | |
} | |
} | |
/** | |
* @return true if this queue has a directory for qualified tasks and it contains at least one sub-queue node with | |
* qualifier(s). | |
*/ | |
public boolean hasQualifiedTasks() throws KeeperException { | |
final String qualifiedRootPath = getTodoQualifiedRootPath(); | |
if (isNotEmpty(qualifiedRootPath)) { | |
final List<String> todoQualifiedSubQueues = _zk.getChildrenSorted(qualifiedRootPath); | |
for (final String subQueue : todoQualifiedSubQueues) { | |
if (isNotEmpty(qualifiedRootPath + "/" + subQueue)) { | |
return true; | |
} | |
} | |
} | |
return false; | |
} | |
/** @return true if path exists and contains sub-nodes. */ | |
private boolean isNotEmpty(final String path) throws KeeperException { | |
final Stat stat = _zk.exists(path); | |
return stat != null && stat.getNumChildren() > 0; | |
} | |
/** | |
* @return task counter | |
* | |
* @throws TaskmanagerException | |
* error | |
*/ | |
public TaskCounter getTaskCounter() throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getTaskCounter()"); | |
} | |
try { | |
final String workerDir = getWorkerPath(); | |
final String todoDir = getTodoRootPath(); | |
final String todoQualifiedDir = getTodoQualifiedRootPath(); | |
final String inProgressDir = workerDir + INPROGRESSDIR_SUFFIX; | |
final List<String> todoSubQueues = new ArrayList<>(); | |
final List<String> todoQualifiedSubQueues = new ArrayList<>(); | |
if (_zk.exists(todoDir) != null) { | |
todoSubQueues.addAll(_zk.getChildrenSorted(todoDir)); | |
} | |
if (_zk.exists(todoQualifiedDir) != null) { | |
todoQualifiedSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedDir)); | |
} | |
final Map<String, Integer> tasksTodoPerJobs = new HashMap<String, Integer>(); | |
for (final String subQueue : todoSubQueues) { | |
final String todoSubQueueDir = getTodoTaskPath(null, subQueue); | |
final Stat todoStat = _zk.exists(todoSubQueueDir); | |
if (todoStat != null) { | |
tasksTodoPerJobs.put(subQueue, todoStat.getNumChildren()); | |
} | |
} | |
for (final String subQueue : todoQualifiedSubQueues) { | |
final String todoQualifiedSubQueueDir = todoQualifiedDir + "/" + subQueue; | |
if (_zk.exists(todoQualifiedSubQueueDir) != null) { | |
final List<String> qualifiers = _zk.getChildrenSorted(todoQualifiedSubQueueDir); | |
for (final String qualifier : qualifiers) { | |
final String qualifierDir = getTodoTaskPath(qualifier, subQueue); | |
final Stat qualifierStat = _zk.exists(qualifierDir); | |
if (qualifierStat != null) { | |
if (tasksTodoPerJobs.containsKey(subQueue)) { | |
tasksTodoPerJobs.put(subQueue, tasksTodoPerJobs.get(subQueue) + qualifierStat.getNumChildren()); | |
} else { | |
tasksTodoPerJobs.put(subQueue, qualifierStat.getNumChildren()); | |
} | |
} | |
} | |
} | |
} | |
final Stat inProgressStat = _zk.exists(inProgressDir); | |
int inProgressCount = 0; | |
if (inProgressStat != null) { | |
inProgressCount = inProgressStat.getNumChildren(); | |
} | |
return new TaskCounter(_workerName, tasksTodoPerJobs, inProgressCount); | |
} catch (final Exception e) { | |
throw new TaskmanagerException(e); | |
} | |
} | |
/** | |
* @param timeToLiveMs | |
* the maximum processing time | |
* @return task Ids of tasks that have exceeded the timeToLive. | |
*/ | |
public Collection<String> getTimedOutTasks(final long timeToLiveMs) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("checkInProgressTasks()"); | |
} | |
final Collection<String> timedOutTasks = new ArrayList<String>(); | |
try { | |
final String inProgressPath = getInProgressPath(); | |
final Stat stat = _zk.exists(inProgressPath); | |
if (stat != null && stat.getNumChildren() > 0) { | |
final List<String> tasks = _zk.getChildrenSorted(inProgressPath); | |
for (final String taskId : tasks) { | |
if (isTaskTimedOut(inProgressPath, taskId, timeToLiveMs)) { | |
timedOutTasks.add(taskId); | |
} | |
} | |
} | |
} catch (final Exception ex) { | |
_log.warn("Error checking inprogress tasks of worker " + _workerName, ex); | |
} | |
return timedOutTasks; | |
} | |
/** | |
* Check if maximum processing time of given task exceeded. | |
* | |
* @param rootPath | |
* root path for tasks | |
* @param taskId | |
* identifies the task | |
* @param timeToLiveMs | |
* the maximum processing time | |
*/ | |
private boolean isTaskTimedOut(final String rootPath, final String taskId, final long timeToLiveMs) { | |
try { | |
final String taskNodePath = rootPath + "/" + taskId; | |
final Stat stat = _zk.exists(taskNodePath); | |
if (stat != null) { | |
final long mtime = stat.getMtime(); | |
final long now = System.currentTimeMillis(); | |
if (now - mtime > timeToLiveMs) { | |
if (_log.isInfoEnabled()) { | |
_log.info("reached maximum processing time (" + timeToLiveMs + " ms) for task " + taskId); | |
} | |
final String finishNodePath = taskNodePath + "/finishing"; | |
// check if node is not already being finished. | |
return _zk.exists(finishNodePath) == null; | |
} | |
} | |
} catch (final Exception ex) { | |
_log.error("Error checking task " + taskId + " for worker " + _workerName, ex); | |
} | |
return false; | |
} | |
/** | |
* Remove all task (and todo sub queue) nodes. Errors are logged as warning, but the purge tries to continue. | |
*/ | |
public void purge() { | |
if (_log.isTraceEnabled()) { | |
_log.trace("purge()"); | |
} | |
try { | |
final String workerDir = getWorkerPath(); | |
final String todoDir = getTodoRootPath(); | |
final String todoQualifiedDir = getTodoQualifiedRootPath(); | |
final String inProgressDir = workerDir + INPROGRESSDIR_SUFFIX; | |
final List<String> todoSubQueues = new ArrayList<>(); | |
final List<String> todoQualifiedSubQueues = new ArrayList<>(); | |
if (_zk.exists(todoDir) != null) { | |
todoSubQueues.addAll(_zk.getChildrenSorted(todoDir)); | |
} | |
if (_zk.exists(todoQualifiedDir) != null) { | |
todoQualifiedSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedDir)); | |
} | |
for (final String subQueue : todoSubQueues) { | |
final String todoSubQueueDir = getTodoTaskPath(null, subQueue); | |
purge(todoSubQueueDir); | |
} | |
purge(todoDir); // remove sub-queues too | |
for (final String subQueue : todoQualifiedSubQueues) { | |
final String todoQualifiedSubQueueDir = todoQualifiedDir + "/" + subQueue; | |
if (_zk.exists(todoQualifiedSubQueueDir) != null) { | |
try { | |
final List<String> qualifiers = _zk.getChildrenSorted(todoQualifiedSubQueueDir); | |
for (final String qualifier : qualifiers) { | |
final String qualifierDir = getTodoTaskPath(qualifier, subQueue); | |
purge(qualifierDir); | |
} | |
purge(todoQualifiedSubQueueDir); // remove sub-queues too | |
} catch (final Exception ex) { | |
_log.warn("Error determining qualifier node of queue " + _workerName + " during purge", ex); | |
} | |
} | |
} | |
purge(inProgressDir); | |
_lruSubQueues.clear(); | |
} catch (final Exception ex) { | |
_log.warn("Error purging queue for worker " + _workerName, ex); | |
} | |
} | |
/** | |
* Remove all immediate children of the node. | |
* | |
* @param node | |
* some node. | |
*/ | |
private void purge(final String node) { | |
try { | |
final Stat stat = _zk.exists(node); | |
if (stat != null && stat.getNumChildren() > 0) { | |
final List<String> children = _zk.getChildrenSorted(node); | |
for (final String childNode : children) { | |
final String childPath = node + "/" + childNode; | |
try { | |
_zk.deleteNode(childPath); | |
} catch (final Exception ex) { | |
_log.warn("Failed to delete node " + childPath + " during purge", ex); | |
} | |
} | |
} | |
} catch (final Exception ex) { | |
_log.warn("Failed to purge path " + node, ex); | |
} | |
} | |
/** | |
* Remove | |
* <ul> | |
* <li>empty task nodes in the todo(_qualified) directories (leftovers from failed create operations) | |
* <li>empty todo_qualified directories that seem not to be in use anymore. | |
* </ul> | |
* Errors are logged as warning, but the clean tries to continue. | |
* | |
* @param timeToLiveMs | |
* time in milliseconds after which to clean up stale tasks or empty todo_qualified dirs. | |
* | |
*/ | |
public void cleanEmptyNodes(final long timeToLiveMs) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("cleanEmptyNodes() for " + _workerName); | |
} | |
try { | |
// remove stale tasks in todo dir. | |
final String todoDir = getTodoRootPath(); | |
cleanEmptyNodes(todoDir, false, timeToLiveMs); | |
// remove stale tasks and unused qualifier dirs in todo_qualified dir. | |
final String todoQualifiedDir = getTodoQualifiedRootPath(); | |
cleanEmptyNodes(todoQualifiedDir, false, timeToLiveMs); | |
} catch (final Exception ex) { | |
_log.warn("Error during cleaning todo queue for worker " + _workerName, ex); | |
} | |
} | |
/** | |
* Recursively removes all empty children of the node, if they are untouched after a long time. | |
* | |
* @param node | |
* some node. | |
* @param removeEmpty | |
* if true the given node is removed if it is empty and untouched after a long time. Otherwise only its | |
* children are examined and removed if they fit the conditions. | |
* @param timeToLiveMs | |
* time to life for empty z-nodes in milliseconds | |
*/ | |
private void cleanEmptyNodes(final String node, final boolean removeEmpty, final long timeToLiveMs) { | |
try { | |
Stat stat = _zk.exists(node); | |
if (stat != null && stat.getNumChildren() > 0) { | |
final List<String> children = _zk.getChildrenSorted(node); | |
for (final String childNode : children) { | |
final String childPath = node + "/" + childNode; | |
cleanEmptyNodes(childPath, true, timeToLiveMs); | |
} | |
} | |
if (removeEmpty) { | |
stat = _zk.exists(node); | |
if (stat != null && stat.getNumChildren() == 0 && stat.getDataLength() == 0) { | |
final long mtime = stat.getMtime(); | |
final long now = System.currentTimeMillis(); | |
if (now - mtime > timeToLiveMs) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("remove empty node " + node); | |
} | |
_zk.deleteNode(node); | |
} | |
} | |
} | |
} catch (final Exception ex) { | |
_log.warn("Failed to clean node " + node, ex); | |
} | |
} | |
/** @return root path for tasks waiting for execution. */ | |
private String getTodoRootPath() { | |
return new StringBuilder(getWorkerPath()).append(TODODIR_SUFFIX).toString(); | |
} | |
/** @return base path for qualified tasks waiting for execution. */ | |
private String getTodoQualifiedRootPath() { | |
return getWorkerPath() + TODOQUALIFIEDDIR_SUFFIX; | |
} | |
/** @return queue path with tasks waiting for execution. */ | |
private String getTodoTaskPath(final String qualifier, final String subQueue) { | |
final StringBuilder todoTaskDir = new StringBuilder(getWorkerPath()); | |
if (qualifier == null) { | |
todoTaskDir.append(TODODIR_SUFFIX); | |
if (subQueue != null) { | |
todoTaskDir.append("/").append(subQueue); | |
} | |
} else { | |
todoTaskDir.append(TODOQUALIFIEDDIR_SUFFIX); | |
if (subQueue != null) { | |
todoTaskDir.append("/").append(subQueue); | |
} | |
todoTaskDir.append('/').append(encodeQualifier(qualifier)); | |
} | |
return todoTaskDir.toString(); | |
} | |
/** @return root path for given task pipe. */ | |
private String getWorkerPath() { | |
final StringBuilder workerPath = new StringBuilder(TASKDIR_PREFIX); | |
workerPath.append('/').append(_workerName); | |
return workerPath.toString(); | |
} | |
/** @return in-progress queue path */ | |
private String getInProgressPath() { | |
return getWorkerPath() + INPROGRESSDIR_SUFFIX; | |
} | |
/** @return path to in-progress task node. */ | |
private String getInProgressTaskPath(final String taskId) { | |
return getInProgressPath() + "/" + taskId; | |
} | |
/** | |
* @param task | |
* the task to write | |
* @param path | |
* the path where to put the task | |
* @throws Exception | |
* error | |
*/ | |
private void putTask(final Task task, final String path) throws KeeperException, TaskmanagerException { | |
final String nodePath = path + "/" + createUniqueTaskNodeName(task); | |
final byte[] nodeData = task2Message(task); | |
_zk.ensurePathExists(path); | |
_zk.setData(path, ZkConnection.NO_DATA); // update timestamp of directory. | |
// create node empty first: empty nodes are not removed by getTask, so we can | |
// retry this until success without producing duplicates | |
_zk.createPath(nodePath, ZkConnection.NO_DATA); | |
// now attach data to node, can be retried without problems, too. | |
try { | |
_zk.setData(nodePath, nodeData); | |
} catch (final KeeperException.NoNodeException e) { | |
_log.info("Coudn't set data for task " + task.getTaskId(), e); | |
// We can ignore that, it can only happen if: | |
// - we are in a retry-ZK-operation cause a previous setData got a ConnectionLoss | |
// - the setData operation in which the ConnectionLoss happened was actually successful | |
// - meanwhile the task was delivered to a worker -> todo-ZK-node is deleted | |
} | |
} | |
/** | |
* Convert task object to binary message. | |
* | |
* @param task | |
* a task | |
* @return a binary message | |
*/ | |
private byte[] task2Message(final Task task) throws TaskmanagerException { | |
try { | |
final AnyMap anyTask = task.toAny(); | |
final byte[] message = _anyWriter.writeBinaryObject(anyTask); | |
return message; | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Failed to create BON objet from task", ex); | |
} | |
} | |
/** | |
* Convert binary message to task object. | |
* | |
* @param message | |
* a binary Message | |
* @return a task object | |
*/ | |
private Task message2Task(final byte[] message) throws TaskmanagerException { | |
try { | |
final Any anyTask = _anyReader.readBinaryObject(message); | |
final Task task = Task.fromAny(anyTask); | |
return task; | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Failed to parse task from BON object", ex); | |
} | |
} | |
/** | |
* check if task at given node contains the specified task property value. | |
* | |
* @param nodePath | |
* the path where to find the given task | |
* @param taskPropertyName | |
* the property to check | |
* @param expectedPropertyValue | |
* the property value to check | |
* @return 'true' if task exists and contains given property value, 'false' otherwise. | |
*/ | |
private boolean checkTaskProperty(final String nodePath, final String taskPropertyName, | |
final String expectedPropertyValue) { | |
try { | |
final byte[] nodeData = _zk.getData(nodePath); | |
if (nodeData != null) { | |
final Task task = message2Task(nodeData); | |
final String value = task.getProperties().get(taskPropertyName); | |
return expectedPropertyValue.equals(value); | |
} | |
} catch (final KeeperException.NoNodeException e) { | |
_log.warn("Node doesn't exist" + nodePath); | |
} catch (final Exception e) { | |
_log.warn("Error while checking task property ", e); | |
} | |
return false; | |
} | |
/** | |
* @return string containing uniqueness tag property of task. | |
*/ | |
private String createUniqueTagString(final Task task) { | |
return new StringBuilder("_").append(task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG)).append("_") | |
.toString(); | |
} | |
/** | |
* @param task | |
* the task | |
* @return unique task node name for given task | |
*/ | |
private String createUniqueTaskNodeName(final Task task) { | |
final StringBuilder taskNodeName = new StringBuilder(); | |
taskNodeName.append(_numberFormat.format(System.nanoTime())); | |
if (task.getProperties().containsKey(Task.PROPERTY_UNIQUENESS_TAG)) { | |
taskNodeName.append(createUniqueTagString(task)); | |
} | |
taskNodeName.append("-").append(task.getTaskId()).append("-").append(_localhost); | |
return taskNodeName.toString(); | |
} | |
/** | |
* Get task list for current task pipe and sub-type. | |
* | |
* @param section | |
* task pipe sub-type: "inprogress" or "todo". | |
* @param maxCount | |
* max. number of tasks in return list | |
* @return task list | |
* @throws TaskmanagerException | |
* error | |
*/ | |
public TaskList getTaskList(final String section, final int maxCount) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getTaskList(): workerName=" + _workerName + " section=" + section + " maxCount=" + maxCount); | |
} | |
final TaskList taskList = new TaskList(_workerName, section, 0); | |
try { | |
final String workerDir = getWorkerPath(); | |
final String todoDir = workerDir + TODODIR_SUFFIX; | |
final String todoQualifiedDir = workerDir + TODOQUALIFIEDDIR_SUFFIX; | |
final String inProgressDir = workerDir + INPROGRESSDIR_SUFFIX; | |
// TODO Impl. pruefen, gemeinsame Sachen refactorn mit getTaskCounter()? Ist getTaskList() optimal impl.? | |
if (section.equals(SECTION_TODO) || section.equals(SECTION_TODOQUALIFIED)) { | |
final List<String> todoSubQueues = new ArrayList<>(); | |
final List<String> todoQualifiedSubQueues = new ArrayList<>(); | |
if (_zk.exists(todoDir) != null) { | |
todoSubQueues.addAll(_zk.getChildrenSorted(todoDir)); | |
} | |
if (_zk.exists(todoQualifiedDir) != null) { | |
todoQualifiedSubQueues.addAll(_zk.getChildrenSorted(todoQualifiedDir)); | |
} | |
for (final String subQueue : todoSubQueues) { | |
final String todoSubQueueDir = getTodoTaskPath(null, subQueue); | |
getTaskList(todoSubQueueDir, subQueue, maxCount, taskList); | |
} | |
for (final String subQueue : todoQualifiedSubQueues) { | |
final String todoQualifiedSubQueueDir = todoQualifiedDir + "/" + subQueue; | |
if (_zk.exists(todoQualifiedSubQueueDir) != null) { | |
try { | |
final List<String> qualifiers = _zk.getChildrenSorted(todoQualifiedSubQueueDir); | |
for (final String qualifier : qualifiers) { | |
final String qualifierDir = getTodoTaskPath(qualifier, subQueue); | |
getTaskList(qualifierDir, subQueue, maxCount, taskList); | |
} | |
} catch (final KeeperException.NoNodeException e) { | |
; // do nothing | |
} | |
} | |
} | |
} else if (section.equals(SECTION_INPROGRESS)) { | |
getTaskList(inProgressDir, null, maxCount, taskList); | |
} else { | |
throw new BadParameterTaskmanagerException("Accessing task pipe " + _workerName + " with an invalid type " | |
+ section + ". ", BadParameterTaskmanagerException.Cause.workerName); | |
} | |
} catch (final TaskmanagerException tex) { | |
throw tex; | |
} catch (final Exception e) { | |
throw new TaskmanagerException(e); | |
} | |
return taskList; | |
} | |
/** | |
* Add task count and IDs from current zk-node dir to task list. | |
* | |
* @param currentDir | |
* e.g. "/smila/tasks/statePipe1/todo" or "/smila/tasks/statePipe1/todo_qualified/qualifier1" | |
* @param maxCount | |
* max. number of tasks in return list | |
* @param taskList | |
* returned task list | |
* @throws Exception | |
* on zookeeper error | |
*/ | |
private void getTaskList(final String currentDir, final String subQueue, final int maxCount, | |
final TaskList taskList) throws KeeperException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getTaskList(): currentDir=" + currentDir + " maxCount=" + maxCount); | |
} | |
final Stat currentStat = _zk.exists(currentDir); | |
if (currentStat == null) { | |
return; | |
} | |
taskList.setSize(taskList.getSize() + currentStat.getNumChildren()); | |
try { | |
final List<String> taskIDs = _zk.getChildrenSorted(currentDir); | |
for (final String taskID : taskIDs) { | |
if (maxCount >= 0 && taskList.getTaskNames().size() >= maxCount) { | |
break; | |
} | |
// we don't want to return the sub queue name in the result list | |
// currentDir="/smila/tasks/statePipe1/todo/<subqueue>" => Id="/smila/tasks/statePipe1/todo/1234" | |
// currentDir="/smila/tasks/statePipe1/todo_qualfied/<subqueue>/qualifier1" | |
// => Id="/smila/tasks/statePipe1/todo_qualfied/qualifier1/1234" | |
String taskPath = currentDir + "/" + taskID; | |
if (subQueue != null && taskPath.contains("/" + subQueue + "/")) { | |
taskPath = taskPath.replace(subQueue + "/", ""); | |
} | |
taskList.addTaskName(taskPath); | |
} | |
} catch (final KeeperException.NoNodeException e) { | |
; // do nothing | |
} | |
} | |
/** | |
* Prepares information on a task stored in the task storage. | |
* | |
* @param section | |
* queue section ("todo" or "inprogress") | |
* @param taskName | |
* task identifier | |
* @return task information | |
* | |
* @throws TaskmanagerException | |
* on error | |
*/ | |
public AnyMap getTaskInfo(final String section, final String taskName) throws TaskmanagerException { | |
if (_log.isTraceEnabled()) { | |
_log | |
.trace("getTaskInfo(): workerName=" + _workerName + " taskQueueType=" + section + " taskName=" + taskName); | |
} | |
final AnyMap taskInfo = DataFactory.DEFAULT.createAnyMap(); | |
try { | |
final String workerDir = getWorkerPath(); | |
final String rootDir = workerDir + "/" + section; | |
Stat taskStat = null; | |
String taskDir = null; | |
if (SECTION_TODO.equals(section) || SECTION_TODOQUALIFIED.equals(section)) { | |
// here we have sub-queues so we have to search for the task in the sub-queues | |
final List<String> subQueues = new ArrayList<>(); | |
if (_zk.exists(rootDir) != null) { | |
subQueues.addAll(_zk.getChildrenSorted(rootDir)); | |
} | |
for (final String subQueue : subQueues) { | |
taskDir = rootDir + "/" + subQueue + "/" + taskName; | |
taskStat = _zk.exists(taskDir); | |
if (taskStat != null) { | |
break; | |
} | |
} | |
} else { | |
taskDir = rootDir + "/" + taskName; | |
taskStat = _zk.exists(taskDir); | |
} | |
if (taskStat != null) { | |
byte[] nodeData = null; | |
try { | |
nodeData = _zk.getData(taskDir); | |
} catch (final KeeperException.NoNodeException e) { | |
; // do nothing | |
} | |
if (nodeData != null) { | |
taskInfo.put("created", toTimestampAny(taskStat.getCtime())); | |
taskInfo.put("modified", toTimestampAny(taskStat.getMtime())); | |
if (nodeData.length > 0) { | |
final Task task = message2Task(nodeData); | |
taskInfo.put("task", task.toAny()); | |
} else { | |
taskInfo.put("task", ""); | |
} | |
} | |
return taskInfo; | |
} | |
} catch (final Exception e) { | |
throw new TaskmanagerException(e); | |
} | |
throw new BadParameterTaskmanagerException("Task does not exist in " + _workerName + ".", | |
BadParameterTaskmanagerException.Cause.taskId); | |
} | |
/** create Any Value from timestamp. */ | |
private Value toTimestampAny(final long timestamp) { | |
return DataFactory.DEFAULT.createDateTimeValue(new Date(timestamp)); | |
} | |
/** | |
* @param qualifier | |
* the qualifier to encode for zookeeper to avoid '/' in node name | |
* @return encoded qualifier string | |
*/ | |
private String encodeQualifier(final String qualifier) { | |
return qualifier.replace('/', '_'); | |
} | |
/** | |
* Remove canceled tasks identified by the filter map. | |
* | |
* @param filterMap | |
* map to identify tasks to be removed | |
*/ | |
public void removeTasks(final AnyMap filterMap) { | |
String subQueue = filterMap.getStringValue(SUB_QUEUE_TASK_PROPERTY); | |
if (subQueue == null) { | |
subQueue = DEFAULT_SUB_QUEUE; | |
} | |
// tasks in todo | |
if (subQueue != null) { | |
removeTasks(getTodoTaskPath(null, subQueue), filterMap); | |
} | |
// tasks in qualified todo | |
List<String> qualifiers = new ArrayList<>(); | |
try { | |
final String todoQualifiedDir = getTodoQualifiedRootPath() + "/" + subQueue; | |
if (_zk.exists(todoQualifiedDir) != null) { | |
qualifiers = _zk.getChildrenSorted(todoQualifiedDir); | |
} | |
} catch (final Exception e) { | |
;// do nothing, no qualified found | |
} | |
for (final String qualifier : qualifiers) { | |
removeTasks(getTodoTaskPath(qualifier, subQueue), filterMap); | |
} | |
// tasks in progress | |
removeTasks(getInProgressPath(), filterMap); | |
} | |
/** | |
* Removes tasks with given path and matching filter map. | |
* | |
* @param path | |
* The path of the task to be removed | |
* @param filterMap | |
* The filter map to check if task should be removed | |
*/ | |
private void removeTasks(final String path, final AnyMap filterMap) { | |
try { | |
final Map<String, Task> tasks = getTasks(path); | |
for (final Entry<String, Task> entry : tasks.entrySet()) { | |
final String nodeName = entry.getKey(); | |
final Task task = entry.getValue(); | |
if (checkTask(task, filterMap)) { | |
try { | |
_zk.deleteTree(path + "/" + nodeName); | |
final String host = task.getProperties().get(Task.PROPERTY_WORKER_HOST); | |
if (host != null) { | |
decHostCounter(host); | |
} | |
} catch (final KeeperException.NoNodeException ex) { | |
;// do nothing, continue removing | |
} catch (final Exception e) { | |
_log.warn("Caught exception while removing tasks.", e); | |
} | |
} | |
} | |
} catch (final Exception e) { | |
_log.warn("Caught exception while removing tasks.", e); | |
} | |
} | |
/** | |
* Checks if the property map of a task matches to the filter map. | |
* | |
* @param task | |
* The task to be checked | |
* @param filterMap | |
* the filter map | |
* @return true if filter matches task properties, false else | |
*/ | |
private boolean checkTask(final Task task, final AnyMap filterMap) { | |
final Map<String, String> properties = task.getProperties(); | |
for (final Entry<String, Any> entry : filterMap.entrySet()) { | |
final String key = entry.getKey(); | |
final String value = entry.getValue().toString(); | |
final String propertyValue = properties.get(key); | |
if (propertyValue == null || !propertyValue.equals(value)) { | |
return false; | |
} | |
} | |
return true; | |
} | |
/** | |
* return new list of tasks with only those tasks from the given task list | |
* <ul> | |
* <li>that do not have property {@link Task#PROPERTY_UNIQUENESS_TAG} set or | |
* <li>for which currently no task is in the Todo queue that has the same uniqueness tag. | |
* </ul> | |
* | |
* @param tasks | |
* to filter for this task queue | |
* @return tasks that are not already contained in current task queue's todo list | |
*/ | |
public List<Task> filterDuplicates(final List<Task> tasks) throws TaskmanagerException { | |
final List<Task> result = new ArrayList<>(); | |
try { | |
final Map<String, List<String>> todoTasksCache = new HashMap<>(); // cache for ZK entries | |
for (final Task task : tasks) { | |
boolean taskExists = false; | |
if (task.getProperties().containsKey(Task.PROPERTY_UNIQUENESS_TAG)) { | |
final String uniqueTagToSearch = createUniqueTagString(task); | |
final String subQueue = getSubQueue(task); | |
final String todoTaskPath = getTodoTaskPath(task.getQualifier(), subQueue); | |
final List<String> todoTasks = getStoredTodoTasksWithCache(todoTaskPath, todoTasksCache); | |
for (final String todoTask : todoTasks) { | |
if (todoTask.contains(uniqueTagToSearch)) { | |
taskExists = true; | |
if (_log.isDebugEnabled()) { | |
_log.debug("Filtered task '" + task.getTaskId() + "' for worker '" + _workerName | |
+ "' with uniqueness tag '" + task.getProperties().get(Task.PROPERTY_UNIQUENESS_TAG) | |
+ "' cause todo task exists: " + todoTask); | |
} | |
break; | |
} | |
} | |
} | |
if (!taskExists) { | |
result.add(task); | |
} | |
} | |
return result; | |
} catch (final Exception e) { | |
throw new TaskmanagerException(e); | |
} | |
} | |
private List<String> getStoredTodoTasksWithCache(final String todoTaskPath, | |
final Map<String, List<String>> todoTasksCache) throws KeeperException { | |
List<String> todoTasks = todoTasksCache.get(todoTaskPath); | |
if (todoTasks == null) { | |
try { | |
todoTasks = _zk.getChildrenSorted(todoTaskPath); | |
} catch (final NoNodeException ex) { | |
todoTasks = new ArrayList<>(); | |
} | |
todoTasksCache.put(todoTaskPath, todoTasks); | |
} | |
return todoTasks; | |
} | |
private String getSubQueue(final Task task) { | |
if (task.getProperties().containsKey(SUB_QUEUE_TASK_PROPERTY)) { | |
return task.getProperties().get(SUB_QUEUE_TASK_PROPERTY); | |
} | |
return DEFAULT_SUB_QUEUE; | |
} | |
} |