blob: fd0b7904757201422e71c7a876c345dd416787e1 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.clusterconfig.ClusterConfigService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCounter;
import org.eclipse.smila.taskmanager.TaskList;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
import org.osgi.service.component.ComponentContext;
/**
* Zookeeper task storage implementation.
*
* Zookeeper structure:
*
* <tt>
* /smila/taskmanager/hosts/<host-counter>
* /smila/taskmanager/qualifierlocks/<worker-name>/<qualifier-value>
* ... (also see ZkTaskQueue)
* </tt>
*
*/
public class TaskStorageZk implements TaskStorage {
/** Zookeeper path to stored host counters. */
public static final String HOST_COUNTER_MAP_ZK_PATH = "/smila/taskmanager/hosts";
/** Zookeeper path to qualifier locks. */
public static final String QUALIFIER_LOCKS_ZK_PATH = "/smila/taskmanager/qualifierlocks";
/** Zookeeper path of the node in which the taskmanager configuration is stored. */
public static final String TASK_HANDLER_CONFIGURATION = "/smila/taskmanager/config";
/** default value for {@link #_maxNoOfTasksPerHost}: no limit. */
public static final long TASKSPERHOST_UNLIMITED = -1;
/** if this time (in milliseconds) is exceeded for an operation, a log message is written. */
private static final long MIN_OPERATION_TIME_TO_LOG = 1000; // ms
/** ZooKeeperService, bound and unbound via the set/unsetZooKeeperService methods. */
private ZooKeeperService _zkService;
/** Zookeeper connection wrapper; created on activation and reset to null on deactivation. */
private ZkConnection _zk;
/** Any->BON converter. */
private final IpcAnyWriter _anyWriter = new IpcAnyWriter();
/** BON ->Any converter. */
private final IpcAnyReader _anyReader = new IpcAnyReader();
/** connection to ClusterConfigService. */
private ClusterConfigService _clusterConfigService;
/** map of task queues, keyed by queue name. */
private final Map<String, ZkTaskQueue> _taskQueues = new ConcurrentHashMap<String, ZkTaskQueue>();
/** name of localhost, needed to initialize ZkTaskQueues; looked up lazily from the ClusterConfigService. */
private String _localHost;
/**
* Lock to ensure availability of services. Processing methods use read lock, activate and deactivate need write
* lock.
*/
private final ReadWriteLock _lock = new ReentrantReadWriteLock(true);
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** maximum number of tasks that should be delivered for a host. */
private long _maxNoOfTasksPerHost = -1; // default: -1 (no limit)
/**
 * Activates the component: creates the ZooKeeper connection wrapper from the currently bound ZooKeeperService.
 *
 * @param context
 *          OSGi service component context.
 */
protected void activate(final ComponentContext context) {
  // Acquire the lock BEFORE entering try: if lock() fails, finally must not unlock a lock that was never obtained.
  // Taking the write lock waits for all operations holding the read lock to finish.
  _lock.writeLock().lock();
  try {
    _log.info("Activate ZooKeeper taskstorage implementation.");
    _zk = new ZkConnection(_zkService);
  } finally {
    _lock.writeLock().unlock();
  }
}
/**
 * Deactivates the component: drops the ZooKeeper connection wrapper and forgets all known task queues.
 *
 * @param context
 *          OSGi service component context.
 */
protected void deactivate(final ComponentContext context) {
  // Acquire the lock BEFORE entering try: if lock() fails, finally must not unlock a lock that was never obtained.
  // Taking the write lock waits for all operations holding the read lock to finish.
  _lock.writeLock().lock();
  try {
    _log.info("Shutdown ZooKeeper taskstorage implementation.");
    _zk = null;
    _taskQueues.clear();
  } finally {
    _lock.writeLock().unlock();
  }
}
/**
 * Registers a new ZkTaskQueue under the given name. Does nothing if a queue with this name is already registered.
 *
 * @param name
 *          name of the task queue
 */
@Override
public void createTaskQueue(final String name) {
  if (_taskQueues.containsKey(name)) {
    return;
  }
  _taskQueues.put(name, new ZkTaskQueue(_zkService, name, getLocalHost(), _maxNoOfTasksPerHost));
}
/**
 * @param name
 *          name of the task queue
 * @return true if a task queue with this name is registered in this storage instance.
 */
@Override
public boolean hasTaskQueue(final String name) {
  final boolean registered = _taskQueues.containsKey(name);
  return registered;
}
/** get the registered task queues as an unmodifiable collection. */
protected Collection<ZkTaskQueue> getTaskQueues() {
  final Collection<ZkTaskQueue> queues = _taskQueues.values();
  return Collections.unmodifiableCollection(queues);
}
/** @return the currently configured maximum number of tasks that should be delivered for a host. */
@Override
public long getMaxNoOfTasksPerHost() {
  return this._maxNoOfTasksPerHost;
}
/**
 * Sets the maximum number of tasks that should be delivered to a host (scale up control) and passes the value on to
 * all task queues currently registered.
 *
 * @param maxTasks
 *          new maximum number of tasks per host
 */
@Override
public void setMaxNoOfTasksPerHost(final long maxTasks) {
  _maxNoOfTasksPerHost = maxTasks;
  for (final ZkTaskQueue taskQueue : getTaskQueues()) {
    taskQueue.setMaxNoOfTasksPerHost(_maxNoOfTasksPerHost);
  }
}
/**
 * Reads the per-host counters stored below {@link #HOST_COUNTER_MAP_ZK_PATH}. Errors while reading are only logged
 * as a warning; in that case the counters collected so far (possibly none) are returned.
 *
 * @return map from host name to counter value
 */
@Override
public Map<String, Integer> getScaleUpCounters() throws TaskmanagerException {
  final Map<String, Integer> result = new HashMap<String, Integer>();
  try {
    _zk.ensurePathExists(HOST_COUNTER_MAP_ZK_PATH);
    final ZkConcurrentMap hostCounters = new ZkConcurrentMap(_zk, HOST_COUNTER_MAP_ZK_PATH);
    for (final String hostName : hostCounters.keySet()) {
      result.put(hostName, hostCounters.getInt(hostName));
    }
  } catch (final KeeperException | RuntimeException ex) {
    // best effort: callers get whatever could be read.
    _log.warn("Could not read scale-up counters", ex);
  }
  return result;
}
/**
 * Passes the task to the queue of its worker via putInProgress(), creating the queue if it is not yet known. Runs
 * under the read lock; the operation time is logged if it exceeds {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param task
 *          task to store
 * @throws TaskmanagerException
 *           a runtime error occurred while storing the task
 */
@Override
public void storeInProgressTask(final Task task) throws TaskmanagerException {
  final long startedAt = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(task.getWorkerName(), true).putInProgress(task);
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error storing task to " + task.getWorkerName(), ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsedMs = System.currentTimeMillis() - startedAt;
      if (elapsedMs > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.storeInProgressTask() operation time: " + elapsedMs + " ms, taskId="
          + task.getTaskId());
      }
    }
  }
}
/**
 * Stores the task in the queue named like the task's own worker.
 *
 * @param task
 *          task to store
 * @throws TaskmanagerException
 *           error storing the task
 */
