blob: ec52a6c497ad41a9fca506a5d795d46e04dbfba8 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager.internal;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.clusterconfig.ClusterConfigService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException;
import org.eclipse.smila.taskmanager.BadParameterTaskmanagerException.Cause;
import org.eclipse.smila.taskmanager.ResultDescription;
import org.eclipse.smila.taskmanager.Task;
import org.eclipse.smila.taskmanager.TaskCompletionStatus;
import org.eclipse.smila.taskmanager.TaskManager;
import org.eclipse.smila.taskmanager.TaskmanagerException;
import org.eclipse.smila.taskmanager.persistence.TaskCounter;
import org.eclipse.smila.taskmanager.persistence.TaskList;
import org.eclipse.smila.taskmanager.persistence.TaskStorage;
import org.osgi.service.component.ComponentContext;
/**
* The TaskManager implementation.
*/
public class TaskManagerImpl implements TaskManager {
/** reference to the TaskStorage service. */
private TaskStorage _taskStorage;
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** cluster configuration service. */
private ClusterConfigService _clusterService;
/**
* OSGi Declarative Services service activation method.
*
* @param context
* OSGi service component context.
*/
protected void activate(final ComponentContext context) {
_taskStorage.setMaxNoOfTasksPerHost(_clusterService.getMaxScaleUp());
}
/**
* {@inheritDoc}
*/
@Override
public void addTask(final Task task) throws TaskmanagerException {
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager add task '" + task.getTaskId() + "' for worker '" + task.getWorkerName() + "'");
}
task.setTaskCreatedProperties();
if (hasObsoleteQualifier(task)) {
// if task qualifier was marked as obsolete earlier, finish task as obsolete immediately.
finishTaskAsObsolete(task);
} else {
_taskStorage.storeTask(task.getWorkerName(), task);
}
}
/** {@inheritDoc} */
@Override
public void addTasks(final Collection<Task> taskList) throws TaskmanagerException {
for (final Task task : taskList) {
addTask(task);
}
}
/**
* {@inheritDoc}
*/
@Override
public void addInProgressTask(final Task task) throws TaskmanagerException {
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager add inprogress task '" + task.getTaskId() + "' for worker '" + task.getWorkerName()
+ "'");
}
task.setTaskCreatedProperties();
_taskStorage.storeInProgressTask(task);
}
/**
* {@inheritDoc}
*
* @see TaskManager#getTask(String, Collection)
*/
@Override
public Task getTask(final String workerName, final String host) throws TaskmanagerException {
return getTask(workerName, host, null);
}
/**
* {@inheritDoc}
*/
@Override
public Task getTask(final String workerName, final String host, final Collection<String> qualifiers)
throws TaskmanagerException {
if (_log.isDebugEnabled()) {
String debugMessage = "Taskmanager get task for worker '" + workerName + "', host '" + host + "'";
if (qualifiers != null) {
debugMessage += ", qualifiers = '" + qualifiers.toString() + "'.";
}
_log.debug(debugMessage);
}
checkWorkerName(workerName);
final Task newTask = _taskStorage.getTask(workerName, host, qualifiers);
if (newTask != null) {
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager got task '" + newTask.getTaskId() + "' for worker '" + workerName + "'.");
}
} else {
if (_log.isTraceEnabled()) {
_log.trace("Taskmanager returned no task for worker '" + workerName + "'.");
}
}
return newTask;
}
/**
* {@inheritDoc}
*/
@Override
public void finishTask(final String workerName, final String taskId, final ResultDescription resultDescription)
throws TaskmanagerException {
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager finish task '" + taskId + "' for worker '" + workerName + "'.");
}
checkWorkerName(workerName);
final Task task = _taskStorage.getInProgressTask(workerName, taskId);
if (task == null) {
throw new BadParameterTaskmanagerException("Task '" + taskId + "' for worker '" + workerName
+ "' is not in-progress anymore.", Cause.taskId);
}
if (FINISHING_TASKS_WORKER.equals(workerName)) {
finishTaskForFinishingWorker(task, resultDescription);
} else {
if (hasObsoleteQualifier(task)) {
// if task has survived in inprogress while qualifier was finished, finish it immediately now.
finishTaskAsObsolete(task);
} else if (resultDescription.getStatus() == TaskCompletionStatus.POSTPONE) {
// just put it back to the todo queue after changing some properties
postponeTask(task);
_taskStorage.storeTask(workerName, task);
} else {
// Move in-progress task to finishing todo queue.
// If any of the following actions fail, task will remain in-progress, and will be retried by TTL.
// Worst thing that may happen is a duplicated processing of a task.
final Task finishTask = task.createFinishTask(resultDescription, FINISHING_TASKS_WORKER);
_taskStorage.storeTask(FINISHING_TASKS_WORKER, finishTask);
}
_taskStorage.deleteTask(workerName, taskId);
}
}
/**
* @param task
* finishing-task to finish
* @param finishResultDescription
* the result of the finishing-task processing
*/
private void finishTaskForFinishingWorker(final Task task, final ResultDescription finishResultDescription)
throws TaskmanagerException {
final TaskCompletionStatus finishStatus = finishResultDescription.getStatus();
try {
_taskStorage.deleteTask(FINISHING_TASKS_WORKER, task.getTaskId());
// in case of errors, finishing-task may be repeated
if (finishStatus == TaskCompletionStatus.RECOVERABLE_ERROR
|| finishStatus == TaskCompletionStatus.FATAL_ERROR) {
final ResultDescription originalResultDescription = task.getResultDescription();
final TaskCompletionStatus originalStatus = originalResultDescription.getStatus();
_log.warn("finishing-task '" + task.getTaskId() + "' has completion status '" + finishStatus
+ "', original task had completion status '" + originalStatus + "'");
if (originalStatus == TaskCompletionStatus.OBSOLETE || originalStatus == TaskCompletionStatus.FATAL_ERROR
|| finishStatus == TaskCompletionStatus.FATAL_ERROR) {
_log.warn("Finishing-task will not be retried");
} else {
_log.warn("Finishing-task will be retried");
try {
_taskStorage.storeTask(FINISHING_TASKS_WORKER, task);
} catch (final TaskmanagerException e) {
_log.warn("Error while storing finishing-task '" + task.getTaskId() + "' in finishing-todo-queue", e);
}
}
}
} catch (final TaskmanagerException e) {
_log.warn("Error while deleting finishing-task '" + task.getTaskId() + "' with completion status '"
+ finishStatus + "' from finishing-inprogress-queue", e);
}
}
/** finish task as obsolete. */
private void finishTaskAsObsolete(final Task task) throws TaskmanagerException {
task.setTaskStartedProperties(); // just for completeness.
final Task obsoleteTask =
task.createFinishTask(new ResultDescription(TaskCompletionStatus.OBSOLETE, null, null, null),
FINISHING_TASKS_WORKER);
_taskStorage.storeTask(FINISHING_TASKS_WORKER, obsoleteTask);
}
/** @return true if task has a qualifier that is locked in task storage. */
private boolean hasObsoleteQualifier(final Task task) throws TaskmanagerException {
return task.getQualifier() != null && _taskStorage.isLockedQualifier(task.getWorkerName(), task.getQualifier());
}
/**
* Changes properties for a postponed task.
*
* @param task
* the task to change
*/
private void postponeTask(final Task task) {
task.getProperties().remove(Task.PROPERTY_START_TIME);
final String postponed = task.getProperties().get(Task.PROPERTY_POSTPONED);
if (postponed != null) {
int postponedInt = Integer.parseInt(postponed);
postponedInt++;
task.getProperties().put(Task.PROPERTY_POSTPONED, Integer.toString(postponedInt));
} else {
task.getProperties().put(Task.PROPERTY_POSTPONED, "1");
}
}
/**
* {@inheritDoc}
*/
@Override
public void keepAlive(final String workerName, final String taskId) throws TaskmanagerException {
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager keepAlive for worker '" + workerName + "', taskId '" + taskId + "'.");
}
checkWorkerName(workerName);
_taskStorage.keepAlive(workerName, taskId);
}
/** {@inheritDoc} */
@Override
public void finishTasks(final String workerName, final Collection<String> qualifiers,
final ResultDescription resultDescription) throws TaskmanagerException {
if (qualifiers == null) {
throw new IllegalArgumentException("qualifiers must not be a null value");
}
if (_log.isDebugEnabled()) {
_log.debug("Taskmanager finish tasks for worker '" + workerName + "' and qualifiers = '"
+ qualifiers.toString() + "' as obsolete.");
}
checkWorkerName(workerName);
_taskStorage.lockQualifiers(workerName, qualifiers);
Task task = null;
do {
task = getTask(workerName, null, qualifiers);
if (task != null) {
try {
if (_log.isTraceEnabled()) {
_log.trace(" finish task '" + task.getTaskId() + "', qualifier: '" + task.getQualifier() + "'.");
}
finishTask(workerName, task.getTaskId(), resultDescription);
} catch (final Exception ex) {
_log.warn("Finishing task " + task.getTaskId() + " for worker " + workerName + ".", ex);
}
}
} while (task != null);
}
/** {@inheritDoc} */
@Override
public void removeTasks(final AnyMap filterMap) throws TaskmanagerException {
_taskStorage.removeTasks(filterMap);
}
/** {@inheritDoc} */
@Override
public void addTaskQueue(final String workerName) throws TaskmanagerException {
_taskStorage.createTaskQueue(workerName);
}
/** {@inheritDoc} */
@Override
public Map<String, TaskCounter> getTaskCounters() throws TaskmanagerException {
return _taskStorage.getTaskCounters();
}
/** {@inheritDoc} */
@Override
public TaskList getTaskList(final String workerName, final String section, final int maxCount)
throws TaskmanagerException {
checkWorkerName(workerName);
return _taskStorage.getTaskList(workerName, section, maxCount);
}
/** {@inheritDoc} */
@Override
public Any getTaskInfo(final String workerName, final String section, final String taskName)
throws TaskmanagerException {
checkWorkerName(workerName);
return _taskStorage.getTaskInfo(workerName, section, taskName);
}
/** {@inheritDoc} */
@Override
public long getFailSafetyLevel() {
return _taskStorage.getFailSafetyLevel();
}
/** {@inheritDoc} */
@Override
public long getMaxScaleUp() {
return _taskStorage.getMaxNoOfTasksPerHost();
}
/** {@inheritDoc} */
@Override
public Map<String, Integer> getScaleUpCounters() throws TaskmanagerException {
final Map<String, Integer> scaleUpCounters = _taskStorage.getScaleUpCounters();
try {
if (_clusterService.isConfigured()) {
final List<String> clusterNodes = _clusterService.getClusterNodes();
if (clusterNodes != null && clusterNodes.size() > 1) {
final Map<String, Integer> sortedCounters = new LinkedHashMap<String, Integer>();
for (final String cluster : clusterNodes) {
final Integer counter = scaleUpCounters.get(cluster);
if (counter != null) {
sortedCounters.put(cluster, counter);
}
}
return sortedCounters;
}
}
} catch (final Exception e) {
;// if something goes wrong during sorting return original scaleUpCounters
}
return scaleUpCounters;
}
/**
* checks workerName Parameter.
*
* @param workerName
* the workerName
* @throws TaskmanagerException
* the worker does not exist or the workerName parameter is invalid
*/
private void checkWorkerName(final String workerName) throws TaskmanagerException {
if (workerName == null) {
final String msg = "parameter workerName is null";
if (_log.isDebugEnabled()) {
_log.error(msg);
}
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName);
}
if (workerName.trim().length() == 0) {
final String msg = "parameter workerName is an empty String";
if (_log.isDebugEnabled()) {
_log.error(msg);
}
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName);
}
// if worker is not internal, it has to be registered (via addWorker()) before.
if (!isInternalWorker(workerName) && !_taskStorage.hasTaskQueue(workerName)) {
final String msg = "Worker '" + workerName + "' does not exist.";
if (_log.isDebugEnabled()) {
_log.error(msg);
}
throw new BadParameterTaskmanagerException(msg, BadParameterTaskmanagerException.Cause.workerName);
}
}
/**
* @param worker
* @return 'true', if worker is an internal worker, false 'otherwise'.
*/
private boolean isInternalWorker(final String worker) {
return worker.startsWith(TaskManager.PREFIX_INTERNAL);
}
// ---- OSGI stuff goes here. ----
/**
* @param taskStorage
* the new task storage
*/
public void setTaskStorage(final TaskStorage taskStorage) {
_taskStorage = taskStorage;
}
/**
* @param taskStorage
* the task storage
*/
public void unsetTaskStorage(final TaskStorage taskStorage) {
if (taskStorage == _taskStorage) {
_taskStorage = null;
}
}
/**
* needed to get cluster configuration.
*/
public void setClusterConfigService(final ClusterConfigService ccs) {
_clusterService = ccs;
}
/**
* @param ccs
* the cluster config service to unset
*/
public void unsetClusterConfigService(final ClusterConfigService ccs) {
if (_clusterService == ccs) {
_clusterService = null;
}
}
}