// blob: 3fdbc87df4b3a484d543932e8633ef9705565e6a [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.taskmanager;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyConvertException;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.ValueFormatHelper;
/**
 * A task describes one unit of work for a worker: the task id, the name of the worker that has to process it, an
 * optional qualifier, string properties and parameters, the input and output bulks per worker slot and - for finished
 * tasks - a result description.
 * <p>
 * Instances are mutable (the maps returned by the getters are the internal maps) and this class performs no
 * synchronization.
 * </p>
 */
public class Task {
  /** The key for the job name in the task properties. */
  public static final String PROPERTY_JOB_NAME = "jobName";

  /** The key for the job run id in the task properties. */
  public static final String PROPERTY_JOB_RUN_ID = "jobRunId";

  /** The key for the workflow run id in the task properties. */
  public static final String PROPERTY_WORKFLOW_RUN_ID = "workflowRunId";

  /** The key for the worker host in the task properties. */
  public static final String PROPERTY_WORKER_HOST = "workerHost";

  /** The key whether a task is recoverable in the task properties. */
  public static final String PROPERTY_RECOVERABLE = "recoverable";

  /** The property used to temporarily store the task's original qualifier while it is a finish task. */
  public static final String PROPERTY_ORIGINAL_QUALIFIER = "originalQualifier";

  /** The property used to temporarily store the task's original worker name while it is a finish task. */
  public static final String PROPERTY_ORIGINAL_WORKER = "originalWorker";

  /** key name for task start time. */
  public static final String PROPERTY_START_TIME = "startTime";

  /** key name for task end time. */
  public static final String PROPERTY_END_TIME = "endTime";

  /** key name for task creation time. */
  public static final String PROPERTY_CREATED_TIME = "createdTime";

  /** difference between createdTime and startTime in milliseconds, computed when the task is retrieved by a worker. */
  public static final String PROPERTY_TASK_AGE = "taskAge";

  /**
   * Misspelled alias of {@link #PROPERTY_TASK_AGE} with the same value, kept so that existing callers continue to
   * compile. New code should use {@link #PROPERTY_TASK_AGE}.
   */
  public static final String PROPERY_TASK_AGE = PROPERTY_TASK_AGE;

  /** key name for postponed, how often is the task postponed. */
  public static final String PROPERTY_POSTPONED = "postponed";

  /** The key for the task id. */
  public static final String KEY_TASK_ID = "taskId";

  /** The key for the worker name. */
  public static final String KEY_WORKER_NAME = "workerName";

  /** The key for the qualifier. */
  public static final String KEY_QUALIFIER = "qualifier";

  /** The key for the properties. */
  public static final String KEY_PROPERTIES = "properties";

  /** The key for the parameters. */
  public static final String KEY_PARAMETERS = "parameters";

  /** The key for the input. */
  public static final String KEY_INPUT = "input";

  /** The key for the output. */
  public static final String KEY_OUTPUT = "output";

  /** The key for the result description. */
  public static final String KEY_RESULT_DESCRIPTION = "resultDescription";

  /** The task id. */
  private final String _taskId;

  /** The name of worker for this step ( = queue name of task manager for this kind of workers). */
  private final String _workerName;

  /** The value to be matched by the values in a "Conditional GET" request. */
  private String _qualifier;

  /** task properties. */
  private final Map<String, String> _properties;

  /** parameters for the worker. */
  private final Map<String, String> _parameters;

  /**
   * named input bulk slots. The key is the worker slot name, the value an object describing the bucket name and storage
   * information.
   */
  private final Map<String, List<BulkInfo>> _inputBulks;

  /**
   * named output bulk slots. The key is the worker slot name, the value an object describing the bucket name and
   * storage information.
   */
  private final Map<String, List<BulkInfo>> _outputBulks;

  /** result description for a finished task, may be null. */
  private final ResultDescription _resultDescription;

  /**
   * Constructs a new Task without result description and with empty properties, parameters and bulks.
   *
   * @param taskId
   *          The task id.
   * @param workerName
   *          The name of the worker for this task.
   */
  public Task(final String taskId, final String workerName) {
    this(taskId, workerName, null);
  }

  /**
   * create task with optional result description.
   *
   * @param taskId
   *          The task id.
   * @param workerName
   *          The name of the worker for this task.
   * @param result
   *          the result description, may be null.
   */
  private Task(final String taskId, final String workerName, final ResultDescription result) {
    _taskId = taskId;
    _workerName = workerName;
    _properties = new LinkedHashMap<String, String>();
    _parameters = new LinkedHashMap<String, String>();
    _inputBulks = new LinkedHashMap<String, List<BulkInfo>>();
    _outputBulks = new LinkedHashMap<String, List<BulkInfo>>();
    _resultDescription = result;
  }

