/********************************************************************************************************************** | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.processing.worker; | |
import java.io.IOException; | |
import java.util.Collection; | |
import java.util.LinkedHashSet; | |
import java.util.Map; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.blackboard.Blackboard; | |
import org.eclipse.smila.blackboard.BlackboardAccessException; | |
import org.eclipse.smila.blackboard.BlackboardFactory; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.processing.ProcessingException; | |
import org.eclipse.smila.processing.WorkflowProcessor; | |
import org.eclipse.smila.processing.parameters.ParameterAccessor; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.Worker; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
import org.eclipse.smila.taskworker.output.RecordOutput; | |
/** | |
* A worker that is able to execute a pipeline. | |
*/ | |
public class PipelineProcessingWorker implements Worker { | |
/** worker's name. */ | |
public static final String WORKER_NAME = "pipelineProcessingWorker"; | |
/** key for the pipeline's name. */ | |
public static final String KEY_PIPELINE_NAME = "pipelineName"; | |
/** key for the number of parallel records for one execution of the pipeline. */ | |
public static final String KEY_NUMBER_OF_PARALLEL_RECORDS = "numberOfParallelRecords"; | |
/** default number of parallel records to be processed in one go. */ | |
public static final int DEFAULT_NUMBER_OF_PARALLEL_RECORDS = 1; | |
/** the workers input slot name . */ | |
public static final String INPUT_SLOT_NAME = "input"; | |
/** the workers output slot name . */ | |
public static final String OUTPUT_SLOT_NAME = "output"; | |
/** flag if the blackboard warning has already been logged? */ | |
private static boolean s_noBinStorageAvailableLogged; | |
/** associated workflow processor. Set by DS. */ | |
private WorkflowProcessor _processor; | |
/** blackboard factory. Set by DS */ | |
private BlackboardFactory _blackboardFactory; | |
/** private log. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** {@inheritDoc} */ | |
@Override | |
public void perform(final TaskContext taskContext) throws Exception { | |
// get parameters | |
final Map<String, String> parameters = taskContext.getTask().getParameters(); | |
final int numberOfParallelRecords = getNumberOfParallelRecords(parameters); | |
final String pipelineName = getPipelineName(parameters); | |
final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT_NAME); | |
final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT_NAME); | |
// iterate over the complete input bulk | |
Record record = null; | |
boolean success = false; // to check if at least some records were processed successful | |
do { | |
// process numberOfParallelRecords records at one go | |
// try to get a blackboard with binary storage. | |
Blackboard blackboard; | |
try { | |
blackboard = _blackboardFactory.createBlackboard(false, true); | |
} catch (final BlackboardAccessException e) { | |
if (!s_noBinStorageAvailableLogged) { | |
_log.warn("Could not get a blackboard with binary storage, using a transient one. " | |
+ "Attachments won't be accessible by the worker."); | |
s_noBinStorageAvailableLogged = true; | |
} | |
blackboard = _blackboardFactory.createBlackboard(false, false); | |
} | |
final Collection<String> recordIds = new LinkedHashSet<String>(); | |
for (int i = 0; i < numberOfParallelRecords; i++) { | |
record = recordInput.getRecord(); | |
if (record == null) { | |
break; // for | |
} | |
setTaskParameters(record, parameters); | |
blackboard.setRecord(record); | |
recordIds.add(record.getId()); | |
} | |
// process the bunch of records | |
if (!recordIds.isEmpty() && !taskContext.isCanceled()) { | |
success = processRecords(taskContext, recordOutput, pipelineName, blackboard, recordIds) || success; | |
} | |
} while (record != null && !taskContext.isCanceled()); | |
if (!success) { | |
throw new ProcessingException("None of the records of task " + taskContext.getTask() | |
+ " could be successfully processed, have a look at the log for details."); | |
} | |
} | |
/** write task parameters to record. */ | |
private void setTaskParameters(final Record record, final Map<String, String> parameters) { | |
final AnyMap parameterMap = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true); | |
for (final Map.Entry<String, String> parameter : parameters.entrySet()) { | |
parameterMap.put(parameter.getKey(), parameter.getValue()); | |
} | |
} | |
/** | |
* process records. If processing throws a recoverable exception it is passed through so that the task can be retried | |
* and may succeed then. Non-recoverable exceptions are catched and logged as warnings to the task log. | |
* | |
* @return true, if processing was successful, false if a non-recoverable exception occured. | |
*/ | |
private boolean processRecords(final TaskContext taskContext, final RecordOutput recordOutput, | |
final String pipelineName, final Blackboard blackboard, final Collection<String> recordIds) | |
throws ObjectStoreException, IOException, ProcessingException { | |
try { | |
final String inputIds[] = recordIds.toArray(new String[recordIds.size()]); | |
final String[] resultIds = _processor.process(pipelineName, blackboard, inputIds); | |
writeResultRecords(taskContext, recordOutput, blackboard, resultIds); | |
return true; | |
} catch (final ProcessingException ex) { | |
if (ex.isRecoverable()) { | |
throw ex; | |
} else { | |
taskContext.getLog().warn("Failed to process records " + recordIds + ", skipping them.", ex); | |
return false; | |
} | |
} finally { | |
blackboard.unload(); // TODO: when we use Reocrd-/BinaryStorage someday, we can think about commit() here | |
} | |
} | |
/** | |
* append the resulting records to the bulk. Errors on blackboard access are catched and logged as warnings to the | |
* task log, as they are considered as record-specific non-recoverable errors. | |
* | |
* @param recordOutput | |
* where to write the records. Can be null (is optional in worker description) | |
*/ | |
private void writeResultRecords(final TaskContext taskContext, final RecordOutput recordOutput, | |
final Blackboard blackboard, final String[] resultIds) throws ObjectStoreException, IOException { | |
if (resultIds != null && recordOutput != null) { | |
for (final String resultId : resultIds) { | |
try { | |
recordOutput.writeRecord(blackboard.getRecord(resultId)); | |
} catch (final BlackboardAccessException ex) { | |
taskContext.getLog().warn("Failed to read result record " + resultId + " from blackboard, skipping it.", | |
ex); | |
} | |
} | |
} | |
} | |
/** | |
* @return value of pipelineName parameter. | |
* @throws IllegalArgumentException | |
* if parameter is not set. | |
*/ | |
private String getPipelineName(final Map<String, String> parameters) { | |
final String pipelineName = parameters.get(KEY_PIPELINE_NAME); | |
if (pipelineName == null || pipelineName.isEmpty()) { | |
throw new IllegalArgumentException("Pipeline name parameter " + KEY_PIPELINE_NAME + " is not set."); | |
} | |
if (!_processor.getWorkflowNames().contains(pipelineName)) { | |
throw new IllegalArgumentException("Configured pipeline " + KEY_PIPELINE_NAME + " doesn't exist."); | |
} | |
return pipelineName; | |
} | |
/** | |
* @return value of numberOfParallelRecords parameter, if set and grater than 0. Else default value | |
* {@link #DEFAULT_NUMBER_OF_PARALLEL_RECORDS}. | |
*/ | |
private int getNumberOfParallelRecords(final Map<String, String> parameters) { | |
final String parameterValue = parameters.get(KEY_NUMBER_OF_PARALLEL_RECORDS); | |
if (parameterValue != null) { | |
try { | |
final int numberOfParallelRecords = Integer.parseInt(parameters.get(KEY_NUMBER_OF_PARALLEL_RECORDS)); | |
if (numberOfParallelRecords > 0) { | |
return numberOfParallelRecords; | |
} | |
} catch (final NumberFormatException ex) { | |
; // use default value | |
} | |
} | |
return DEFAULT_NUMBER_OF_PARALLEL_RECORDS; | |
} | |
/** @return {@value #WORKER_NAME} */ | |
@Override | |
public String getName() { | |
return WORKER_NAME; | |
} | |
/** set workflow processor reference (used by DS). */ | |
public void setProcessor(final WorkflowProcessor processor) { | |
_processor = processor; | |
} | |
/** remove workflow processor reference (used by DS). */ | |
public void unsetProcessor(final WorkflowProcessor processor) { | |
if (_processor == processor) { | |
_processor = null; | |
} | |
} | |
/** set blackboard factory reference (used by DS). */ | |
public void setBlackboardFactory(final BlackboardFactory factory) { | |
_blackboardFactory = factory; | |
} | |
/** remove blackboard factory reference (used by DS). */ | |
public void unsetBlackboardFactory(final BlackboardFactory factory) { | |
if (_blackboardFactory == factory) { | |
_blackboardFactory = null; | |
} | |
} | |
} |