blob: 03f17fab3f0d4ddb113322f3d522b95a12c95524 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.clusterconfig.ClusterConfigException;
import org.eclipse.smila.clusterconfig.ClusterConfigService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskCounter;
import org.eclipse.smila.taskmanager.persistence.TaskList;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
import org.osgi.service.component.ComponentContext;
/**
 * ZooKeeper task storage implementation.
 *
 * ZooKeeper structure:
 *
 * <pre>
 * /smila/taskmanager/hosts/&lt;host-counter&gt;
 * /smila/taskmanager/qualifierlocks/&lt;worker-name&gt;/&lt;qualifier-value&gt;
 * ... (also see ZkTaskQueue)
 * </pre>
 *
 */
public class TaskStorageZk implements TaskStorage {
/** Zookeeper path to stored host counters. */
public static final String HOST_COUNTER_MAP_ZK_PATH = "/smila/taskmanager/hosts";

/** Zookeeper path to qualifier locks. */
public static final String QUALIFIER_LOCKS_ZK_PATH = "/smila/taskmanager/qualifierlocks";

/** default value for {@link #_maxNoOfTasksPerHost}, meaning "no limit". */
public static final long TASKSPERHOST_UNLIMITED = -1;

/** if this time (in milliseconds) is exceeded for an operation, a log message is written. */
private static final long MIN_OPERATION_TIME_TO_LOG = 500;

/** ZooKeeperService. */
private ZooKeeperService _zkService;

/** Zookeeper connection wrapper. */
private ZkConnection _zk;

/** connection to ClusterConfigService. */
private ClusterConfigService _clusterConfigService;

/** map of task queues, keyed by queue name. */
private final Map<String, ZkTaskQueue> _taskQueues = new ConcurrentHashMap<String, ZkTaskQueue>();

/** name of localhost, needed to initialize ZkTaskQueues. */
private String _localHost;

/** Lock to ensure availability of services. Processing methods use read lock, deactivate needs write lock. */
private final ReadWriteLock _lock = new ReentrantReadWriteLock(true);

/** local logger. */
private final Log _log = LogFactory.getLog(getClass());

/** maximum number of tasks that should be delivered for a host. Defaults to {@link #TASKSPERHOST_UNLIMITED}. */
private long _maxNoOfTasksPerHost = TASKSPERHOST_UNLIMITED;
/**
 * Activates the component: creates the ZooKeeper connection wrapper from the currently bound ZooKeeperService while
 * holding the write lock.
 *
 * @param context
 *          OSGi service component context.
 */
protected void activate(final ComponentContext context) {
  // wait for all operations to finish. The lock is acquired before entering the try block so that the finally
  // clause can never try to release a lock that was not obtained.
  _lock.writeLock().lock();
  try {
    _log.info("Activate ZooKeeper taskstorage implementation.");
    _zk = new ZkConnection(_zkService);
  } finally {
    _lock.writeLock().unlock();
  }
}
/**
 * Deactivates the component: drops the ZooKeeper connection wrapper and forgets all task queues.
 *
 * The write lock is taken first and the object monitor second. Processing methods hold the read lock while they
 * enter the synchronized queue lookup, so taking the monitor first here (as a synchronized method would) could block
 * forever: this thread would wait for the readers to release the lock while a reader waits for the monitor.
 *
 * @param context
 *          OSGi service component context.
 */
protected void deactivate(final ComponentContext context) {
  // wait for all operations to finish. Acquired outside of try so that finally never unlocks an unheld lock.
  _lock.writeLock().lock();
  try {
    synchronized (this) {
      _log.info("Shutdown ZooKeeper taskstorage implementation.");
      _zk = null;
      _taskQueues.clear();
    }
  } finally {
    _lock.writeLock().unlock();
  }
}
/**
 * Creates a ZkTaskQueue for the given name, configured with the local host name and the current per-host task
 * limit, and registers it in the queue map. A queue already registered under the same name is replaced.
 */
@Override
public void createTaskQueue(final String name) {
  _taskQueues.put(name, new ZkTaskQueue(_zkService, name, getLocalHost(), _maxNoOfTasksPerHost));
}
/** @return true if a queue with the given name is registered in this instance. */
@Override
public boolean hasTaskQueue(final String name) {
  final boolean registered = _taskQueues.containsKey(name);
  return registered;
}
/** @return read-only view of the currently registered task queues. */
protected Collection<ZkTaskQueue> getTaskQueues() {
  final Collection<ZkTaskQueue> registeredQueues = _taskQueues.values();
  return Collections.unmodifiableCollection(registeredQueues);
}
/** @return the currently configured maximum number of tasks to deliver to a host. */
@Override
public long getMaxNoOfTasksPerHost() {
  return this._maxNoOfTasksPerHost;
}
/**
 * Scale-up control: stores the new maximum number of tasks to deliver to a host and passes it on to every queue
 * that is currently registered.
 */
@Override
public void setMaxNoOfTasksPerHost(final long maxTasks) {
  _maxNoOfTasksPerHost = maxTasks;
  final Collection<ZkTaskQueue> registeredQueues = getTaskQueues();
  for (final ZkTaskQueue registeredQueue : registeredQueues) {
    registeredQueue.setMaxNoOfTasksPerHost(_maxNoOfTasksPerHost);
  }
}
/**
 * {@inheritDoc}
 *
 * Reads the map stored below {@link #HOST_COUNTER_MAP_ZK_PATH}. Errors are only logged as warnings; in that case the
 * entries collected so far (possibly none) are returned.
 */
@Override
public Map<String, Integer> getScaleUpCounters() throws TaskmanagerException {
  final Map<String, Integer> result = new HashMap<String, Integer>();
  try {
    _zk.ensurePathExists(HOST_COUNTER_MAP_ZK_PATH);
    final ZkConcurrentMap hostCounters = new ZkConcurrentMap(_zk, HOST_COUNTER_MAP_ZK_PATH);
    for (final String hostName : hostCounters.keySet()) {
      result.put(hostName, hostCounters.getInt(hostName));
    }
  } catch (final Exception ex) {
    // best effort: reading the counters must not fail the caller.
    _log.warn("Could not read scale-up counters", ex);
  }
  return result;
}
/**
 * {@inheritDoc}
 *
 * The task goes to the in-progress part of the queue of its own worker; the queue is created if it is not
 * registered yet. Runs under the read lock and writes an info message if it took longer than
 * {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 */
@Override
public void storeInProgressTask(final Task task) throws TaskmanagerException {
  final long startTime = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(task.getWorkerName(), true).putInProgress(task);
  } catch (final TaskmanagerException ex) {
    // already the right type, pass on unchanged.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error storing task to " + task.getWorkerName(), ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsed = System.currentTimeMillis() - startTime;
      if (elapsed > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.storeInProgressTask() operation time: " + elapsed + " ms, taskId="
          + task.getTaskId());
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Shortcut that stores the task in the queue named by the task's own worker name.
 */
@Override
public void storeTask(final Task task) throws TaskmanagerException {
  final String targetWorker = task.getWorkerName();
  storeTask(targetWorker, task);
}
/**
 * {@inheritDoc}
 *
 * The queue of the given worker is created if it is not registered yet. Runs under the read lock and writes an info
 * message if it took longer than {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 */
@Override
public void storeTask(final String workerName, final Task task) throws TaskmanagerException {
  final long startTime = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(workerName, true).put(task);
  } catch (final TaskmanagerException ex) {
    // already the right type, pass on unchanged.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error storing task in '" + workerName + "' queue", ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsed = System.currentTimeMillis() - startTime;
      if (elapsed > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.storeTask() operation time: " + elapsed + " ms, taskId=" + task.getTaskId());
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Without qualifiers (null or empty collection) the queue is asked with a null qualifier. With qualifiers, the
 * queue is only queried if it reports qualified tasks; the qualifiers are then tried in iteration order and the
 * first task found is returned.
 *
 * @return a task for the worker, or null if none was found.
 */
@Override
public Task getTask(final String workerName, final String workerHost, final Collection<String> qualifiers)
  throws TaskmanagerException {
  final long startTime = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    final ZkTaskQueue queue = getZkTaskQueue(workerName, true);
    if (qualifiers == null || qualifiers.isEmpty()) {
      return queue.get(null, workerHost);
    }
    if (!queue.hasQualifiedTasks()) {
      return null;
    }
    for (final String candidateQualifier : qualifiers) {
      final Task found = queue.get(candidateQualifier, workerHost);
      if (found != null) {
        return found;
      }
    }
    return null;
  } catch (final TaskmanagerException ex) {
    // already the right type, pass on unchanged.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsed = System.currentTimeMillis() - startTime;
      if (elapsed > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.getTask() operation time: " + elapsed + " ms, workerName=" + workerName
          + ", workerHost=" + workerHost + ", qualifiers=" + qualifiers);
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Delegates to the queue of the given worker (created if not registered yet). Runs under the read lock and writes
 * an info message if it took longer than {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 */
@Override
public Task getInProgressTask(final String workerName, final String taskId) throws TaskmanagerException {
  final long startTime = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    return getZkTaskQueue(workerName, true).getInProgressTask(taskId);
  } catch (final TaskmanagerException ex) {
    // already the right type, pass on unchanged.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error reading task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsed = System.currentTimeMillis() - startTime;
      if (elapsed > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.getInProgressTask() operation time: " + elapsed + " ms, workerName=" + workerName
          + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Delegates to the queue of the given worker (created if not registered yet). Runs under the read lock and writes
 * an info message if it took longer than {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 */
@Override
public void deleteTask(final String workerName, final String taskId) throws TaskmanagerException {
  final long startTime = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(workerName, true).delete(taskId);
  } catch (final TaskmanagerException ex) {
    // already the right type, pass on unchanged.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error deleting task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsed = System.currentTimeMillis() - startTime;
      if (elapsed > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.deleteTask() operation time: " + elapsed + " ms, workerName=" + workerName
          + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Delegates the keep-alive signal to the queue of the given worker (created if not registered yet). Runs under the
 * read lock and writes an info message if it took longer than {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @throws TaskmanagerException
 *           passed on unchanged from the queue, or wrapping any other exception that occurred.
 */
@Override
public void keepAlive(final String workerName, final String taskId) throws TaskmanagerException {
  final long start = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true);
    taskQueue.keepAlive(taskId);
  } catch (final TaskmanagerException ex) {
    throw ex;
  } catch (final Exception ex) {
    // the message names the operation that actually failed (it used to say "deleting").
    throw new TaskmanagerException("Error keeping alive task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long t = System.currentTimeMillis() - start;
      if (t > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.keepAlive() operation time: " + t + " ms, workerName=" + workerName
          + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * {@inheritDoc}
 *
 * Purges the queue of the given worker (created if not registered yet) while holding the read lock.
 *
 * @throws TaskmanagerException
 *           passed on unchanged if the failure already is a TaskmanagerException, otherwise wrapping the original
 *           exception.
 */
@Override
public void purge(final String workerName) throws TaskmanagerException {
  _lock.readLock().lock();
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true);
    taskQueue.purge();
  } catch (final Exception ex) {
    // like the other queue operations: do not wrap a TaskmanagerException into a second one.
    if (ex instanceof TaskmanagerException) {
      throw (TaskmanagerException) ex;
    }
    throw new TaskmanagerException("Error purging tasks for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
  }
}
/**
 * Creates one lock node per qualifier below the worker's qualifier lock path. Does nothing if the worker name or the
 * qualifier collection is null. A qualifier whose lock node cannot be created is logged and skipped; the remaining
 * qualifiers are still processed.
 */
@Override
public void lockQualifiers(final String workerName, final Collection<String> qualifiers) {
  _lock.readLock().lock();
  try {
    if (workerName == null || qualifiers == null) {
      return;
    }
    for (final String qualifier : qualifiers) {
      try {
        _zk.ensurePathExists(getQualifierLockPath(workerName, qualifier));
      } catch (final Exception ex) {
        _log.info("Qualifier lock for " + workerName + " and qualifier " + qualifier
          + " could not be created.", ex);
      }
    }
  } finally {
    _lock.readLock().unlock();
  }
}
/**
 * {@inheritDoc}
 *
 * A null qualifier is never locked. Otherwise the qualifier is locked if its lock node exists in ZooKeeper.
 */
@Override
public boolean isLockedQualifier(final String workerName, final String qualifier) throws TaskmanagerException {
  if (qualifier != null) {
    try {
      final Stat lockNodeStat = _zk.exists(getQualifierLockPath(workerName, qualifier));
      return lockNodeStat != null;
    } catch (final Exception ex) {
      throw new TaskmanagerException("Failed to determine if qualifier " + qualifier + " for worker " + workerName
        + " is locked", ex);
    }
  }
  return false;
}
/**
 * Builds the path of a qualifier lock node: lock root, worker name and the URL-encoded (UTF-8) qualifier, joined by
 * slashes. The worker name is used as given.
 */
private String getQualifierLockPath(final String workerName, final String qualifier) {
  final String encodedQualifier;
  try {
    encodedQualifier = URLEncoder.encode(qualifier, "UTF-8");
  } catch (final UnsupportedEncodingException ex) {
    throw new IllegalStateException(ex);
  }
  return QUALIFIER_LOCKS_ZK_PATH + "/" + workerName + "/" + encodedQualifier;
}
/**
 * Walks all qualifier lock nodes of all workers and deletes those whose modification time is more than the given
 * time to live in the past. Problems are only logged: a failure while listing nodes ends the walk, a failure on a
 * single lock node is skipped.
 *
 * @param timeToLiveMs
 *          maximum age of a qualifier lock in milliseconds.
 */
public void checkQualifierLockAge(final long timeToLiveMs) {
  try {
    final Stat lockRootStat = _zk.exists(QUALIFIER_LOCKS_ZK_PATH);
    if (lockRootStat == null || lockRootStat.getNumChildren() <= 0) {
      return;
    }
    final List<String> workerNames = _zk.getChildrenSorted(QUALIFIER_LOCKS_ZK_PATH);
    for (final String workerName : workerNames) {
      final String workerLockPath = QUALIFIER_LOCKS_ZK_PATH + "/" + workerName;
      final List<String> encodedQualifiers = _zk.getChildrenSorted(workerLockPath);
      for (final String encodedQualifier : encodedQualifiers) {
        // don't use #getQualifierLockPath, because the node name is encoded already.
        deleteQualifierLockIfExpired(workerLockPath + "/" + encodedQualifier, timeToLiveMs);
      }
    }
  } catch (final Exception ex) {
    _log.info("Failed to get qualifier locks.", ex);
  }
}

/** Deletes the given lock node if it exists and is older than the time to live; failures are only logged. */
private void deleteQualifierLockIfExpired(final String qualifierLockPath, final long timeToLiveMs) {
  try {
    final Stat lockStat = _zk.exists(qualifierLockPath);
    if (lockStat == null) {
      return;
    }
    final long lastModified = lockStat.getMtime();
    final long currentTime = System.currentTimeMillis();
    if (currentTime - lastModified > timeToLiveMs) {
      _zk.deleteNode(qualifierLockPath);
    }
  } catch (final Exception ex) {
    _log.info("Could not remove qualifier lock " + qualifierLockPath + ". Will try again next time.", ex);
  }
}
/**
 * {@inheritDoc}
 *
 * The queue names are read from ZooKeeper; for each name the queue is looked up (and created if not registered yet)
 * and asked for its counter.
 */
@Override
public Map<String, TaskCounter> getTaskCounters() throws TaskmanagerException {
  try {
    final Map<String, TaskCounter> countersByQueue = new HashMap<String, TaskCounter>();
    for (final String name : getQueueNames()) {
      final ZkTaskQueue queue = getZkTaskQueue(name, true);
      countersByQueue.put(name, queue.getTaskCounter());
    }
    return countersByQueue;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task counters", ex);
  }
}
/**
 * {@inheritDoc}
 *
 * The queue is looked up without creating it; an unknown worker name is reported as a
 * BadParameterTaskmanagerException.
 *
 * @throws TaskmanagerException
 *           passed on unchanged (including the bad-parameter case) if the failure already is a
 *           TaskmanagerException, otherwise wrapping the original exception.
 */
@Override
public TaskList getTaskList(final String workerName, final String section, final int maxCount)
  throws TaskmanagerException {
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false);
    if (taskQueue == null) {
      throw new BadParameterTaskmanagerException("Worker name " + workerName + " is invalid.",
        BadParameterTaskmanagerException.Cause.workerName);
    }
    return taskQueue.getTaskList(section, maxCount);
  } catch (final TaskmanagerException ex) {
    // keep the specific exception type, so callers can still tell a bad parameter from an internal error.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task list for " + workerName + "/" + section, ex);
  }
}
/**
 * {@inheritDoc}
 *
 * The queue is looked up without creating it; an unknown worker name is reported as a
 * BadParameterTaskmanagerException.
 *
 * @throws TaskmanagerException
 *           passed on unchanged (including the bad-parameter case) if the failure already is a
 *           TaskmanagerException, otherwise wrapping the original exception.
 */
@Override
public Any getTaskInfo(final String workerName, final String section, final String taskName)
  throws TaskmanagerException {
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false);
    if (taskQueue == null) {
      throw new BadParameterTaskmanagerException("Task pipe " + workerName + " is invalid.",
        BadParameterTaskmanagerException.Cause.workerName);
    }
    return taskQueue.getTaskInfo(section, taskName);
  } catch (final TaskmanagerException ex) {
    // keep the specific exception type, so callers can still tell a bad parameter from an internal error.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task info for " + workerName + "/" + section + "/" + taskName,
      ex);
  }
}
/**
 * {@inheritDoc}
 *
 * The value is taken from the bound ZooKeeperService.
 */
@Override
public long getFailSafetyLevel() {
  final long failSafetyLevel = _zkService.getFailSafetyLevel();
  return failSafetyLevel;
}
/**
 * {@inheritDoc}
 *
 * Purges every registered queue first, then deletes the host counter tree. Any failure is wrapped in a
 * TaskmanagerException.
 */
@Override
public void clear() throws TaskmanagerException {
  try {
    // tasks first ...
    final Collection<ZkTaskQueue> registeredQueues = _taskQueues.values();
    for (final ZkTaskQueue registeredQueue : registeredQueues) {
      registeredQueue.purge();
    }
    // ... then the scale-up counters.
    _zk.deleteTree(HOST_COUNTER_MAP_ZK_PATH);
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error clearing task storage", ex);
  }
}
/**
 * Looks up the ZkTaskQueue registered for the given task pipe. If none is registered and creation is requested, a
 * new queue is created, registered and returned.
 *
 * @param taskPipe
 *          name of the task pipe
 * @param create
 *          true to create the ZkTaskQueue if it is not found, false to return null in that case.
 * @return the ZkTaskQueue, or null if it is not registered and create is false.
 * @throws IOException
 *           declared for the case that the ZkTaskQueue cannot be created
 * @throws ClusterConfigException
 *           declared for the case that the ZkTaskQueue cannot be created
 */
private synchronized ZkTaskQueue getZkTaskQueue(final String taskPipe, final boolean create) throws IOException,
  ClusterConfigException {
  final ZkTaskQueue registered = _taskQueues.get(taskPipe);
  if (registered != null || !create) {
    return registered;
  }
  createTaskQueue(taskPipe);
  return _taskQueues.get(taskPipe);
}
/**
 * Returns the local host name. It is fetched from the ClusterConfigService on first use and remembered afterwards.
 *
 * @return local host name.
 */
private String getLocalHost() {
  if (_localHost != null) {
    return _localHost;
  }
  _localHost = _clusterConfigService.getLocalHost();
  return _localHost;
}
/**
 * Reads the queue names from ZooKeeper: makes sure the task directory node exists and returns its sorted children.
 *
 * @return names of the child nodes of the task directory.
 * @throws Exception
 *           error talking to Zk
 */
private Collection<String> getQueueNames() throws Exception {
  _zk.ensurePathExists(ZkTaskQueue.TASKDIR_PREFIX);
  final List<String> queueNames = _zk.getChildrenSorted(ZkTaskQueue.TASKDIR_PREFIX);
  return queueNames;
}
/**
 * Binds the ZooKeeperService used by this component.
 *
 * @param zooKeeperService
 *          referenced service
 */
public void setZooKeeperService(final ZooKeeperService zooKeeperService) {
  this._zkService = zooKeeperService;
}
/**
 * Unbinds the ZooKeeperService. Only has an effect if the given instance is the one currently bound.
 *
 * @param zooKeeperService
 *          referenced service
 */
public void unsetZooKeeperService(final ZooKeeperService zooKeeperService) {
  if (_zkService != zooKeeperService) {
    return;
  }
  _zkService = null;
}
/**
 * Binds a new ClusterConfigService. To be called by DS runtime before activation.
 *
 * @param ccs
 *          new ClusterConfigService
 */
public void setClusterConfigService(final ClusterConfigService ccs) {
  this._clusterConfigService = ccs;
}
/**
 * Unbinds a ClusterConfigService. To be called by DS runtime after deactivation. Only has an effect if the given
 * instance is the one currently bound.
 *
 * @param ccs
 *          the ClusterConfigService to remove
 */
public void unsetClusterConfigService(final ClusterConfigService ccs) {
  if (_clusterConfigService != ccs) {
    return;
  }
  _clusterConfigService = null;
}
/**
 * {@inheritDoc}
 *
 * The filter is passed on to every registered queue.
 */
@Override
public void removeTasks(final AnyMap filterMap) throws TaskmanagerException {
  for (final ZkTaskQueue registeredQueue : _taskQueues.values()) {
    registeredQueue.removeTasks(filterMap);
  }
}
}