blob: b225bf00d09011eef658f2fb11bf86480cad3fb9 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
* program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* <ul>
* <li>Andreas Weber (Empolis Information Management GmbH) - initial implementation
* <li>Juergen Schumacher (Empolis Information Management GmbH) - fixes
* </ul>
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.clusterconfig.ClusterConfigService;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
import org.eclipse.smila.taskmanager.TaskManager;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZkLock;
import org.eclipse.smila.zookeeper.ZooKeeperService;
import org.osgi.service.component.ComponentContext;
/**
 * Runnable class that checks if tasks that are in progress have timed out.
 * <p>
 * The watcher is scheduled on a daemon {@link Timer} when the component is activated. Each run tries to acquire a
 * ZooKeeper lock on {@link #WATCHER_ZNODE}. Only when that lock is obtained does it finish timed-out in-progress
 * tasks with a recoverable error. At most once per {@link #EMPTY_NODE_TTL} it also cleans up empty nodes.
 * </p>
 */
public class TaskWatcher extends TimerTask {
  /** Constant to convert from seconds to milliseconds. */
  private static final long MS_PER_SEC = 1000;

  /** Upper bound (seconds) for the period between runs; the actual period is min(this, time-to-live). */
  private static final long DEFAULT_SLEEPYTIME = 10;

  /** time-to-live (milliseconds) for empty nodes (stale tasks or unused todo_part subdirs). */
  private static final long EMPTY_NODE_TTL = 15 * 60 * MS_PER_SEC;

  /** zookeeper node used for locking. */
  private static final String WATCHER_ZNODE = "/smila/taskmanager/watcher";

  /** Reference to a TaskStorageZk. */
  private TaskStorageZk _taskStorage;

  /** Reference to taskmanager. */
  private TaskManager _taskManager;

  /** ZooKeeperService. */
  private ZooKeeperService _zkService;

  /** service reference to cluster config service to read the task time-to-live. */
  private ClusterConfigService _clusterConfigService;

  /** Zookeeper connection, created in {@link #activate(ComponentContext)}. */
  private ZkConnection _zk;

  /** Flag if the TaskWatcher was stopped. */
  private volatile boolean _stopped;

  /**
   * Timestamp (ms) of the last cleanup of empty z-nodes for all todo and todopart queues. The initial value 0 makes
   * the first run perform a cleanup.
   */
  private long _lastQueueCleanupTime;

  /** schedules repeated execution. */
  private final Timer _timer = new Timer("TaskStorageZk-TaskWatcher", true);

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /**
   * Creates the ZooKeeper connection and schedules this watcher on the internal timer. The first run happens after
   * one period, and the task then repeats with that period. The period is the smaller of {@link #DEFAULT_SLEEPYTIME}
   * and the cluster time-to-live, both in seconds.
   *
   * @param context
   *          OSGi service context (not used).
   * @throws RuntimeException
   *           wrapping any error that occurred while starting the watcher.
   */
  protected void activate(final ComponentContext context) {
    try {
      _zk = new ZkConnection(_zkService);
      final long sleepytime = Math.min(DEFAULT_SLEEPYTIME, _clusterConfigService.getTimeToLive());
      if (_log.isInfoEnabled()) {
        _log.info("Checking task storage each " + sleepytime + " seconds, timeToLive is "
          + _clusterConfigService.getTimeToLive() + " seconds");
      }
      final long sleepytimeMs = MS_PER_SEC * sleepytime;
      _timer.schedule(this, sleepytimeMs, sleepytimeMs);
      // only report success once the task has actually been scheduled.
      if (_log.isDebugEnabled()) {
        _log.debug("TaskWatcher activate() successful");
      }
    } catch (final Throwable e) {
      final String msg = "Error while starting TaskWatcher for TaskStorageZk.";
      if (_log.isWarnEnabled()) {
        _log.warn(msg, e);
      }
      throw new RuntimeException(msg, e);
    }
  }

  /**
   * OSGi deactivate bundle: stops this task and terminates the timer thread.
   *
   * @param context
   *          OSGi service context (not used).
   */
  protected void deactivate(final ComponentContext context) {
    cancel();
    _timer.cancel();
  }

  /**
   * One watcher cycle. If not stopped and the watcher lock can be acquired, it checks for timed-out in-progress
   * tasks and, if due, cleans up empty nodes. The lock is always released afterwards. Exceptions are logged, never
   * propagated, so the timer thread keeps running.
   */
  @Override
  public void run() {
    if (_stopped) {
      // do not touch ZooKeeper anymore once the watcher has been stopped.
      return;
    }
    try {
      final ZkLock lock = acquireLock();
      if (lock != null) {
        try {
          if (!_stopped) {
            checkInProgressTasks();
          }
          if (!_stopped) {
            checkEmptyNodes();
          }
        } finally {
          releaseLock(lock);
        }
      }
    } catch (final Exception ex) {
      _log.warn("Error during work", ex);
    }
  }

  /**
   * Ensures the watcher znode exists and tries (without waiting) to lock it.
   *
   * @return the acquired lock, or {@code null} if the lock is already held.
   * @throws Exception
   *           error while trying to get the lock
   */
  private ZkLock acquireLock() throws Exception {
    _zk.ensurePathExists(WATCHER_ZNODE);
    final ZkLock zkLock = new ZkLock(_zk, WATCHER_ZNODE);
    if (zkLock.tryLock()) {
      return zkLock;
    }
    return null;
  }

  /**
   * remove lock after finishing.
   *
   * @param zkLock
   *          the lock to release; {@code null} is ignored.
   */
  private void releaseLock(final ZkLock zkLock) {
    if (zkLock != null) {
      zkLock.unlock();
    }
  }