  /**
   * Constructs a new Task from this Task with no ResultDescription and its own task id for retrying. The worker name
   * of the new task is taken from the {@link Task#PROPERTY_ORIGINAL_WORKER} property and the qualifier from the
   * {@link Task#PROPERTY_ORIGINAL_QUALIFIER} property of this task (each is null if the property is not set).
   * <p>
   * <b>Note</b>: the bulks and parameters are copied, as well as the properties, except for
   * {@link Task#PROPERTY_ORIGINAL_WORKER}, {@link Task#PROPERTY_ORIGINAL_QUALIFIER},
   * {@link Task#PROPERTY_START_TIME}, {@link Task#PROPERTY_END_TIME} and {@link Task#PROPERTY_TASK_AGE}, which are not
   * part of the new task. This task itself is not modified, so the method can be called repeatedly on the same task.
   * </p>
   *
   * @param taskId
   *          the task id for the new task.
   * @return the new task to retry.
   */
  public Task createRetryTask(final String taskId) {
    // read (do not remove) the saved worker name: removing it here would leave this task without it and a second
    // retry created from the same task would end up with a null worker name.
    final String originalWorkerName = getProperties().get(Task.PROPERTY_ORIGINAL_WORKER);
    final Task retryTask = new Task(taskId, originalWorkerName);
    copyContentTo(retryTask);

    final Map<String, String> retryProperties = retryTask.getProperties();
    // the bookkeeping entries written by createFinishTask() are consumed here and must not leak into the retry task.
    retryProperties.remove(Task.PROPERTY_ORIGINAL_WORKER);
    retryTask.setQualifier(retryProperties.remove(Task.PROPERTY_ORIGINAL_QUALIFIER));
    // timing information belongs to the previous execution attempt.
    retryProperties.remove(Task.PROPERTY_START_TIME);
    retryProperties.remove(Task.PROPERTY_END_TIME);
    retryProperties.remove(Task.PROPERTY_TASK_AGE);
    return retryTask;
  }

  /**
   * Creates a finish task from this task: a task with the same task id, the given result description and the given
   * worker name, containing copies of the properties, parameters and bulks of this task. Additionally the
   * {@link Task#PROPERTY_END_TIME} property is set to the current time, the worker name of this task is saved in
   * {@link Task#PROPERTY_ORIGINAL_WORKER} and - if not empty - the qualifier of this task is saved in
   * {@link Task#PROPERTY_ORIGINAL_QUALIFIER}.
   *
   * @param result
   *          the result description for the finish task.
   * @param finishWorkerName
   *          the worker name of the finish task.
   * @return the finish task.
   */
  public Task createFinishTask(final ResultDescription result, final String finishWorkerName) {
    final Task finishTask = new Task(_taskId, finishWorkerName, result);
    copyContentTo(finishTask);

    final Map<String, String> finishProperties = finishTask.getProperties();
    finishProperties.put(Task.PROPERTY_END_TIME, ValueFormatHelper.getDefaultDateTimeFormat().format(new Date()));
    finishProperties.put(Task.PROPERTY_ORIGINAL_WORKER, _workerName);
    if (!StringUtils.isEmpty(_qualifier)) {
      finishProperties.put(Task.PROPERTY_ORIGINAL_QUALIFIER, _qualifier);
    }
    return finishTask;
  }

  /**
   * Copies properties, parameters and bulk slots of this task into the given task. The map entries are copied, the
   * bulk lists referenced by the slot maps are shared between both tasks afterwards.
   *
   * @param target
   *          the task to copy to.
   */
  private void copyContentTo(final Task target) {
    target.getProperties().putAll(getProperties());
    target.getParameters().putAll(getParameters());
    target.getInputBulks().putAll(getInputBulks());
    target.getOutputBulks().putAll(getOutputBulks());
  }

