/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.taskmanager.internal; | |
import java.util.Collection; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.clusterconfig.ClusterConfigService; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException; | |
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException.Cause; | |
import org.eclipse.smila.taskmanager.ResultDescription; | |
import org.eclipse.smila.taskmanager.Task; | |
import org.eclipse.smila.taskmanager.TaskCompletionStatus; | |
import org.eclipse.smila.taskmanager.TaskManager; | |
import org.eclipse.smila.taskmanager.TaskmanagerException; | |
import org.eclipse.smila.taskmanager.persistence.TaskCounter; | |
import org.eclipse.smila.taskmanager.persistence.TaskList; | |
import org.eclipse.smila.taskmanager.persistence.TaskStorage; | |
import org.osgi.service.component.ComponentContext; | |
/** | |
* The TaskManager implementation. | |
*/ | |
public class TaskManagerImpl implements TaskManager { | |
/** reference to the TaskStorage service. */ | |
private TaskStorage _taskStorage; | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** cluster configuration service. */ | |
private ClusterConfigService _clusterService; | |
/** | |
* OSGi Declarative Services service activation method. | |
* | |
* @param context | |
* OSGi service component context. | |
*/ | |
protected void activate(final ComponentContext context) { | |
_taskStorage.setMaxNoOfTasksPerHost(_clusterService.getMaxScaleUp()); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void addTask(final Task task) throws TaskmanagerException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager add task '" + task.getTaskId() + "' for worker '" + task.getWorkerName() + "'"); | |
} | |
task.setTaskCreatedProperties(); | |
if (hasObsoleteQualifier(task)) { | |
// if task qualifier was marked as obsolete earlier, finish task as obsolete immediately. | |
finishTaskAsObsolete(task); | |
} else { | |
_taskStorage.storeTask(task.getWorkerName(), task); | |
} | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void addTasks(final Collection<Task> taskList) throws TaskmanagerException { | |
for (final Task task : taskList) { | |
addTask(task); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void addInProgressTask(final Task task) throws TaskmanagerException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager add inprogress task '" + task.getTaskId() + "' for worker '" + task.getWorkerName() | |
+ "'"); | |
} | |
task.setTaskCreatedProperties(); | |
_taskStorage.storeInProgressTask(task); | |
} | |
/** | |
* {@inheritDoc} | |
* | |
* @see TaskManager#getTask(String, Collection) | |
*/ | |
@Override | |
public Task getTask(final String workerName, final String host) throws TaskmanagerException { | |
return getTask(workerName, host, null); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public Task getTask(final String workerName, final String host, final Collection<String> qualifiers) | |
throws TaskmanagerException { | |
if (_log.isDebugEnabled()) { | |
String debugMessage = "Taskmanager get task for worker '" + workerName + "', host '" + host + "'"; | |
if (qualifiers != null) { | |
debugMessage += ", qualifiers = '" + qualifiers.toString() + "'."; | |
} | |
_log.debug(debugMessage); | |
} | |
checkWorkerName(workerName); | |
final Task newTask = _taskStorage.getTask(workerName, host, qualifiers); | |
if (newTask != null) { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager got task '" + newTask.getTaskId() + "' for worker '" + workerName + "'."); | |
} | |
} else { | |
if (_log.isTraceEnabled()) { | |
_log.trace("Taskmanager returned no task for worker '" + workerName + "'."); | |
} | |
} | |
return newTask; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void finishTask(final String workerName, final String taskId, final ResultDescription resultDescription) | |
throws TaskmanagerException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager finish task '" + taskId + "' for worker '" + workerName + "'."); | |
} | |
checkWorkerName(workerName); | |
final Task task = _taskStorage.getInProgressTask(workerName, taskId); | |
if (task == null) { | |
throw new BadParameterTaskmanagerException("Task '" + taskId + "' for worker '" + workerName | |
+ "' is not in-progress anymore.", Cause.taskId); | |
} | |
if (FINISHING_TASKS_WORKER.equals(workerName)) { | |
finishTaskForFinishingWorker(task, resultDescription); | |
} else { | |
if (hasObsoleteQualifier(task)) { | |
// if task has survived in inprogress while qualifier was finished, finish it immediately now. | |
finishTaskAsObsolete(task); | |
} else if (resultDescription.getStatus() == TaskCompletionStatus.POSTPONE) { | |
// just put it back to the todo queue after changing some properties | |
postponeTask(task); | |
_taskStorage.storeTask(workerName, task); | |
} else { | |
// Move in-progress task to finishing todo queue. | |
// If any of the following actions fail, task will remain in-progress, and will be retried by TTL. | |
// Worst thing that may happen is a duplicated processing of a task. | |
final Task finishTask = task.createFinishTask(resultDescription, FINISHING_TASKS_WORKER); | |
_taskStorage.storeTask(FINISHING_TASKS_WORKER, finishTask); | |
} | |
_taskStorage.deleteTask(workerName, taskId); | |
} | |
} | |
/** | |
* @param task | |
* finishing-task to finish | |
* @param finishResultDescription | |
* the result of the finishing-task processing | |
*/ | |
private void finishTaskForFinishingWorker(final Task task, final ResultDescription finishResultDescription) | |
throws TaskmanagerException { | |
final TaskCompletionStatus finishStatus = finishResultDescription.getStatus(); | |
try { | |
_taskStorage.deleteTask(FINISHING_TASKS_WORKER, task.getTaskId()); | |
// in case of errors, finishing-task may be repeated | |
if (finishStatus == TaskCompletionStatus.RECOVERABLE_ERROR | |
|| finishStatus == TaskCompletionStatus.FATAL_ERROR) { | |
final ResultDescription originalResultDescription = task.getResultDescription(); | |
final TaskCompletionStatus originalStatus = originalResultDescription.getStatus(); | |
_log.warn("finishing-task '" + task.getTaskId() + "' has completion status '" + finishStatus | |
+ "', original task had completion status '" + originalStatus + "'"); | |
if (originalStatus == TaskCompletionStatus.OBSOLETE || originalStatus == TaskCompletionStatus.FATAL_ERROR | |
|| finishStatus == TaskCompletionStatus.FATAL_ERROR) { | |
_log.warn("Finishing-task will not be retried"); | |
} else { | |
_log.warn("Finishing-task will be retried"); | |
try { | |
_taskStorage.storeTask(FINISHING_TASKS_WORKER, task); | |
} catch (final TaskmanagerException e) { | |
_log.warn("Error while storing finishing-task '" + task.getTaskId() + "' in finishing-todo-queue", e); | |
} | |
} | |
} | |
} catch (final TaskmanagerException e) { | |
_log.warn("Error while deleting finishing-task '" + task.getTaskId() + "' with completion status '" | |
+ finishStatus + "' from finishing-inprogress-queue", e); | |
} | |
} | |
/** finish task as obsolete. */ | |
private void finishTaskAsObsolete(final Task task) throws TaskmanagerException { | |
task.setTaskStartedProperties(); // just for completeness. | |
final Task obsoleteTask = | |
task.createFinishTask(new ResultDescription(TaskCompletionStatus.OBSOLETE, null, null, null), | |
FINISHING_TASKS_WORKER); | |
_taskStorage.storeTask(FINISHING_TASKS_WORKER, obsoleteTask); | |
} | |
/** @return true if task has a qualifier that is locked in task storage. */ | |
private boolean hasObsoleteQualifier(final Task task) throws TaskmanagerException { | |
return task.getQualifier() != null && _taskStorage.isLockedQualifier(task.getWorkerName(), task.getQualifier()); | |
} | |
/** | |
* Changes properties for a postponed task. | |
* | |
* @param task | |
* the task to change | |
*/ | |
private void postponeTask(final Task task) { | |
task.getProperties().remove(Task.PROPERTY_START_TIME); | |
final String postponed = task.getProperties().get(Task.PROPERTY_POSTPONED); | |
if (postponed != null) { | |
int postponedInt = Integer.parseInt(postponed); | |
postponedInt++; | |
task.getProperties().put(Task.PROPERTY_POSTPONED, Integer.toString(postponedInt)); | |
} else { | |
task.getProperties().put(Task.PROPERTY_POSTPONED, "1"); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void keepAlive(final String workerName, final String taskId) throws TaskmanagerException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager keepAlive for worker '" + workerName + "', taskId '" + taskId + "'."); | |
} | |
checkWorkerName(workerName); | |
_taskStorage.keepAlive(workerName, taskId); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void finishTasks(final String workerName, final Collection<String> qualifiers, | |
final ResultDescription resultDescription) throws TaskmanagerException { | |
if (qualifiers == null) { | |
throw new IllegalArgumentException("qualifiers must not be a null value"); | |
} | |
if (_log.isDebugEnabled()) { | |
_log.debug("Taskmanager finish tasks for worker '" + workerName + "' and qualifiers = '" | |
+ qualifiers.toString() + "' as obsolete."); | |
} | |
checkWorkerName(workerName); | |
_taskStorage.lockQualifiers(workerName, qualifiers); | |
Task task = null; | |
do { | |
task = getTask(workerName, null, qualifiers); | |
if (task != null) { | |
try { | |
if (_log.isTraceEnabled()) { | |
_log.trace(" finish task '" + task.getTaskId() + "', qualifier: '" + task.getQualifier() + "'."); | |
} | |
finishTask(workerName, task.getTaskId(), resultDescription); | |
} catch (final Exception ex) { | |
_log.warn("Finishing task " + task.getTaskId() + " for worker " + workerName + ".", ex); | |
} | |
} | |
} while (task != null); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void removeTasks(final AnyMap filterMap) throws TaskmanagerException { | |
_taskStorage.removeTasks(filterMap); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void addTaskQueue(final String workerName) throws TaskmanagerException { | |
_taskStorage.createTaskQueue(workerName); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Map<String, TaskCounter> getTaskCounters() throws TaskmanagerException { | |
return _taskStorage.getTaskCounters(); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public TaskList getTaskList(final String workerName, final String section, final int maxCount) | |
throws TaskmanagerException { | |
checkWorkerName(workerName); | |
return _taskStorage.getTaskList(workerName, section, maxCount); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Any getTaskInfo(final String workerName, final String section, final String taskName) | |
throws TaskmanagerException { | |
checkWorkerName(workerName); | |
return _taskStorage.getTaskInfo(workerName, section, taskName); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public long getFailSafetyLevel() { | |
return _taskStorage.getFailSafetyLevel(); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public long getMaxScaleUp() { | |
return _taskStorage.getMaxNoOfTasksPerHost(); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public Map<String, Integer> getScaleUpCounters() throws TaskmanagerException { | |
final Map<String, Integer> scaleUpCounters = _taskStorage.getScaleUpCounters(); | |
try { | |
if (_clusterService.isConfigured()) { | |
final List<String> clusterNodes = _clusterService.getClusterNodes(); | |
if (clusterNodes != null && clusterNodes.size() > 1) { | |
final Map<String, Integer> sortedCounters = new LinkedHashMap<String, Integer>(); | |
for (final String cluster : clusterNodes) { | |
final Integer counter = scaleUpCounters.get(cluster); | |
if (counter != null) { | |
sortedCounters.put(cluster, counter); | |
} | |
} | |
return sortedCounters; | |
} | |
} | |
} catch (final Exception e) { | |
;// if something goes wrong during sorting return original scaleUpCounters | |
} | |
return scaleUpCounters; | |
} | |
/** | |
* checks workerName Parameter. | |
* | |
* @param workerName | |
* the workerName | |
* @throws TaskmanagerException | |
* the worker does not exist or the workerName parameter is invalid | |
*/ | |
private void checkWorkerName(final String workerName) throws TaskmanagerException { | |
if (workerName == null) { | |
final String msg = "parameter workerName is null"; | |
if (_log.isDebugEnabled()) { | |
_log.error(msg); | |
} | |
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName); | |
} | |
if (workerName.trim().length() == 0) { | |
final String msg = "parameter workerName is an empty String"; | |
if (_log.isDebugEnabled()) { | |
_log.error(msg); | |
} | |
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName); | |
} | |
// if worker is not internal, it has to be registered (via addWorker()) before. | |
if (!isInternalWorker(workerName) && !_taskStorage.hasTaskQueue(workerName)) { | |
final String msg = "Worker '" + workerName + "' does not exist."; | |
if (_log.isDebugEnabled()) { | |
_log.error(msg); | |
} | |
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName); | |
} | |
} | |
/** | |
* @param worker | |
* @return 'true', if worker is an internal worker, false 'otherwise'. | |
*/ | |
private boolean isInternalWorker(final String worker) { | |
return worker.startsWith(TaskManager.PREFIX_INTERNAL); | |
} | |
// ---- OSGI stuff goes here. ---- | |
/** | |
* @param taskStorage | |
* the new task storage | |
*/ | |
public void setTaskStorage(final TaskStorage taskStorage) { | |
_taskStorage = taskStorage; | |
} | |
/** | |
* @param taskStorage | |
* the task storage | |
*/ | |
public void unsetTaskStorage(final TaskStorage taskStorage) { | |
if (taskStorage == _taskStorage) { | |
_taskStorage = null; | |
} | |
} | |
/** | |
* needed to get cluster configuration. | |
*/ | |
public void setClusterConfigService(final ClusterConfigService ccs) { | |
_clusterService = ccs; | |
} | |
/** | |
* @param ccs | |
* the cluster config service to unset | |
*/ | |
public void unsetClusterConfigService(final ClusterConfigService ccs) { | |
if (_clusterService == ccs) { | |
_clusterService = null; | |
} | |
} | |
} |