/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.persistence.zk; | |
import java.io.UnsupportedEncodingException; | |
import java.net.URLEncoder; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.locks.ReadWriteLock; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.data.Stat; | |
import org.eclipse.smila.clusterconfig.ClusterConfigService; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.ipc.IpcAnyReader; | |
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCounter; | |
import org.eclipse.smila.taskmanager.TaskList; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.eclipse.smila.zookeeper.ZkConcurrentMap; | |
import org.eclipse.smila.zookeeper.ZkConnection; | |
import org.eclipse.smila.zookeeper.ZooKeeperService; | |
import org.osgi.service.component.ComponentContext; | |
/** | |
* Zookeeper task storage implementation. | |
* | |
* Zookeeper structure: | |
* | |
* <tt> | |
* /smila/taskmanager/hosts/<host-counter> | |
* /smila/taskmanager/qualifierlocks/<worker-name>/<qualifier-value> | |
* ... (also see ZkTaskQueue) | |
* </tt>
* | |
*/ | |
public class TaskStorageZk implements TaskStorage { | |
/** Zookeeper path to stored host counters. */ | |
public static final String HOST_COUNTER_MAP_ZK_PATH = "/smila/taskmanager/hosts"; | |
/** Zookeeper path to qualifier locks. */ | |
public static final String QUALIFIER_LOCKS_ZK_PATH = "/smila/taskmanager/qualifierlocks"; | |
public static final String TASK_HANDLER_CONFIGURATION = "/smila/taskmanager/config"; | |
/** default value for {@link #_maxNoOfTasksPerHost}. */ | |
public static final long TASKSPERHOST_UNLIMITED = -1; | |
/** if this time is exceeded for an operation, a log message is written. */ | |
private static final long MIN_OPERATION_TIME_TO_LOG = 1000; // ms | |
/** ZooKeeperService. */ | |
private ZooKeeperService _zkService; | |
/** Zookeeper connection wrapper. */ | |
private ZkConnection _zk; | |
/** Any->BON converter. */ | |
private final IpcAnyWriter _anyWriter = new IpcAnyWriter(); | |
/** BON ->Any converter. */ | |
private final IpcAnyReader _anyReader = new IpcAnyReader(); | |
/** connection to ClusterConfigService. */ | |
private ClusterConfigService _clusterConfigService; | |
/** map of task queues. */ | |
private final Map<String, ZkTaskQueue> _taskQueues = new ConcurrentHashMap<String, ZkTaskQueue>(); | |
/** name of localhost, needed to initialized ZkTaskQueues. */ | |
private String _localHost; | |
/** | |
* Lock to ensure availability of services. Processing methods use read lock, deactivate needs write lock. | |
*/ | |
private final ReadWriteLock _lock = new ReentrantReadWriteLock(true); | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** maximum number of tasks that should be delivered for a host. */ | |
private long _maxNoOfTasksPerHost = -1; // default: -1 (no limit) | |
/** | |
* @param context | |
* OSGi service component context. | |
*/ | |
protected void activate(final ComponentContext context) { | |
try { | |
// wait for all operations to finish. | |
_lock.writeLock().lock(); | |
_log.info("Activate ZooKeeper taskstorage implementation."); | |
_zk = new ZkConnection(_zkService); | |
} finally { | |
_lock.writeLock().unlock(); | |
} | |
} | |
/** | |
* @param context | |
* OSGi service component context. | |
*/ | |
protected void deactivate(final ComponentContext context) { | |
try { | |
// wait for all operations to finish. | |
_lock.writeLock().lock(); | |
_log.info("Shutdown ZooKeeper taskstorage implementation."); | |
_zk = null; | |
_taskQueues.clear(); | |
} finally { | |
_lock.writeLock().unlock(); | |
} | |
} | |
@Override | |
public void createTaskQueue(final String name) { | |
if (!_taskQueues.containsKey(name)) { | |
final ZkTaskQueue taskQueue = new ZkTaskQueue(_zkService, name, getLocalHost(), _maxNoOfTasksPerHost); | |
_taskQueues.put(name, taskQueue); | |
} | |
} | |
@Override | |
public boolean hasTaskQueue(final String name) { | |
return _taskQueues.containsKey(name); | |
} | |
/** get task queues as unmodifyable collection. */ | |
protected Collection<ZkTaskQueue> getTaskQueues() { | |
return Collections.unmodifiableCollection(_taskQueues.values()); | |
} | |
@Override | |
public long getMaxNoOfTasksPerHost() { | |
return _maxNoOfTasksPerHost; | |
} | |
/** | |
* set the maximum number of tasks that should be delivered to a host. (scale up control) | |
*/ | |
@Override | |
public void setMaxNoOfTasksPerHost(final long maxTasks) { | |
_maxNoOfTasksPerHost = maxTasks; | |
for (final ZkTaskQueue queue : getTaskQueues()) { | |
queue.setMaxNoOfTasksPerHost(_maxNoOfTasksPerHost); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Map<String, Integer> getScaleUpCounters() throws TaskmanagerException { | |
final Map<String, Integer> counters = new HashMap<String, Integer>(); | |
try { | |
_zk.ensurePathExists(HOST_COUNTER_MAP_ZK_PATH); | |
final ZkConcurrentMap hostsMap = new ZkConcurrentMap(_zk, HOST_COUNTER_MAP_ZK_PATH); | |
for (final String host : hostsMap.keySet()) { | |
counters.put(host, hostsMap.getInt(host)); | |
} | |
} catch (final KeeperException | RuntimeException ex) { | |
_log.warn("Could not read scale-up counters", ex); | |
} | |
return counters; | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void storeInProgressTask(final Task task) throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(task.getWorkerName(), true); | |
taskQueue.putInProgress(task); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error storing task to " + task.getWorkerName(), ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.storeInProgressTask() operation time: " + t + " ms, taskId=" + task.getTaskId()); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void storeTask(final Task task) throws TaskmanagerException { | |
storeTask(task.getWorkerName(), task); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void storeTask(final String workerName, final Task task) throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
taskQueue.put(task); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error storing task in '" + workerName + "' queue", ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.storeTask() operation time: " + t + " ms, taskId=" + task.getTaskId()); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Task getTask(final String workerName, final String workerHost, final Collection<String> qualifiers) | |
throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
return taskQueue.get(qualifiers, workerHost); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error getting task for worker " + workerName, ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.getTask() operation time: " + t + " ms, workerName=" + workerName | |
+ ", workerHost=" + workerHost + ", qualifiers=" + qualifiers); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Task getInProgressTask(final String workerName, final String taskId) throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
return taskQueue.getInProgressTask(taskId); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error reading task " + taskId + " for worker " + workerName, ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.getInProgressTask() operation time: " + t + " ms, workerName=" + workerName | |
+ ", taskId=" + taskId); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void deleteTask(final String workerName, final String taskId) throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
taskQueue.delete(taskId); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error deleting task " + taskId + " for worker " + workerName, ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.deleteTask() operation time: " + t + " ms, workerName=" + workerName | |
+ ", taskId=" + taskId); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void keepAlive(final String workerName, final String taskId) throws TaskmanagerException { | |
final long start = System.currentTimeMillis(); | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
taskQueue.keepAlive(taskId); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error deleting task " + taskId + " for worker " + workerName, ex); | |
} finally { | |
_lock.readLock().unlock(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("TaskStorageZk.keepAlive() operation time: " + t + " ms, workerName=" + workerName | |
+ ", taskId=" + taskId); | |
} | |
} | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void purge(final String workerName) throws TaskmanagerException { | |
_lock.readLock().lock(); | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true); | |
taskQueue.purge(); | |
} catch (final RuntimeException ex) { | |
throw new TaskmanagerException("Error purging tasks for worker " + workerName, ex); | |
} finally { | |
_lock.readLock().unlock(); | |
} | |
} | |
@Override | |
public void lockQualifiers(final String workerName, final Collection<String> qualifiers) { | |
_lock.readLock().lock(); | |
try { | |
if (workerName != null && qualifiers != null) { | |
for (final String qualifier : qualifiers) { | |
try { | |
final String qualifierLockPath = getQualifierLockPath(workerName, qualifier); | |
_zk.ensurePathExists(qualifierLockPath); | |
} catch (final Exception ex) { | |
_log.info("Qualifier lock for " + workerName + " and qualifier " + qualifier | |
+ " could not be created.", ex); | |
} | |
} | |
} | |
} finally { | |
_lock.readLock().unlock(); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public boolean isLockedQualifier(final String workerName, final String qualifier) throws TaskmanagerException { | |
if (qualifier == null) { | |
return false; | |
} | |
try { | |
final String qualifierLockPath = getQualifierLockPath(workerName, qualifier); | |
return _zk.exists(qualifierLockPath) != null; | |
} catch (final KeeperException | RuntimeException ex) { | |
throw new TaskmanagerException("Failed to determine if qualifier " + qualifier + " for worker " + workerName | |
+ " is locked", ex); | |
} | |
} | |
/** get path of qualifier lock node. */ | |
private String getQualifierLockPath(final String workerName, final String qualifier) { | |
try { | |
return QUALIFIER_LOCKS_ZK_PATH + "/" + workerName + "/" + URLEncoder.encode(qualifier, "UTF-8"); | |
} catch (final UnsupportedEncodingException ex) { | |
throw new IllegalStateException(ex); | |
} | |
} | |
/** | |
* check age of qualifier locks. | |
* | |
* @param timeToLiveMs | |
*/ | |
public void checkQualifierLockAge(final long timeToLiveMs) { | |
try { | |
final Stat lockPathStat = _zk.exists(QUALIFIER_LOCKS_ZK_PATH); | |
if (lockPathStat != null && lockPathStat.getNumChildren() > 0) { | |
final List<String> workers = _zk.getChildrenSorted(QUALIFIER_LOCKS_ZK_PATH); | |
for (final String worker : workers) { | |
final String workerLockPath = QUALIFIER_LOCKS_ZK_PATH + "/" + worker; | |
final List<String> qualifiers = _zk.getChildrenSorted(workerLockPath); | |
for (final String qualifier : qualifiers) { | |
// don't use #getQualifierLockPath, because it's encoded | |
// already. | |
final String qualifierLockPath = workerLockPath + "/" + qualifier; | |
try { | |
final Stat stat = _zk.exists(qualifierLockPath); | |
if (stat != null) { | |
final long mtime = stat.getMtime(); | |
final long now = System.currentTimeMillis(); | |
if (now - mtime > timeToLiveMs) { | |
_zk.deleteNode(qualifierLockPath); | |
} | |
} | |
} catch (final Exception ex) { | |
_log.info("Could not remove qualifier lock " + qualifierLockPath + ". Will try again next time.", ex); | |
} | |
} | |
} | |
} | |
} catch (final Exception ex) { | |
_log.info("Failed to get qualifier locks.", ex); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Map<String, TaskCounter> getTaskCounters() throws TaskmanagerException { | |
try { | |
final Collection<String> queueNames = getQueueNames(); | |
final Map<String, TaskCounter> counters = new HashMap<String, TaskCounter>(); | |
for (final String queueName : queueNames) { | |
counters.put(queueName, getZkTaskQueue(queueName, true).getTaskCounter()); | |
} | |
return counters; | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error getting task counters", ex); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public TaskList getTaskList(final String workerName, final String section, final int maxCount) | |
throws TaskmanagerException { | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false); | |
if (taskQueue == null) { | |
throw new BadParameterTaskmanagerException("Worker name " + workerName + " is invalid.", | |
BadParameterTaskmanagerException.Cause.workerName); | |
} | |
return taskQueue.getTaskList(section, maxCount); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error getting task list for " + workerName + "/" + section, ex); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Any getTaskInfo(final String workerName, final String section, final String taskName) | |
throws TaskmanagerException { | |
try { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false); | |
if (taskQueue == null) { | |
throw new BadParameterTaskmanagerException("Task pipe " + workerName + " is invalid.", | |
BadParameterTaskmanagerException.Cause.workerName); | |
} | |
return taskQueue.getTaskInfo(section, taskName); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error getting task info for " + workerName + "/" + section + "/" + taskName, | |
ex); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public long getFailSafetyLevel() { | |
return _zkService.getFailSafetyLevel(); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void clear() throws TaskmanagerException { | |
try { | |
// clear tasks | |
for (final ZkTaskQueue queue : _taskQueues.values()) { | |
queue.purge(); | |
} | |
// clear counters | |
_zk.deleteTree(HOST_COUNTER_MAP_ZK_PATH); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error clearing task storage", ex); | |
} | |
} | |
@Override | |
public List<Task> filterDuplicates(final List<Task> tasksToBeAdded) throws TaskmanagerException { | |
final List<Task> result = new ArrayList<>(); | |
final Map<String, List<Task>> workerToTasksMap = new HashMap<>(); | |
// sort tasks by worker | |
for (final Task task : tasksToBeAdded) { | |
final String worker = task.getWorkerName(); | |
List<Task> workerTaskList = workerToTasksMap.get(worker); | |
if (workerTaskList == null) { | |
workerTaskList = new ArrayList<Task>(); | |
workerToTasksMap.put(worker, workerTaskList); | |
} | |
workerTaskList.add(task); | |
} | |
// filter tasks for each worker's task queue | |
for (final Map.Entry<String, List<Task>> entry : workerToTasksMap.entrySet()) { | |
final ZkTaskQueue taskQueue = getZkTaskQueue(entry.getKey(), false); | |
final List<Task> unfilteredTasks = entry.getValue(); | |
if (taskQueue != null) { | |
final List<Task> filteredTasks = taskQueue.filterDuplicates(unfilteredTasks); | |
result.addAll(filteredTasks); | |
} else { | |
result.addAll(unfilteredTasks); | |
} | |
} | |
if (tasksToBeAdded.size() != result.size()) { | |
_log.info("Returning " + result.size() + " filtered tasks for " + tasksToBeAdded.size() + " original tasks"); | |
} | |
return result; | |
} | |
@Override | |
public void storeConfiguration(final AnyMap config) throws TaskmanagerException { | |
try { | |
_zk.ensurePathExists(TASK_HANDLER_CONFIGURATION); | |
final byte[] message = _anyWriter.writeBinaryObject(config); | |
_zk.setData(TASK_HANDLER_CONFIGURATION, message); | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error saving taskmanager configuration", ex); | |
} | |
} | |
@Override | |
public AnyMap readConfiguration() throws TaskmanagerException { | |
Any result = DataFactory.DEFAULT.createAnyMap(); | |
try { | |
final Stat statResult = _zk.exists(TASK_HANDLER_CONFIGURATION); | |
if (statResult.getDataLength() > 0) { | |
final byte[] message = _zk.getData(TASK_HANDLER_CONFIGURATION); | |
result = _anyReader.readBinaryObject(message); | |
} | |
} catch (final Exception ex) { | |
throw new TaskmanagerException("Error saving taskmanager configuration", ex); | |
} | |
return (AnyMap) result; | |
} | |
/** | |
* Get the ZkTaskQueue for the given taskPipe. If no ZkTaskQueue exists a new one is created | |
* | |
* @param taskPipe | |
* name of the task pipe | |
* @param create | |
* create ZkTaskQueue if not found. if false return null. | |
* @return a ZkTaskQueue | |
*/ | |
private synchronized ZkTaskQueue getZkTaskQueue(final String taskPipe, final boolean create) { | |
ZkTaskQueue taskQueue = _taskQueues.get(taskPipe); | |
if (taskQueue == null && create) { | |
createTaskQueue(taskPipe); | |
taskQueue = _taskQueues.get(taskPipe); | |
} | |
return taskQueue; | |
} | |
/** | |
* @return local host name. | |
*/ | |
private String getLocalHost() { | |
if (_localHost == null) { | |
_localHost = _clusterConfigService.getLocalHost(); | |
} | |
return _localHost; | |
} | |
/** | |
* get queue names from ZooKeeper. | |
* | |
* @throws KeeperException | |
* error talking to Zk | |
*/ | |
private Collection<String> getQueueNames() throws KeeperException { | |
_zk.ensurePathExists(ZkTaskQueue.TASKDIR_PREFIX); | |
return _zk.getChildrenSorted(ZkTaskQueue.TASKDIR_PREFIX); | |
} | |
/** | |
* @param zooKeeperService | |
* referenced service | |
*/ | |
public void setZooKeeperService(final ZooKeeperService zooKeeperService) { | |
_zkService = zooKeeperService; | |
} | |
/** | |
* | |
* @param zooKeeperService | |
* referenced service | |
*/ | |
public void unsetZooKeeperService(final ZooKeeperService zooKeeperService) { | |
if (_zkService == zooKeeperService) { | |
_zkService = null; | |
} | |
} | |
/** | |
* set new ClusterConfigService. To be called by DS runtime before activation. | |
* | |
* @param ccs | |
* new ClusterConfigService | |
*/ | |
public void setClusterConfigService(final ClusterConfigService ccs) { | |
_clusterConfigService = ccs; | |
} | |
/** | |
* remove an ClusterConfigService. To be called by DS runtime after deactivation. | |
* | |
* @param ccs | |
* new ClusterConfigService | |
*/ | |
public void unsetClusterConfigService(final ClusterConfigService ccs) { | |
if (_clusterConfigService == ccs) { | |
_clusterConfigService = null; | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void removeTasks(final AnyMap filterMap) throws TaskmanagerException { | |
for (final Map.Entry<String, ZkTaskQueue> entry : _taskQueues.entrySet()) { | |
final ZkTaskQueue queue = entry.getValue(); | |
queue.removeTasks(filterMap); | |
} | |
} | |
} |