/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
 **********************************************************************************************************************/
package org.eclipse.smila.taskmanager; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import org.apache.commons.lang.StringUtils; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyConvertException; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.AnySeq; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.ValueFormatHelper; | |
/** | |
* Task class. | |
*/ | |
public class Task { | |
/** The key for the job name in the task properties. */ | |
public static final String PROPERTY_JOB_NAME = "jobName"; | |
/** The key for the job run id in the task properties. */ | |
public static final String PROPERTY_JOB_RUN_ID = "jobRunId"; | |
/** The key for the job id in the task properties. */ | |
public static final String PROPERTY_WORKFLOW_RUN_ID = "workflowRunId"; | |
/** The key for the worker host in the task properties. */ | |
public static final String PROPERTY_WORKER_HOST = "workerHost"; | |
/** The key whether a task is recoverable in the task properties. */ | |
public static final String PROPERTY_RECOVERABLE = "recoverable"; | |
/** the original qualifier to temporarily store the tasks qualifier. */ | |
public static final String PROPERTY_ORIGINAL_QUALIFIER = "originalQualifier"; | |
/** the original qualifier to temporarily store the tasks qualifier. */ | |
public static final String PROPERTY_ORIGINAL_WORKER = "originalWorker"; | |
/** key name for task start time. */ | |
public static final String PROPERTY_START_TIME = "startTime"; | |
/** key name for task end time. */ | |
public static final String PROPERTY_END_TIME = "endTime"; | |
/** key name for task creation time. */ | |
public static final String PROPERTY_CREATED_TIME = "createdTime"; | |
/** difference between createdTime and startTime in milliseconds, computed when the task is retrieved by a worker. */ | |
public static final String PROPERY_TASK_AGE = "taskAge"; | |
/** key name for postponed, how often is the task postponed. */ | |
public static final String PROPERTY_POSTPONED = "postponed"; | |
/** The key for the task id. */ | |
public static final String KEY_TASK_ID = "taskId"; | |
/** The key for the worker name. */ | |
public static final String KEY_WORKER_NAME = "workerName"; | |
/** The key for the qualifier. */ | |
public static final String KEY_QUALIFIER = "qualifier"; | |
/** The key for the properties. */ | |
public static final String KEY_PROPERTIES = "properties"; | |
/** The key for the parameters. */ | |
public static final String KEY_PARAMETERS = "parameters"; | |
/** The key for the input. */ | |
public static final String KEY_INPUT = "input"; | |
/** The key for the output. */ | |
public static final String KEY_OUTPUT = "output"; | |
/** The key for the result description. */ | |
public static final String KEY_RESULT_DESCRIPTION = "resultDescription"; | |
/** The task id. */ | |
private final String _taskId; | |
/** The name of worker for this step ( = queue name of task manager for this kind of workers). */ | |
private final String _workerName; | |
/** The value to be matched by the values in a "Conditional GET" request. */ | |
private String _qualifier; | |
/** task properties. */ | |
private final Map<String, String> _properties; | |
/** parameters for the worker. */ | |
private final Map<String, String> _parameters; | |
/** | |
* named input bulk slots. The key is the worker slot name, the value an object describing the bucket name and storage | |
* information. | |
*/ | |
private final Map<String, List<BulkInfo>> _inputBulks; | |
/** | |
* named output bulk slots. The key is the worker slot name, the value an object describing the bucket name and | |
* storage information. | |
*/ | |
private final Map<String, List<BulkInfo>> _outputBulks; | |
/** result description for a finished task. */ | |
private final ResultDescription _resultDescription; | |
/** | |
* Constructs a new Task. | |
* | |
* @param taskId | |
* The task id. | |
* @param workerName | |
* The name of the worker for this task. | |
*/ | |
public Task(final String taskId, final String workerName) { | |
this(taskId, workerName, null); | |
} | |
/** create task with optional result description. */ | |
private Task(final String taskId, final String workerName, final ResultDescription result) { | |
_taskId = taskId; | |
_workerName = workerName; | |
_properties = new LinkedHashMap<String, String>(); | |
_parameters = new LinkedHashMap<String, String>(); | |
_inputBulks = new LinkedHashMap<String, List<BulkInfo>>(); | |
_outputBulks = new LinkedHashMap<String, List<BulkInfo>>(); | |
_resultDescription = result; | |
} | |
/** | |
* Constructs a new Task from an existing Task with empty ResultDescription and own task id for retrying (the | |
* qualifier is restored from {@link Task#PROPERTY_ORIGINAL_QUALIFIER} property. | |
* <p> | |
* <b>Note</b>: the bulks and parameters are copied, as well as the properties. | |
* </p> | |
* | |
* @param taskId | |
* the task id for the new task. | |
*/ | |
public Task createRetryTask(final String taskId) { | |
final String originalWorkerName = getProperties().remove(Task.PROPERTY_ORIGINAL_WORKER); | |
final Task newTask = new Task(taskId, originalWorkerName); | |
newTask.getProperties().putAll(getProperties()); | |
newTask.getParameters().putAll(getParameters()); | |
newTask.getInputBulks().putAll(getInputBulks()); | |
newTask.getOutputBulks().putAll(getOutputBulks()); | |
// and original qualifier: | |
newTask.setQualifier(getProperties().remove(Task.PROPERTY_ORIGINAL_QUALIFIER)); | |
newTask.getProperties().remove(Task.PROPERTY_START_TIME); | |
newTask.getProperties().remove(Task.PROPERTY_END_TIME); | |
newTask.getProperties().remove(Task.PROPERY_TASK_AGE); | |
return newTask; | |
} | |
/** | |
* save worker name and qualifier to properties and set worker name to finish worker. | |
*/ | |
public Task createFinishTask(final ResultDescription result, final String finishWorkerName) { | |
final Task finishTask = new Task(_taskId, finishWorkerName, result); | |
finishTask.getProperties().putAll(getProperties()); | |
finishTask.getParameters().putAll(getParameters()); | |
finishTask.getInputBulks().putAll(getInputBulks()); | |
finishTask.getOutputBulks().putAll(getOutputBulks()); | |
finishTask.getProperties().put(Task.PROPERTY_END_TIME, | |
ValueFormatHelper.getDefaultDateTimeFormat().format(new Date())); | |
finishTask.getProperties().put(Task.PROPERTY_ORIGINAL_WORKER, _workerName); | |
if (!StringUtils.isEmpty(_qualifier)) { | |
finishTask.getProperties().put(Task.PROPERTY_ORIGINAL_QUALIFIER, _qualifier); | |
} | |
return finishTask; | |
} | |
/** | |
* parse task from any. | |
* | |
* @param any | |
* Any representation of task, expected to be an AnyMap | |
* @return parsed task. | |
* @throws AnyConvertException | |
* an exception during conversion from Any | |
*/ | |
public static Task fromAny(final Any any) throws AnyConvertException { | |
if (!(any instanceof AnyMap)) { | |
throw new AnyConvertException("Error parsing task from Any object that is no Map."); | |
} | |
try { | |
final AnyMap anyTask = (AnyMap) any; | |
final String taskId = anyTask.getStringValue(KEY_TASK_ID); | |
final String workerName = anyTask.getStringValue(KEY_WORKER_NAME); | |
ResultDescription result = null; | |
if (anyTask.get(KEY_RESULT_DESCRIPTION) != null) { | |
result = ResultDescription.fromAny(anyTask.getMap(KEY_RESULT_DESCRIPTION)); | |
} | |
final Task task = new Task(taskId, workerName, result); | |
if (anyTask.containsKey(KEY_QUALIFIER)) { | |
task.setQualifier(anyTask.getStringValue(KEY_QUALIFIER)); | |
} | |
parseStrings(anyTask.get(KEY_PROPERTIES), task.getProperties()); | |
parseStrings(anyTask.get(KEY_PARAMETERS), task.getParameters()); | |
parseBulks(anyTask.get(KEY_INPUT), task.getInputBulks()); | |
parseBulks(anyTask.get(KEY_OUTPUT), task.getOutputBulks()); | |
return task; | |
} catch (final Exception ex) { | |
throw new AnyConvertException("Error parsing task from Any object", ex); | |
} | |
} | |
/** | |
* parse an Any and put the any key and string values into a map. | |
* | |
* @param any | |
* the any to parse | |
* @param stringMap | |
* the Map to be modified | |
* @throws Exception | |
* an exception during Any parsing | |
*/ | |
private static void parseStrings(final Any any, final Map<String, String> stringMap) throws Exception { | |
if (any != null && any.isMap()) { | |
final Iterator<String> keys = ((AnyMap) any).keySet().iterator(); | |
while (keys.hasNext()) { | |
final String key = keys.next(); | |
final String value = ((AnyMap) any).getStringValue(key); | |
stringMap.put(key, value); | |
} | |
} | |
} | |
/** | |
* Parse the bulks out of an Any object. | |
* | |
* @param any | |
* the input Any to be parsed | |
* @param bulkMap | |
* the bulkMap to be filled | |
* @throws Exception | |
* an exception while parsing the Any | |
*/ | |
private static void parseBulks(final Any any, final Map<String, List<BulkInfo>> bulkMap) throws Exception { | |
if (any != null && any.isMap()) { | |
final AnyMap anyMap = (AnyMap) any; | |
for (final Entry<String, Any> entry : anyMap.entrySet()) { | |
final String key = entry.getKey(); | |
final List<BulkInfo> bulkList = new ArrayList<BulkInfo>(); | |
for (final Any bulkAny : entry.getValue()) { | |
bulkList.add(BulkInfo.fromAny(bulkAny)); | |
} | |
bulkMap.put(key, bulkList); | |
} | |
} | |
} | |
/** | |
* @return the taskId | |
*/ | |
public String getTaskId() { | |
return _taskId; | |
} | |
/** | |
* @return the workerName | |
*/ | |
public String getWorkerName() { | |
return _workerName; | |
} | |
/** @return qualifier value, if set. Else null. */ | |
public String getQualifier() { | |
return _qualifier; | |
} | |
/** | |
* @param qualifier | |
* new qualifier. | |
*/ | |
public void setQualifier(final String qualifier) { | |
_qualifier = qualifier; | |
} | |
/** | |
* @return the properties | |
*/ | |
public Map<String, String> getProperties() { | |
return _properties; | |
} | |
/** | |
* @return the parameters | |
*/ | |
public Map<String, String> getParameters() { | |
return _parameters; | |
} | |
/** | |
* @return the inputBulks | |
*/ | |
public Map<String, List<BulkInfo>> getInputBulks() { | |
return _inputBulks; | |
} | |
/** | |
* @return the outputBulks | |
*/ | |
public Map<String, List<BulkInfo>> getOutputBulks() { | |
return _outputBulks; | |
} | |
/** | |
* @return the result description (which is only set for a finished task) | |
*/ | |
public ResultDescription getResultDescription() { | |
return _resultDescription; | |
} | |
/** set {@link Task#PROPERTY_CREATED_TIME} if not set already and it's not a finishing task. */ | |
public void setTaskCreatedProperties() { | |
if (!TaskManager.FINISHING_TASKS_WORKER.equals(_workerName) && !_properties.containsKey(PROPERTY_CREATED_TIME)) { | |
_properties.put(Task.PROPERTY_CREATED_TIME, ValueFormatHelper.getDefaultDateTimeFormat().format(new Date())); | |
} | |
} | |
/** | |
* set {@link Task#PROPERTY_START_TIME} and {@value Task#PROPERY_TASK_AGE}, if it's not a finishing task. | |
*/ | |
public void setTaskStartedProperties() { | |
if (!TaskManager.FINISHING_TASKS_WORKER.equals(_workerName)) { | |
final SimpleDateFormat df = ValueFormatHelper.getDefaultDateTimeFormat(); | |
final Date startTime = new Date(); | |
_properties.put(Task.PROPERTY_START_TIME, df.format(startTime)); | |
final String createdTimeString = _properties.get(Task.PROPERTY_CREATED_TIME); | |
long taskAge = 0; | |
if (createdTimeString != null) { | |
try { | |
final Date createdTime = df.parse(createdTimeString); | |
taskAge = startTime.getTime() - createdTime.getTime(); | |
} catch (final ParseException ex) { | |
; // ignore. | |
} | |
} | |
_properties.put(Task.PROPERY_TASK_AGE, Long.toString(taskAge)); | |
} | |
} | |
/** | |
* Returns the task as an Any representation. If "parameters" are set to null it will not be guaranteed if the | |
* "parameters" property will be present in the resulting Any with a value of null or if it will not be present at | |
* all. | |
* | |
* @return Any object describing this task. | |
*/ | |
public AnyMap toAny() { | |
try { | |
final AnyMap taskAny = DataFactory.DEFAULT.createAnyMap(); | |
taskAny.put(KEY_TASK_ID, getTaskId()); | |
taskAny.put(KEY_WORKER_NAME, getWorkerName()); | |
if (getQualifier() != null) { | |
taskAny.put(KEY_QUALIFIER, getQualifier()); | |
} | |
if (getProperties() != null) { | |
taskAny.put(KEY_PROPERTIES, convertStringMap(getProperties())); | |
} | |
if (getParameters() != null) { | |
taskAny.put(KEY_PARAMETERS, convertStringMap(getParameters())); | |
} | |
if (getInputBulks() != null) { | |
taskAny.put(KEY_INPUT, convertSlotMap(getInputBulks())); | |
} | |
if (getOutputBulks() != null) { | |
taskAny.put(KEY_OUTPUT, convertSlotMap(getOutputBulks())); | |
} | |
if (getResultDescription() != null) { | |
taskAny.put(KEY_RESULT_DESCRIPTION, getResultDescription().toAny()); | |
} | |
return taskAny; | |
} catch (final Exception ex) { | |
throw new IllegalStateException("Failed to create Any object for task " + getTaskId(), ex); | |
} | |
} | |
/** | |
* Convert a Map(of String to List of BulkInfo) to an AnyMap. | |
* | |
* @param slotMap | |
* the input Map of string to list of BulkInfos | |
* @return an Any representing the Map. | |
* @throws Exception | |
* an exception while converting the map to an Any. | |
*/ | |
private AnyMap convertSlotMap(final Map<String, List<BulkInfo>> slotMap) throws Exception { | |
final AnyMap slotsAny = DataFactory.DEFAULT.createAnyMap(); | |
for (final Entry<String, List<BulkInfo>> entry : slotMap.entrySet()) { | |
final String slotName = entry.getKey(); | |
final List<BulkInfo> bulkInfos = entry.getValue(); | |
final AnySeq bulkInfosAny = DataFactory.DEFAULT.createAnySeq(); | |
for (final BulkInfo bulkInfo : bulkInfos) { | |
bulkInfosAny.add(bulkInfo.toAny()); | |
} | |
slotsAny.put(slotName, bulkInfosAny); | |
} | |
return slotsAny; | |
} | |
/** | |
* Convert a Map(of String to String) to an Any. | |
* | |
* @param stringMap | |
* the input Map | |
* @return an Any representing the Map. | |
* @throws Exception | |
* an exception while converting the map to an Any. | |
*/ | |
private AnyMap convertStringMap(final Map<String, String> stringMap) throws Exception { | |
final AnyMap mapAny = DataFactory.DEFAULT.createAnyMap(); | |
for (final Entry<String, String> entry : stringMap.entrySet()) { | |
mapAny.put(entry.getKey(), entry.getValue()); | |
} | |
return mapAny; | |
} | |
@Override | |
public String toString() { | |
return getTaskId(); | |
} | |
} |