/**********************************************************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
 **********************************************************************************************************************/
package org.eclipse.smila.processing.worker; | |
import java.util.Collection; | |
import java.util.LinkedHashSet; | |
import java.util.List; | |
import java.util.Map; | |
import org.eclipse.smila.blackboard.Blackboard; | |
import org.eclipse.smila.blackboard.BlackboardAccessException; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.processing.ProcessingException; | |
import org.eclipse.smila.processing.WorkflowProcessor; | |
import org.eclipse.smila.processing.parameters.ParameterAccessor; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
import org.eclipse.smila.taskworker.output.RecordOutput; | |
/** | |
* A worker that is able to execute a pipeline. | |
*/ | |
public class PipelineProcessorWorker extends ProcessingWorker { | |
/** worker's name. */ | |
public static final String WORKER_NAME = "pipelineProcessor"; | |
/** key for the pipeline's name. */ | |
public static final String KEY_PIPELINE_NAME = "pipelineName"; | |
public static final String ATTR_PIPELINE_NAME = "_pipeline"; | |
/** key for the number of parallel records for one execution of the pipeline. */ | |
public static final String KEY_PIPELINERUN_BULKSIZE = "pipelineRunBulkSize"; | |
/** default number of parallel records to be processed in one go. */ | |
public static final int DEFAULT_PIPELINERUN_BULKSIZE = 1; | |
/** associated workflow processor. Set by DS. */ | |
private WorkflowProcessor _processor; | |
/** @return {@value #WORKER_NAME} */ | |
@Override | |
public String getName() { | |
return WORKER_NAME; | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public boolean perform(final AnyMap parameters, final RecordInput recordInput, final RecordOutput recordOutput, | |
final TaskContext taskContext) throws Exception { | |
final long pipelineRunBulkSize = getPipelineRunBulkSize(parameters); | |
final String defaultPipelineName = getPipelineName(parameters); | |
final List<String> validPipelineNames = _processor.getWorkflowNames(); | |
String currentPipelineName = null; | |
boolean success = false; // to check if at least some records were processed successful | |
boolean bulkFinished = false; | |
Record record = null; | |
String recordPipelineName = null; | |
while (!bulkFinished && !taskContext.isCanceled()) { | |
final Blackboard blackboard = getBlackboard(taskContext); | |
final Collection<String> recordIds = new LinkedHashSet<String>(); | |
try { | |
if (record != null) { | |
// leftover record from previous loop that wants to use a different pipeline than the predecessors | |
// or has the same record ID than another record on the same blackboard. | |
putRecordOnBlackboard(record, blackboard, recordIds); | |
currentPipelineName = recordPipelineName; | |
record = null; | |
} | |
// load records on blackboard as long as pipelineRunBulkSize is not reached, all records want to go to the | |
// same pipeline and have different record IDs. | |
while (record == null && !bulkFinished && recordIds.size() < pipelineRunBulkSize) { | |
record = recordInput.getRecord(); | |
if (record == null) { | |
bulkFinished = true; | |
} else { | |
setTaskParameters(record, parameters); | |
recordPipelineName = getPipelineNameForRecord(record, defaultPipelineName, validPipelineNames); | |
if (currentPipelineName == null) { | |
currentPipelineName = recordPipelineName; | |
} | |
if (recordPipelineName.equals(currentPipelineName) && !recordIds.contains(record.getId())) { | |
putRecordOnBlackboard(record, blackboard, recordIds); | |
record = null; | |
// if record is not added to blackboard here, the inner loop will end now, the current records will be | |
// processed, and the leftover record will be put on the next blackboard in the next round of the outer | |
// loop | |
} | |
} | |
} | |
if (!recordIds.isEmpty() && !taskContext.isCanceled()) { | |
success |= processRecords(blackboard, recordIds, currentPipelineName, recordOutput, taskContext); | |
currentPipelineName = null; | |
} | |
} finally { | |
cleanupBlackboard(blackboard, taskContext); | |
} | |
} | |
return success; | |
} | |
private String getPipelineNameForRecord(final Record record, final String defaultPipelineName, | |
final List<String> validPipelineNames) { | |
final AnyMap metadata = record.getMetadata(); | |
final Any attributeValue = metadata.get(ATTR_PIPELINE_NAME); | |
if (attributeValue != null && attributeValue.isString()) { | |
final String recordPipelineName = attributeValue.asValue().asString(); | |
if (validPipelineNames.contains(recordPipelineName)) { | |
return recordPipelineName; | |
} else { | |
_log.warn("Record " + record.getId() + " specifies invalid pipeline name " + recordPipelineName | |
+ ", will be processed by default pipeline " + defaultPipelineName); | |
} | |
} | |
return defaultPipelineName; | |
} | |
private void putRecordOnBlackboard(final Record record, final Blackboard blackboard, | |
final Collection<String> recordIds) throws BlackboardAccessException { | |
blackboard.setRecord(record); | |
recordIds.add(record.getId()); | |
} | |
/** write task parameters to record. */ | |
private void setTaskParameters(final Record record, final AnyMap parameters) { | |
final AnyMap parameterMap = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); | |
for (final Map.Entry<String, Any> parameter : parameters.entrySet()) { | |
parameterMap.put(parameter.getKey(), parameter.getValue()); | |
} | |
} | |
/** | |
* process records. If processing throws a recoverable exception it is passed through so that the task can be retried | |
* and may succeed then. Non-recoverable exceptions are catched and logged as warnings to the task log. | |
* | |
* @return true, if processing was successful, false if a non-recoverable exception occured. | |
*/ | |
private boolean processRecords(final Blackboard blackboard, final Collection<String> recordIds, | |
final String pipelineName, final RecordOutput recordOutput, final TaskContext taskContext) throws Exception { | |
try { | |
final String inputIds[] = recordIds.toArray(new String[recordIds.size()]); | |
final String[] resultIds = _processor.process(pipelineName, blackboard, inputIds); | |
writeResultRecords(blackboard, resultIds, recordOutput, taskContext); | |
return true; | |
} catch (final ProcessingException ex) { | |
if (ex.isRecoverable()) { | |
throw ex; | |
} else { | |
taskContext.getLog().warn("Failed to process records " + recordIds + ", skipping them.", ex); | |
return false; | |
} | |
} | |
} | |
/** | |
* @return value of pipelineName parameter. | |
* @throws IllegalArgumentException | |
* if parameter is not set. | |
*/ | |
private String getPipelineName(final AnyMap parameters) { | |
final String pipelineName = parameters.getStringValue(KEY_PIPELINE_NAME); | |
if (pipelineName == null || pipelineName.isEmpty()) { | |
throw new IllegalArgumentException("Pipeline name parameter '" + KEY_PIPELINE_NAME + "' is not set."); | |
} | |
if (!_processor.getWorkflowNames().contains(pipelineName)) { | |
throw new IllegalArgumentException("Configured pipeline '" + pipelineName + "' doesn't exist."); | |
} | |
return pipelineName; | |
} | |
/** | |
* @return value of {@value #KEY_PIPELINERUN_BULKSIZE} parameter, if set and grater than 0. Else default value | |
* {@link #DEFAULT_PIPELINERUN_BULKSIZE}. | |
*/ | |
private long getPipelineRunBulkSize(final AnyMap parameters) { | |
final Long pipelineRunBulkSize = parameters.getLongValue(KEY_PIPELINERUN_BULKSIZE); | |
if (pipelineRunBulkSize != null && pipelineRunBulkSize > 0) { | |
return pipelineRunBulkSize; | |
} | |
return DEFAULT_PIPELINERUN_BULKSIZE; | |
} | |
/** set workflow processor reference (used by DS). */ | |
public void setProcessor(final WorkflowProcessor processor) { | |
_processor = processor; | |
} | |
/** remove workflow processor reference (used by DS). */ | |
public void unsetProcessor(final WorkflowProcessor processor) { | |
if (_processor == processor) { | |
_processor = null; | |
} | |
} | |
} |