/**********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.processing.worker;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.WorkflowProcessor;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
 * A worker that processes its input records by executing a {@link WorkflowProcessor} pipeline, in
 * bulks whose size is controlled by the {@value #KEY_PIPELINERUN_BULKSIZE} task parameter.
 */
public class PipelineProcessorWorker extends ProcessingWorker {
/** worker's name. */
public static final String WORKER_NAME = "pipelineProcessor";
/** key for the pipeline's name. */
public static final String KEY_PIPELINE_NAME = "pipelineName";
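  /** record attribute that may override the default pipeline for a single record. */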
public static final String ATTR_PIPELINE_NAME = "_pipeline";
/** key for the number of parallel records for one execution of the pipeline. */
public static final String KEY_PIPELINERUN_BULKSIZE = "pipelineRunBulkSize";
/** default number of parallel records to be processed in one go. */
public static final int DEFAULT_PIPELINERUN_BULKSIZE = 1;
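  /*
   * Illustrative task parameter sketch (parameter names from the constants above, values made up):
   *
   *   {
   *     "pipelineName": "AddPipeline",
   *     "pipelineRunBulkSize": 10
   *   }
   *
   * "pipelineName" selects the default pipeline and must name an existing pipeline;
   * "pipelineRunBulkSize" caps how many records are processed in a single pipeline run.
   */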
/** associated workflow processor. Set by DS. */
private WorkflowProcessor _processor;
/** @return {@value #WORKER_NAME} */
@Override
public String getName() {
return WORKER_NAME;
}
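  /*
   * Worked example of the batching in perform() (record IDs and pipeline names made up): with
   * pipelineRunBulkSize=3 and input records r1->PipeA, r2->PipeA, r3->PipeB, the inner loop puts
   * r1 and r2 on one blackboard and runs PipeA once; r3 is kept as a leftover and starts the next
   * batch on a fresh blackboard, which then runs PipeB. A record whose ID duplicates one already
   * on the blackboard is deferred to the next batch in the same way.
   */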
/** {@inheritDoc} */
@Override
public boolean perform(final AnyMap parameters, final RecordInput recordInput, final RecordOutput recordOutput,
final TaskContext taskContext) throws Exception {
final long pipelineRunBulkSize = getPipelineRunBulkSize(parameters);
final String defaultPipelineName = getPipelineName(parameters);
final List<String> validPipelineNames = _processor.getWorkflowNames();
String currentPipelineName = null;
    boolean success = false; // to check whether at least some records were processed successfully
boolean bulkFinished = false;
Record record = null;
String recordPipelineName = null;
while (!bulkFinished && !taskContext.isCanceled()) {
final Blackboard blackboard = getBlackboard(taskContext);
final Collection<String> recordIds = new LinkedHashSet<String>();
try {
if (record != null) {
          // leftover record from the previous loop iteration that either needs a different pipeline
          // than its predecessors or had the same record ID as another record on the previous blackboard.
putRecordOnBlackboard(record, blackboard, recordIds);
currentPipelineName = recordPipelineName;
record = null;
}
        // load records onto the blackboard until pipelineRunBulkSize is reached, as long as all
        // records target the same pipeline and have distinct record IDs.
while (record == null && !bulkFinished && recordIds.size() < pipelineRunBulkSize) {
record = recordInput.getRecord();
if (record == null) {
bulkFinished = true;
} else {
setTaskParameters(record, parameters);
recordPipelineName = getPipelineNameForRecord(record, defaultPipelineName, validPipelineNames);
if (currentPipelineName == null) {
currentPipelineName = recordPipelineName;
}
if (recordPipelineName.equals(currentPipelineName) && !recordIds.contains(record.getId())) {
putRecordOnBlackboard(record, blackboard, recordIds);
record = null;
            // otherwise the record is not added to the blackboard: the inner loop ends now, the
            // current records are processed, and the leftover record is put on a fresh blackboard
            // in the next round of the outer loop.
}
}
}
if (!recordIds.isEmpty() && !taskContext.isCanceled()) {
success |= processRecords(blackboard, recordIds, currentPipelineName, recordOutput, taskContext);
currentPipelineName = null;
}
} finally {
cleanupBlackboard(blackboard, taskContext);
}
}
return success;
}
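  /**
   * Determine the pipeline for a single record: if the record's {@value #ATTR_PIPELINE_NAME}
   * attribute names a valid pipeline, that pipeline is used; otherwise the default pipeline from
   * the task parameters is used and an invalid override is logged as a warning.
   */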
private String getPipelineNameForRecord(final Record record, final String defaultPipelineName,
final List<String> validPipelineNames) {
final AnyMap metadata = record.getMetadata();
final Any attributeValue = metadata.get(ATTR_PIPELINE_NAME);
if (attributeValue != null && attributeValue.isString()) {
final String recordPipelineName = attributeValue.asValue().asString();
if (validPipelineNames.contains(recordPipelineName)) {
return recordPipelineName;
} else {
_log.warn("Record " + record.getId() + " specifies invalid pipeline name " + recordPipelineName
+ ", will be processed by default pipeline " + defaultPipelineName);
}
}
return defaultPipelineName;
}
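  /*
   * Illustrative client-side sketch (the pipeline name is made up): a producer can route a single
   * record to another deployed pipeline by setting the override attribute on its metadata before
   * the record reaches this worker:
   *
   *   record.getMetadata().put(PipelineProcessorWorker.ATTR_PIPELINE_NAME, "MyOtherPipeline");
   */
  /** put the record on the blackboard and remember its ID for the pipeline run. */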
private void putRecordOnBlackboard(final Record record, final Blackboard blackboard,
final Collection<String> recordIds) throws BlackboardAccessException {
blackboard.setRecord(record);
recordIds.add(record.getId());
}
/** write task parameters to record. */
private void setTaskParameters(final Record record, final AnyMap parameters) {
final AnyMap parameterMap = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
for (final Map.Entry<String, Any> parameter : parameters.entrySet()) {
parameterMap.put(parameter.getKey(), parameter.getValue());
}
}
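  /*
   * Illustrative effect of setTaskParameters (the attribute name is whatever
   * ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE resolves to): afterwards the record's metadata
   * contains the task parameters under that attribute, roughly:
   *
   *   { "_parameters": { "pipelineName": "AddPipeline", "pipelineRunBulkSize": 10 } }
   *
   * so that pipelets can resolve them via ParameterAccessor during the pipeline run.
   */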
  /**
   * Process the records in the given pipeline. If processing throws a recoverable exception it is
   * passed through so that the task can be retried and may then succeed. Non-recoverable
   * exceptions are caught and logged as warnings to the task log.
   *
   * @return true if processing was successful, false if a non-recoverable exception occurred.
   */
private boolean processRecords(final Blackboard blackboard, final Collection<String> recordIds,
final String pipelineName, final RecordOutput recordOutput, final TaskContext taskContext) throws Exception {
try {
      final String[] inputIds = recordIds.toArray(new String[recordIds.size()]);
final String[] resultIds = _processor.process(pipelineName, blackboard, inputIds);
writeResultRecords(blackboard, resultIds, recordOutput, taskContext);
return true;
} catch (final ProcessingException ex) {
if (ex.isRecoverable()) {
throw ex;
} else {
taskContext.getLog().warn("Failed to process records " + recordIds + ", skipping them.", ex);
return false;
}
}
}
  /**
   * @return value of the {@value #KEY_PIPELINE_NAME} parameter.
   * @throws IllegalArgumentException
   *           if the parameter is not set or the configured pipeline does not exist.
   */
private String getPipelineName(final AnyMap parameters) {
final String pipelineName = parameters.getStringValue(KEY_PIPELINE_NAME);
if (pipelineName == null || pipelineName.isEmpty()) {
throw new IllegalArgumentException("Pipeline name parameter '" + KEY_PIPELINE_NAME + "' is not set.");
}
if (!_processor.getWorkflowNames().contains(pipelineName)) {
throw new IllegalArgumentException("Configured pipeline '" + pipelineName + "' doesn't exist.");
}
return pipelineName;
}
  /**
   * @return value of the {@value #KEY_PIPELINERUN_BULKSIZE} parameter, if set and greater than 0;
   *         else the default value {@link #DEFAULT_PIPELINERUN_BULKSIZE}.
   */
private long getPipelineRunBulkSize(final AnyMap parameters) {
final Long pipelineRunBulkSize = parameters.getLongValue(KEY_PIPELINERUN_BULKSIZE);
if (pipelineRunBulkSize != null && pipelineRunBulkSize > 0) {
return pipelineRunBulkSize;
}
return DEFAULT_PIPELINERUN_BULKSIZE;
}
/** set workflow processor reference (used by DS). */
public void setProcessor(final WorkflowProcessor processor) {
_processor = processor;
}
/** remove workflow processor reference (used by DS). */
public void unsetProcessor(final WorkflowProcessor processor) {
if (_processor == processor) {
_processor = null;
}
}
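  /*
   * Illustrative DS sketch (lives in a separate OSGI-INF component definition, not in this file;
   * the reference name is an assumption): the WorkflowProcessor service is wired to the bind and
   * unbind methods above, e.g.:
   *
   *   <reference name="WorkflowProcessor" interface="org.eclipse.smila.processing.WorkflowProcessor"
   *              cardinality="1..1" policy="static" bind="setProcessor" unbind="unsetProcessor" />
   */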
}