  /**
   * parse task from any.
   *
   * @param any
   *          Any representation of task, expected to be an AnyMap
   * @return parsed task.
   * @throws AnyConvertException
   *           if the Any is no map, or wrapping any exception that occurs during conversion from Any
   */
  public static Task fromAny(final Any any) throws AnyConvertException {
    if (!(any instanceof AnyMap)) {
      throw new AnyConvertException("Error parsing task from Any object that is no Map.");
    }
    try {
      final AnyMap anyTask = (AnyMap) any;
      final String taskId = anyTask.getStringValue(KEY_TASK_ID);
      final String workerName = anyTask.getStringValue(KEY_WORKER_NAME);
      ResultDescription result = null;
      if (anyTask.get(KEY_RESULT_DESCRIPTION) != null) {
        result = ResultDescription.fromAny(anyTask.getMap(KEY_RESULT_DESCRIPTION));
      }
      final Task task = new Task(taskId, workerName, result);
      if (anyTask.containsKey(KEY_QUALIFIER)) {
        task.setQualifier(anyTask.getStringValue(KEY_QUALIFIER));
      }
      parseStrings(anyTask.get(KEY_PROPERTIES), task.getProperties());
      parseStrings(anyTask.get(KEY_PARAMETERS), task.getParameters());
      parseBulks(anyTask.get(KEY_INPUT), task.getInputBulks());
      parseBulks(anyTask.get(KEY_OUTPUT), task.getOutputBulks());
      return task;
    } catch (final Exception ex) {
      // deliberately broad: every failure while reading the Any is reported as a conversion error with its cause.
      throw new AnyConvertException("Error parsing task from Any object", ex);
    }
  }

  /**
   * parse an Any and put the any key and string values into a map. Nothing happens if the Any is null or no map.
   *
   * @param any
   *          the any to parse
   * @param stringMap
   *          the Map to be modified
   * @throws Exception
   *           an exception during Any parsing
   */
  private static void parseStrings(final Any any, final Map<String, String> stringMap) throws Exception {
    if (any == null || !any.isMap()) {
      return;
    }
    final AnyMap anyMap = (AnyMap) any;
    for (final String key : anyMap.keySet()) {
      stringMap.put(key, anyMap.getStringValue(key));
    }
  }

  /**
   * Parse the bulks out of an Any object: each entry of the map becomes a slot whose list contains one BulkInfo per
   * element of the entry's value. Nothing happens if the Any is null or no map.
   *
   * @param any
   *          the input Any to be parsed
   * @param bulkMap
   *          the bulkMap to be filled
   * @throws Exception
   *           an exception while parsing the Any
   */
  private static void parseBulks(final Any any, final Map<String, List<BulkInfo>> bulkMap) throws Exception {
    if (any == null || !any.isMap()) {
      return;
    }
    for (final Entry<String, Any> slot : ((AnyMap) any).entrySet()) {
      final List<BulkInfo> bulkList = new ArrayList<BulkInfo>();
      for (final Any bulkAny : slot.getValue()) {
        bulkList.add(BulkInfo.fromAny(bulkAny));
      }
      bulkMap.put(slot.getKey(), bulkList);
    }
  }

  /**
   * @return the taskId
   */
  public String getTaskId() {
    return _taskId;
  }

  /**
   * @return the workerName
   */
  public String getWorkerName() {
    return _workerName;
  }

  /** @return qualifier value, if set. Else null. */
  public String getQualifier() {
    return _qualifier;
  }

  /**
   * @param qualifier
   *          new qualifier.
   */
  public void setQualifier(final String qualifier) {
    _qualifier = qualifier;
  }

  /**
   * @return the properties (the internal map, changes to it change this task)
   */
  public Map<String, String> getProperties() {
    return _properties;
  }

  /**
   * @return the parameters (the internal map, changes to it change this task)
   */
  public Map<String, String> getParameters() {
    return _parameters;
  }

  /**
   * @return the inputBulks (the internal map, changes to it change this task)
   */
  public Map<String, List<BulkInfo>> getInputBulks() {
    return _inputBulks;
  }

  /**
   * @return the outputBulks (the internal map, changes to it change this task)
   */
  public Map<String, List<BulkInfo>> getOutputBulks() {
    return _outputBulks;
  }

  /**
   * @return the result description (which is only set for a finished task)
   */
  public ResultDescription getResultDescription() {
    return _resultDescription;
  }

  /** set {@link Task#PROPERTY_CREATED_TIME} if not set already and it's not a finishing task. */
  public void setTaskCreatedProperties() {
    if (TaskManager.FINISHING_TASKS_WORKER.equals(_workerName) || _properties.containsKey(PROPERTY_CREATED_TIME)) {
      return;
    }
    _properties.put(Task.PROPERTY_CREATED_TIME, ValueFormatHelper.getDefaultDateTimeFormat().format(new Date()));
  }

