/*******************************************************************************
 * Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
 * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
 * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * <ul>
 * <li>Andreas Weber (Empolis Information Management GmbH) - initial implementation
 * <li>Juergen Schumacher (Empolis Information Management GmbH) - fixes
 * </ul>
 **********************************************************************************************************************/
package org.eclipse.smila.taskmanager.persistence.zk; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.clusterconfig.ClusterConfigService; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
import org.eclipse.smila.taskmanager.TaskManager; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.eclipse.smila.zookeeper.ZkConnection; | |
import org.eclipse.smila.zookeeper.ZkLock; | |
import org.eclipse.smila.zookeeper.ZooKeeperService; | |
import org.osgi.service.component.ComponentContext; | |
/** | |
* Runnable class that checks if tasks that are in progress have timed out. | |
*/ | |
public class TaskWatcher extends TimerTask { | |
/** Constant to convert from seconds to milliseconds. */ | |
private static final long MS_PER_SEC = 1000; | |
/** Time to wait between invocations of method work(). */ | |
private static final long DEFAULT_SLEEPYTIME = 10; | |
/** time-to-live for empty nodes (stale tasks or unused todo_part subdirs). */ | |
private static final long EMPTY_NODE_TTL = 15 * 60 * MS_PER_SEC; | |
/** zookeeper node used for locking. */ | |
private static final String WATCHER_ZNODE = "/smila/taskmanager/watcher"; | |
/** Reference to a TaskStorageZk. */ | |
private TaskStorageZk _taskStorage; | |
/** Reference to taskmanager. */ | |
private TaskManager _taskManager; | |
/** ZooKeeperService. */ | |
private ZooKeeperService _zkService; | |
/** service reference to cluster config service to read job property. */ | |
private ClusterConfigService _clusterConfigService; | |
/** Zookeeper connection. */ | |
private ZkConnection _zk; | |
/** Flag if the InProgressTaskWatcher was stopped. */ | |
private volatile boolean _stopped; | |
/** counter to clean unused empty z-nodes for all todo and todopart queues. */ | |
private long _lastQueueCleanupTime; | |
/** schedules repeated execution. */ | |
private final Timer _timer = new Timer("TaskStorageZk-TaskWatcher", true); | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** | |
* initialize TaskManager internally on service start. | |
* | |
* @param context | |
* OSGi service context. | |
*/ | |
protected void activate(final ComponentContext context) { | |
try { | |
_zk = new ZkConnection(_zkService); | |
if (_log.isDebugEnabled()) { | |
_log.debug("TaskManager activate() successful"); | |
} | |
final long sleepytime = Math.min(DEFAULT_SLEEPYTIME, _clusterConfigService.getTimeToLive()); | |
if (_log.isInfoEnabled()) { | |
_log.info("Checking task storage each " + sleepytime + " seconds, timeToLive is " | |
+ _clusterConfigService.getTimeToLive() + " seconds"); | |
} | |
final long sleepytimeMs = MS_PER_SEC * sleepytime; | |
_timer.schedule(this, sleepytimeMs, sleepytimeMs); | |
} catch (final Throwable e) { | |
final String msg = "Error while starting TaskWatcher for TaskStorageZk."; | |
if (_log.isWarnEnabled()) { | |
_log.warn(msg, e); | |
} | |
throw new RuntimeException(msg, e); | |
} | |
} | |
/** OSGi deactivate bundle. */ | |
protected void deactivate(final ComponentContext context) { | |
cancel(); | |
_timer.cancel(); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void run() { | |
try { | |
final ZkLock lock = acquireLock(); | |
if (lock != null) { | |
try { | |
if (!_stopped) { | |
checkInProgressTasks(); | |
} | |
if (!_stopped) { | |
checkEmptyNodes(); | |
} | |
} finally { | |
releaseLock(lock); | |
} | |
} | |
} catch (final Exception ex) { | |
_log.warn("Error during work", ex); | |
} | |
} | |
/** | |
* @return 'true' if we could successful set a lock, 'false' if lock is already set. | |
* @throws Exception | |
* error while trying to get the lock | |
*/ | |
private ZkLock acquireLock() throws Exception { | |
_zk.ensurePathExists(WATCHER_ZNODE); | |
final ZkLock zkLock = new ZkLock(_zk, WATCHER_ZNODE); | |
if (zkLock.tryLock()) { | |
return zkLock; | |
} | |
return null; | |
} | |
/** | |
* remove lock after finishing. | |
*/ | |
private void releaseLock(final ZkLock zkLock) { | |
if (zkLock != null) { | |
zkLock.unlock(); | |
} | |
} | |
/** | |
* Iterates over all ZkTaskQueues managed by the TaskStorageZk and checks if any task has timed out (reached its Time | |
* To Live). | |
*/ | |
private void checkInProgressTasks() { | |
try { | |
final long timeToLiveMs = _clusterConfigService.getTimeToLive() * MS_PER_SEC; | |
final Collection<ZkTaskQueue> zkTaskQueues = new ArrayList<ZkTaskQueue>(_taskStorage.getTaskQueues()); | |
for (final ZkTaskQueue zkTaskQueue : zkTaskQueues) { | |
if (_stopped) { | |
break; | |
} | |
final Collection<String> taskIds = zkTaskQueue.getTimedOutTasks(timeToLiveMs); | |
final String workerName = zkTaskQueue.getWorkerName(); | |
for (final String taskId : taskIds) { | |
try { | |
_log.warn("Reached timeout for task " + taskId + ", task will be retried"); | |
_taskManager.finishTask(workerName, taskId, createRecoverableErrorResult()); | |
} catch (final Exception ex) { | |
_log.warn("Error rolling back task " + taskId + " for worker " + workerName); | |
} | |
} | |
} | |
} catch (final Exception ex) { | |
_log.warn("Error during work", ex); | |
} | |
} | |
/** create result description for recoverable error. */ | |
private ResultDescription createRecoverableErrorResult() { | |
return new ResultDescription(TaskCompletionStatus.RECOVERABLE_ERROR, TaskManager.TASKERROR_TIME_TO_LIVE, | |
"No keepAlive was sent for longer than " + _clusterConfigService.getTimeToLive() + " seconds.", null); | |
} | |
/** | |
* Iterates over all ZkTaskQueues managed by the TaskStorageZk and checks if any node in todo_part is empty and can be | |
* remove from memory data tree. | |
*/ | |
private void checkEmptyNodes() { | |
final long now = System.currentTimeMillis(); | |
if (now - _lastQueueCleanupTime > EMPTY_NODE_TTL) { | |
try { | |
final Collection<ZkTaskQueue> zkTaskQueues = _taskStorage.getTaskQueues(); | |
for (final ZkTaskQueue zkTaskQueue : zkTaskQueues) { | |
if (_stopped) { | |
break; | |
} | |
zkTaskQueue.cleanEmptyNodes(EMPTY_NODE_TTL); | |
} | |
} catch (final Exception ex) { | |
_log.warn("Error during clean up of empty nodes.", ex); | |
} finally { | |
_lastQueueCleanupTime = System.currentTimeMillis(); | |
} | |
_taskStorage.checkQualifierLockAge(EMPTY_NODE_TTL); | |
} | |
} | |
/** | |
* Stop execution of the TaskWatcher. | |
*/ | |
@Override | |
public synchronized boolean cancel() { | |
_stopped = true; | |
return true; | |
} | |
/** | |
* @param taskStorage | |
* the new task storage | |
*/ | |
public void setTaskStorage(final TaskStorage taskStorage) { | |
if (taskStorage instanceof TaskStorageZk) { | |
_taskStorage = (TaskStorageZk) taskStorage; | |
} | |
} | |
/** | |
* @param taskStorage | |
* the task storage | |
*/ | |
public void unsetTaskStorage(final TaskStorage taskStorage) { | |
if (taskStorage == _taskStorage) { | |
_taskStorage = null; | |
} | |
} | |
/** | |
* @param taskManager | |
* the new task manager | |
*/ | |
public void setTaskManager(final TaskManager taskManager) { | |
_taskManager = taskManager; | |
} | |
/** | |
* @param taskManager | |
* the task manager | |
*/ | |
public void unsetTaskManager(final TaskManager taskManager) { | |
if (taskManager == _taskManager) { | |
_taskManager = null; | |
} | |
} | |
/** | |
* @param zooKeeperService | |
* referenced service | |
*/ | |
public void setZooKeeperService(final ZooKeeperService zooKeeperService) { | |
_zkService = zooKeeperService; | |
} | |
/** | |
* | |
* @param zooKeeperService | |
* referenced service | |
*/ | |
public void unsetZooKeeperService(final ZooKeeperService zooKeeperService) { | |
if (_zkService == zooKeeperService) { | |
_zkService = null; | |
} | |
} | |
/** | |
* method for DS to set a service reference. | |
*/ | |
public void setClusterConfigService(final ClusterConfigService clusterConfigService) { | |
_clusterConfigService = clusterConfigService; | |
} | |
/** | |
* method for DS to unset a service reference. | |
*/ | |
public void unsetClusterConfigService(final ClusterConfigService clusterConfigService) { | |
if (_clusterConfigService == clusterConfigService) { | |
_clusterConfigService = null; | |
} | |
} | |
} |