@Override
public void storeTask(final Task task) throws TaskmanagerException {
  final String workerName = task.getWorkerName();
  storeTask(workerName, task);
}
/**
 * Puts the task into the queue of the given worker, creating the queue if it is not yet known. Runs under the read
 * lock; the operation time is logged if it exceeds {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param workerName
 *          name of the queue to put the task in
 * @param task
 *          task to store
 * @throws TaskmanagerException
 *           a runtime error occurred while storing the task
 */
@Override
public void storeTask(final String workerName, final Task task) throws TaskmanagerException {
  final long startedAt = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(workerName, true).put(task);
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error storing task in '" + workerName + "' queue", ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsedMs = System.currentTimeMillis() - startedAt;
      if (elapsedMs > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.storeTask() operation time: " + elapsedMs + " ms, taskId=" + task.getTaskId());
      }
    }
  }
}
/**
 * Asks the queue of the given worker for a task, creating the queue if it is not yet known. Runs under the read
 * lock; the operation time is logged if it exceeds {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param workerHost
 *          host of the requesting worker, passed on to the queue
 * @param qualifiers
 *          qualifiers passed on to the queue
 * @return whatever the queue's get() returns for these arguments
 * @throws TaskmanagerException
 *           a runtime error occurred while getting the task
 */
@Override
public Task getTask(final String workerName, final String workerHost, final Collection<String> qualifiers)
  throws TaskmanagerException {
  final long startedAt = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    return getZkTaskQueue(workerName, true).get(qualifiers, workerHost);
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error getting task for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsedMs = System.currentTimeMillis() - startedAt;
      if (elapsedMs > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.getTask() operation time: " + elapsedMs + " ms, workerName=" + workerName
          + ", workerHost=" + workerHost + ", qualifiers=" + qualifiers);
      }
    }
  }
}
/**
 * Looks up an in-progress task by id in the queue of the given worker, creating the queue if it is not yet known.
 * Runs under the read lock; the operation time is logged if it exceeds {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param taskId
 *          id of the task
 * @return whatever the queue's getInProgressTask() returns for this id
 * @throws TaskmanagerException
 *           a runtime error occurred while reading the task
 */