  /**
   * set {@link Task#PROPERTY_START_TIME} and {@value Task#PROPERTY_TASK_AGE}, if it's not a finishing task. The task
   * age is 0 if {@link Task#PROPERTY_CREATED_TIME} is not set or cannot be parsed.
   */
  public void setTaskStartedProperties() {
    if (TaskManager.FINISHING_TASKS_WORKER.equals(_workerName)) {
      return;
    }
    // use one format instance for formatting the start time and parsing the created time.
    final SimpleDateFormat dateFormat = ValueFormatHelper.getDefaultDateTimeFormat();
    final Date startTime = new Date();
    _properties.put(Task.PROPERTY_START_TIME, dateFormat.format(startTime));

    long taskAge = 0;
    final String createdTimeString = _properties.get(Task.PROPERTY_CREATED_TIME);
    if (createdTimeString != null) {
      try {
        taskAge = startTime.getTime() - dateFormat.parse(createdTimeString).getTime();
      } catch (final ParseException ignored) {
        // best effort only: an unparseable creation time must not prevent starting the task, the age stays 0.
        taskAge = 0;
      }
    }
    _properties.put(Task.PROPERTY_TASK_AGE, Long.toString(taskAge));
  }

  /**
   * Returns the task as an Any representation. Task id and worker name are always written. The qualifier, the
   * properties, the parameters, the input and output bulks and the result description are only written if the
   * respective getter does not return null; otherwise the key is not present in the resulting Any.
   *
   * @return Any object describing this task.
   * @throws IllegalStateException
   *           wrapping any exception that occurs while creating the Any.
   */
  public AnyMap toAny() {
    try {
      final AnyMap taskAny = DataFactory.DEFAULT.createAnyMap();
      taskAny.put(KEY_TASK_ID, getTaskId());
      taskAny.put(KEY_WORKER_NAME, getWorkerName());
      if (getQualifier() != null) {
        taskAny.put(KEY_QUALIFIER, getQualifier());
      }
      if (getProperties() != null) {
        taskAny.put(KEY_PROPERTIES, convertStringMap(getProperties()));
      }
      if (getParameters() != null) {
        taskAny.put(KEY_PARAMETERS, convertStringMap(getParameters()));
      }
      if (getInputBulks() != null) {
        taskAny.put(KEY_INPUT, convertSlotMap(getInputBulks()));
      }
      if (getOutputBulks() != null) {
        taskAny.put(KEY_OUTPUT, convertSlotMap(getOutputBulks()));
      }
      if (getResultDescription() != null) {
        taskAny.put(KEY_RESULT_DESCRIPTION, getResultDescription().toAny());
      }
      return taskAny;
    } catch (final Exception ex) {
      throw new IllegalStateException("Failed to create Any object for task " + getTaskId(), ex);
    }
  }

  /**
   * Convert a Map(of String to List of BulkInfo) to an AnyMap: each slot name is mapped to a sequence containing the
   * Any representations of the slot's BulkInfos.
   *
   * @param slotMap
   *          the input Map of string to list of BulkInfos
   * @return an Any representing the Map.
   * @throws Exception
   *           an exception while converting the map to an Any.
   */
  private static AnyMap convertSlotMap(final Map<String, List<BulkInfo>> slotMap) throws Exception {
    final AnyMap slotsAny = DataFactory.DEFAULT.createAnyMap();
    for (final Entry<String, List<BulkInfo>> slot : slotMap.entrySet()) {
      final AnySeq bulkInfosAny = DataFactory.DEFAULT.createAnySeq();
      for (final BulkInfo bulkInfo : slot.getValue()) {
        bulkInfosAny.add(bulkInfo.toAny());
      }
      slotsAny.put(slot.getKey(), bulkInfosAny);
    }
    return slotsAny;
  }

  /**
   * Convert a Map(of String to String) to an Any.
   *
   * @param stringMap
   *          the input Map
   * @return an Any representing the Map.
   * @throws Exception
   *           an exception while converting the map to an Any.
   */
  private static AnyMap convertStringMap(final Map<String, String> stringMap) throws Exception {
    final AnyMap mapAny = DataFactory.DEFAULT.createAnyMap();
    for (final Entry<String, String> entry : stringMap.entrySet()) {
      mapAny.put(entry.getKey(), entry.getValue());
    }
    return mapAny;
  }

  /** @return the task id. */
  @Override
  public String toString() {
    return getTaskId();
  }
}