  /**
   * Iterates over all ZkTaskQueues managed by the TaskStorageZk and checks if any task has timed out (reached its Time
   * To Live). Each timed-out task is finished via the task manager with a recoverable error so it will be retried.
   * Failures are logged and do not abort the iteration over the remaining tasks.
   */
  private void checkInProgressTasks() {
    try {
      final long timeToLiveMs = _clusterConfigService.getTimeToLive() * MS_PER_SEC;
      // work on a snapshot so concurrent changes to the storage's queue collection do not affect iteration.
      final Collection<ZkTaskQueue> zkTaskQueues = new ArrayList<ZkTaskQueue>(_taskStorage.getTaskQueues());
      for (final ZkTaskQueue zkTaskQueue : zkTaskQueues) {
        if (_stopped) {
          break;
        }
        final Collection<String> taskIds = zkTaskQueue.getTimedOutTasks(timeToLiveMs);
        final String workerName = zkTaskQueue.getWorkerName();
        for (final String taskId : taskIds) {
          try {
            _log.warn("Reached timeout for task " + taskId + ", task will be retried");
            _taskManager.finishTask(workerName, taskId, createRecoverableErrorResult());
          } catch (final Exception ex) {
            // keep the cause, otherwise the reason for the failed rollback is lost.
            _log.warn("Error rolling back task " + taskId + " for worker " + workerName, ex);
          }
        }
      }
    } catch (final Exception ex) {
      _log.warn("Error during work", ex);
    }
  }

  /**
   * create result description for recoverable error.
   *
   * @return a RECOVERABLE_ERROR result with error code {@link TaskManager#TASKERROR_TIME_TO_LIVE}.
   */
  private ResultDescription createRecoverableErrorResult() {
    return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, TaskManager.TASKERROR_TIME_TO_LIVE,
      "No keepAlive was sent for longer than " + _clusterConfigService.getTimeToLive() + " seconds.", null);
  }

  /**
   * Iterates over all ZkTaskQueues managed by the TaskStorageZk and checks if any node in todo_part is empty and can be
   * removed from the memory data tree. This runs at most once per {@link #EMPTY_NODE_TTL}. Afterwards the task
   * storage is asked to check the age of qualifier locks.
   */
  private void checkEmptyNodes() {
    final long now = System.currentTimeMillis();
    if (now - _lastQueueCleanupTime > EMPTY_NODE_TTL) {
      try {
        // snapshot, consistent with checkInProgressTasks().
        final Collection<ZkTaskQueue> zkTaskQueues = new ArrayList<ZkTaskQueue>(_taskStorage.getTaskQueues());
        for (final ZkTaskQueue zkTaskQueue : zkTaskQueues) {
          if (_stopped) {
            break;
          }
          zkTaskQueue.cleanEmptyNodes(EMPTY_NODE_TTL);
        }
      } catch (final Exception ex) {
        _log.warn("Error during clean up of empty nodes.", ex);
      } finally {
        _lastQueueCleanupTime = System.currentTimeMillis();
      }
      _taskStorage.checkQualifierLockAge(EMPTY_NODE_TTL);
    }
  }

  /**
   * Stop execution of the TaskWatcher. This sets the stop flag that the running checks poll. It also cancels this
   * task in its timer so that no further runs are scheduled; a run already in progress finishes normally.
   *
   * @return always {@code true}.
   */
  @Override
  public synchronized boolean cancel() {
    _stopped = true;
    super.cancel();
    return true;
  }

  /**
   * @param taskStorage
   *          the new task storage; ignored unless it is a TaskStorageZk.
   */
  public void setTaskStorage(final TaskStorage taskStorage) {
    if (taskStorage instanceof TaskStorageZk) {
      _taskStorage = (TaskStorageZk) taskStorage;
    }
  }

  /**
   * @param taskStorage
   *          the task storage; the reference is cleared only if it is the one currently set.
   */
  public void unsetTaskStorage(final TaskStorage taskStorage) {
    if (taskStorage == _taskStorage) {
      _taskStorage = null;
    }
  }

  /**
   * @param taskManager
   *          the new task manager
   */
  public void setTaskManager(final TaskManager taskManager) {
    _taskManager = taskManager;
  }

  /**
   * @param taskManager
   *          the task manager; the reference is cleared only if it is the one currently set.
   */
  public void unsetTaskManager(final TaskManager taskManager) {
    if (taskManager == _taskManager) {
      _taskManager = null;
    }
  }

  /**
   * @param zooKeeperService
   *          referenced service
   */
  public void setZooKeeperService(final ZooKeeperService zooKeeperService) {
    _zkService = zooKeeperService;
  }

  /**
   * @param zooKeeperService
   *          referenced service; the reference is cleared only if it is the one currently set.
   */
  public void unsetZooKeeperService(final ZooKeeperService zooKeeperService) {
    if (_zkService == zooKeeperService) {
      _zkService = null;
    }
  }

  /**
   * method for DS to set a service reference.
   *
   * @param clusterConfigService
   *          the cluster config service.
   */
  public void setClusterConfigService(final ClusterConfigService clusterConfigService) {
    _clusterConfigService = clusterConfigService;
  }

  /**
   * method for DS to unset a service reference.
   *
   * @param clusterConfigService
   *          the cluster config service; the reference is cleared only if it is the one currently set.
   */
  public void unsetClusterConfigService(final ClusterConfigService clusterConfigService) {
    if (_clusterConfigService == clusterConfigService) {
      _clusterConfigService = null;
    }
  }
}