@Override
public Task getInProgressTask(final String workerName, final String taskId) throws TaskmanagerException {
  final long startedAt = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    return getZkTaskQueue(workerName, true).getInProgressTask(taskId);
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error reading task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsedMs = System.currentTimeMillis() - startedAt;
      if (elapsedMs > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.getInProgressTask() operation time: " + elapsedMs + " ms, workerName="
          + workerName + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * Deletes the task with the given id from the queue of the given worker, creating the queue if it is not yet known.
 * Runs under the read lock; the operation time is logged if it exceeds {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param taskId
 *          id of the task to delete
 * @throws TaskmanagerException
 *           a runtime error occurred while deleting the task
 */
@Override
public void deleteTask(final String workerName, final String taskId) throws TaskmanagerException {
  final long startedAt = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    getZkTaskQueue(workerName, true).delete(taskId);
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error deleting task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long elapsedMs = System.currentTimeMillis() - startedAt;
      if (elapsedMs > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.deleteTask() operation time: " + elapsedMs + " ms, workerName=" + workerName
          + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * Sends a keep-alive for the task with the given id to the queue of the given worker, creating the queue if it is
 * not yet known. Runs under the read lock; the operation time is logged if it exceeds
 * {@link #MIN_OPERATION_TIME_TO_LOG} ms.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param taskId
 *          id of the task to keep alive
 * @throws TaskmanagerException
 *           a runtime error occurred during the keep-alive
 */
@Override
public void keepAlive(final String workerName, final String taskId) throws TaskmanagerException {
  final long start = System.currentTimeMillis();
  _lock.readLock().lock();
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, true);
    taskQueue.keepAlive(taskId);
  } catch (final RuntimeException ex) {
    // the message used to say "Error deleting task" (copied from deleteTask), which was misleading.
    throw new TaskmanagerException("Error keeping alive task " + taskId + " for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
    if (_log.isInfoEnabled()) {
      final long t = System.currentTimeMillis() - start;
      if (t > MIN_OPERATION_TIME_TO_LOG) {
        _log.info("TaskStorageZk.keepAlive() operation time: " + t + " ms, workerName=" + workerName
          + ", taskId=" + taskId);
      }
    }
  }
}
/**
 * Purges the queue of the given worker, creating the queue if it is not yet known. Runs under the read lock.
 *
 * @param workerName
 *          name of the worker (queue)
 * @throws TaskmanagerException
 *           a runtime error occurred while purging
 */
@Override
public void purge(final String workerName) throws TaskmanagerException {
  _lock.readLock().lock();
  try {
    getZkTaskQueue(workerName, true).purge();
  } catch (final RuntimeException ex) {
    throw new TaskmanagerException("Error purging tasks for worker " + workerName, ex);
  } finally {
    _lock.readLock().unlock();
  }
}
/**
 * Ensures that a lock node exists in ZooKeeper for each given qualifier of the worker. A failure for a single
 * qualifier is logged and does not stop processing of the remaining ones. Nothing is created if the worker name or
 * the qualifier collection is null. Runs under the read lock.
 *
 * @param workerName
 *          name of the worker
 * @param qualifiers
 *          qualifiers to lock
 */
@Override
public void lockQualifiers(final String workerName, final Collection<String> qualifiers) {
  _lock.readLock().lock();
  try {
    if (workerName == null || qualifiers == null) {
      return;
    }
    for (final String qualifier : qualifiers) {
      try {
        _zk.ensurePathExists(getQualifierLockPath(workerName, qualifier));
      } catch (final Exception ex) {
        _log.info("Qualifier lock for " + workerName + " and qualifier " + qualifier
          + " could not be created.", ex);
      }
    }
  } finally {
    _lock.readLock().unlock();
  }
}
/**
 * Checks whether a lock node exists in ZooKeeper for the given worker and qualifier.
 *
 * @param workerName
 *          name of the worker
 * @param qualifier
 *          qualifier to check; null is never locked
 * @return true if the lock node exists
 * @throws TaskmanagerException
 *           the lock node could not be checked
 */
@Override
public boolean isLockedQualifier(final String workerName, final String qualifier) throws TaskmanagerException {
  if (qualifier == null) {
    return false;
  }
  try {
    final Stat lockStat = _zk.exists(getQualifierLockPath(workerName, qualifier));
    return lockStat != null;
  } catch (final KeeperException | RuntimeException ex) {
    throw new TaskmanagerException("Failed to determine if qualifier " + qualifier + " for worker " + workerName
      + " is locked", ex);
  }
}
/**
 * build the path of the qualifier lock node: lock root, worker name and the URL-encoded (UTF-8) qualifier, joined
 * by slashes. The worker name is used as is.
 */
private String getQualifierLockPath(final String workerName, final String qualifier) {
  final String encodedQualifier;
  try {
    encodedQualifier = URLEncoder.encode(qualifier, "UTF-8");
  } catch (final UnsupportedEncodingException ex) {
    throw new IllegalStateException(ex);
  }
  return QUALIFIER_LOCKS_ZK_PATH + "/" + workerName + "/" + encodedQualifier;
}
/**
 * Checks the age of all qualifier locks and deletes those whose last modification is longer ago than the given time
 * to live. Errors are only logged: a lock that cannot be removed is left for the next invocation.
 *
 * @param timeToLiveMs
 *          maximum allowed age of a qualifier lock in milliseconds
 */
public void checkQualifierLockAge(final long timeToLiveMs) {
  try {
    final Stat rootStat = _zk.exists(QUALIFIER_LOCKS_ZK_PATH);
    if (rootStat == null || rootStat.getNumChildren() <= 0) {
      return;
    }
    for (final String worker : _zk.getChildrenSorted(QUALIFIER_LOCKS_ZK_PATH)) {
      final String workerLockPath = QUALIFIER_LOCKS_ZK_PATH + "/" + worker;
      for (final String encodedQualifier : _zk.getChildrenSorted(workerLockPath)) {
        // child names are already encoded, so #getQualifierLockPath must not be applied here.
        final String qualifierLockPath = workerLockPath + "/" + encodedQualifier;
        try {
          final Stat lockStat = _zk.exists(qualifierLockPath);
          if (lockStat == null) {
            continue;
          }
          final long lastModified = lockStat.getMtime();
          final long ageMs = System.currentTimeMillis() - lastModified;
          if (ageMs > timeToLiveMs) {
            _zk.deleteNode(qualifierLockPath);
          }
        } catch (final Exception ex) {
          _log.info("Could not remove qualifier lock " + qualifierLockPath + ". Will try again next time.", ex);
        }
      }
    }
  } catch (final Exception ex) {
    _log.info("Failed to get qualifier locks.", ex);
  }
}
/**
 * Collects the task counter of every queue whose name is found in ZooKeeper. Queues not yet known to this instance
 * are created on the fly.
 *
 * @return map from queue name to task counter
 * @throws TaskmanagerException
 *           any error while collecting the counters
 */
@Override
public Map<String, TaskCounter> getTaskCounters() throws TaskmanagerException {
  try {
    final Map<String, TaskCounter> result = new HashMap<String, TaskCounter>();
    for (final String name : getQueueNames()) {
      final ZkTaskQueue taskQueue = getZkTaskQueue(name, true);
      result.put(name, taskQueue.getTaskCounter());
    }
    return result;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task counters", ex);
  }
}
/**
 * Gets the task list of one section of the queue of the given worker. The queue is not created if it is unknown.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param section
 *          section of the queue, passed on to the queue
 * @param maxCount
 *          maximum count, passed on to the queue
 * @return the task list as delivered by the queue
 * @throws BadParameterTaskmanagerException
 *           no queue is known for the given worker name (propagated unwrapped)
 * @throws TaskmanagerException
 *           any other error while getting the task list
 */
@Override
public TaskList getTaskList(final String workerName, final String section, final int maxCount)
  throws TaskmanagerException {
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false);
    if (taskQueue == null) {
      throw new BadParameterTaskmanagerException("Worker name " + workerName + " is invalid.",
        BadParameterTaskmanagerException.Cause.workerName);
    }
    return taskQueue.getTaskList(section, maxCount);
  } catch (final BadParameterTaskmanagerException ex) {
    // do not wrap: keep the specific type so that callers can tell bad input from storage failures.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task list for " + workerName + "/" + section, ex);
  }
}
/**
 * Gets the information about one task from a section of the queue of the given worker. The queue is not created if
 * it is unknown.
 *
 * @param workerName
 *          name of the worker (queue)
 * @param section
 *          section of the queue, passed on to the queue
 * @param taskName
 *          name of the task, passed on to the queue
 * @return the task info as delivered by the queue
 * @throws BadParameterTaskmanagerException
 *           no queue is known for the given worker name (propagated unwrapped)
 * @throws TaskmanagerException
 *           any other error while getting the task info
 */
@Override
public Any getTaskInfo(final String workerName, final String section, final String taskName)
  throws TaskmanagerException {
  try {
    final ZkTaskQueue taskQueue = getZkTaskQueue(workerName, false);
    if (taskQueue == null) {
      throw new BadParameterTaskmanagerException("Task pipe " + workerName + " is invalid.",
        BadParameterTaskmanagerException.Cause.workerName);
    }
    return taskQueue.getTaskInfo(section, taskName);
  } catch (final BadParameterTaskmanagerException ex) {
    // do not wrap: keep the specific type so that callers can tell bad input from storage failures.
    throw ex;
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error getting task info for " + workerName + "/" + section + "/" + taskName,
      ex);
  }
}
/** @return the fail safety level as reported by the bound ZooKeeperService. */
@Override
public long getFailSafetyLevel() {
  final long failSafetyLevel = _zkService.getFailSafetyLevel();
  return failSafetyLevel;
}
/**
 * Purges all registered task queues and afterwards deletes the host counter tree in ZooKeeper.
 *
 * @throws TaskmanagerException
 *           any error while clearing
 */
@Override
public void clear() throws TaskmanagerException {
  try {
    // tasks first ...
    for (final ZkTaskQueue taskQueue : _taskQueues.values()) {
      taskQueue.purge();
    }
    // ... then the scale-up counters
    _zk.deleteTree(HOST_COUNTER_MAP_ZK_PATH);
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error clearing task storage", ex);
  }
}
/**
 * Groups the given tasks by worker name and lets the queue of each worker filter its group. Tasks of workers for
 * which no queue is registered are passed through unfiltered. If the result size differs from the input size, an
 * info message is logged.
 *
 * @param tasksToBeAdded
 *          tasks to check
 * @return the tasks remaining after filtering, grouped by worker
 * @throws TaskmanagerException
 *           error while filtering
 */
@Override
public List<Task> filterDuplicates(final List<Task> tasksToBeAdded) throws TaskmanagerException {
  // group tasks by worker
  final Map<String, List<Task>> tasksByWorker = new HashMap<>();
  for (final Task task : tasksToBeAdded) {
    final String workerName = task.getWorkerName();
    if (!tasksByWorker.containsKey(workerName)) {
      tasksByWorker.put(workerName, new ArrayList<Task>());
    }
    tasksByWorker.get(workerName).add(task);
  }
  // let each worker's task queue filter its own group
  final List<Task> accepted = new ArrayList<>();
  for (final Map.Entry<String, List<Task>> group : tasksByWorker.entrySet()) {
    final ZkTaskQueue taskQueue = getZkTaskQueue(group.getKey(), false);
    final List<Task> candidates = group.getValue();
    if (taskQueue == null) {
      accepted.addAll(candidates);
    } else {
      accepted.addAll(taskQueue.filterDuplicates(candidates));
    }
  }
  if (accepted.size() != tasksToBeAdded.size()) {
    _log.info("Returning " + accepted.size() + " filtered tasks for " + tasksToBeAdded.size() + " original tasks");
  }
  return accepted;
}
/**
 * Serializes the given configuration with the Any writer and stores the bytes in the ZooKeeper node
 * {@link #TASK_HANDLER_CONFIGURATION}, creating the node if necessary.
 *
 * @param config
 *          configuration to store
 * @throws TaskmanagerException
 *           any error while storing
 */
@Override
public void storeConfiguration(final AnyMap config) throws TaskmanagerException {
  try {
    _zk.ensurePathExists(TASK_HANDLER_CONFIGURATION);
    final byte[] serializedConfig = _anyWriter.writeBinaryObject(config);
    _zk.setData(TASK_HANDLER_CONFIGURATION, serializedConfig);
  } catch (final Exception ex) {
    throw new TaskmanagerException("Error saving taskmanager configuration", ex);
  }
}
/**
 * Reads the configuration stored in the ZooKeeper node {@link #TASK_HANDLER_CONFIGURATION}.
 *
 * @return the stored configuration, or an empty map if the node does not exist or holds no data
 * @throws TaskmanagerException
 *           any error while reading or parsing the stored configuration
 */
@Override
public AnyMap readConfiguration() throws TaskmanagerException {
  Any result = DataFactory.DEFAULT.createAnyMap();
  try {
    final Stat statResult = _zk.exists(TASK_HANDLER_CONFIGURATION);
    // exists() yields null if nothing was stored yet: fall back to the empty default instead of failing with an NPE.
    if (statResult != null && statResult.getDataLength() > 0) {
      final byte[] message = _zk.getData(TASK_HANDLER_CONFIGURATION);
      result = _anyReader.readBinaryObject(message);
    }
  } catch (final Exception ex) {
    // the message used to say "saving" (copied from storeConfiguration).
    throw new TaskmanagerException("Error reading taskmanager configuration", ex);
  }
  return (AnyMap) result;
}
/**
 * Looks up the ZkTaskQueue registered for the given task pipe and optionally creates it if it is missing.
 *
 * @param taskPipe
 *          name of the task pipe
 * @param create
 *          if true, a missing ZkTaskQueue is created; if false, null is returned for a missing queue.
 * @return the ZkTaskQueue, or null if none is registered and create is false
 */
private synchronized ZkTaskQueue getZkTaskQueue(final String taskPipe, final boolean create) {
  final ZkTaskQueue existing = _taskQueues.get(taskPipe);
  if (existing != null || !create) {
    return existing;
  }
  createTaskQueue(taskPipe);
  return _taskQueues.get(taskPipe);
}
/**
 * @return local host name; fetched from the ClusterConfigService on first use and remembered afterwards.
 */
private String getLocalHost() {
  if (_localHost != null) {
    return _localHost;
  }
  _localHost = _clusterConfigService.getLocalHost();
  return _localHost;
}
/**
 * get queue names from ZooKeeper: the sorted children of the task directory node, which is created if missing.
 *
 * @return names of the queues found in ZooKeeper
 * @throws KeeperException
 *           error talking to Zk
 */
private Collection<String> getQueueNames() throws KeeperException {
  final String queueRootPath = ZkTaskQueue.TASKDIR_PREFIX;
  _zk.ensurePathExists(queueRootPath);
  return _zk.getChildrenSorted(queueRootPath);
}
/**
 * bind the ZooKeeperService used by this storage.
 *
 * @param zooKeeperService
 *          referenced service
 */
public void setZooKeeperService(final ZooKeeperService zooKeeperService) {
  this._zkService = zooKeeperService;
}
/**
 * unbind the ZooKeeperService. Only has an effect if the given service is the one currently bound.
 *
 * @param zooKeeperService
 *          referenced service
 */
public void unsetZooKeeperService(final ZooKeeperService zooKeeperService) {
  if (_zkService != zooKeeperService) {
    return;
  }
  _zkService = null;
}
/**
 * bind a new ClusterConfigService. To be called by DS runtime before activation.
 *
 * @param ccs
 *          new ClusterConfigService
 */
public void setClusterConfigService(final ClusterConfigService ccs) {
  this._clusterConfigService = ccs;
}
/**
 * unbind a ClusterConfigService. To be called by DS runtime after deactivation. Only has an effect if the given
 * service is the one currently bound.
 *
 * @param ccs
 *          the ClusterConfigService to remove
 */
public void unsetClusterConfigService(final ClusterConfigService ccs) {
  if (_clusterConfigService != ccs) {
    return;
  }
  _clusterConfigService = null;
}
/**
 * Asks every registered task queue to remove its tasks according to the given filter map.
 *
 * @param filterMap
 *          filter passed on unchanged to each queue
 * @throws TaskmanagerException
 *           error while removing tasks
 */
@Override
public void removeTasks(final AnyMap filterMap) throws TaskmanagerException {
  for (final ZkTaskQueue taskQueue : _taskQueues.values()) {
    taskQueue.removeTasks(filterMap);
  }
}
}