/**********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.processing.worker;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.blackboard.BlackboardFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.WorkflowProcessor;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
* A worker that executes a pipeline on the records of its input bulk.
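* <p>
* Records are read from the input bulk in batches, the configured pipeline is executed on each batch, and the
* result records are written to the (optional) output bulk. Supported task parameters:
* <ul>
* <li>{@value #KEY_PIPELINE_NAME}: name of the pipeline to execute (required, must be known to the workflow
* processor)</li>
* <li>{@value #KEY_NUMBER_OF_PARALLEL_RECORDS}: batch size, i.e. the number of records handed to the pipeline in
* one call (optional, defaults to {@value #DEFAULT_NUMBER_OF_PARALLEL_RECORDS})</li>
* </ul>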
*/
public class PipelineProcessingWorker implements Worker {
/** worker's name. */
public static final String WORKER_NAME = "pipelineProcessingWorker";
/** key for the pipeline's name. */
public static final String KEY_PIPELINE_NAME = "pipelineName";
/** key for the number of parallel records for one execution of the pipeline. */
public static final String KEY_NUMBER_OF_PARALLEL_RECORDS = "numberOfParallelRecords";
/** default number of parallel records to be processed in one go. */
public static final int DEFAULT_NUMBER_OF_PARALLEL_RECORDS = 1;
/** the worker's input slot name. */
public static final String INPUT_SLOT_NAME = "input";
/** the worker's output slot name. */
public static final String OUTPUT_SLOT_NAME = "output";
/** flag whether the missing-binary-storage warning has already been logged. */
private static boolean s_noBinStorageAvailableLogged;
/** associated workflow processor. Set by DS. */
private WorkflowProcessor _processor;
/** blackboard factory. Set by DS */
private BlackboardFactory _blackboardFactory;
/** private log. */
private final Log _log = LogFactory.getLog(getClass());
/** {@inheritDoc} */
@Override
public void perform(final TaskContext taskContext) throws Exception {
// get parameters
final Map<String, String> parameters = taskContext.getTask().getParameters();
final int numberOfParallelRecords = getNumberOfParallelRecords(parameters);
final String pipelineName = getPipelineName(parameters);
final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT_NAME);
final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT_NAME);
// iterate over the complete input bulk
Record record = null;
boolean success = false; // to check if at least some records were processed successfully
do {
// process numberOfParallelRecords records at one go
// try to get a blackboard with binary storage.
Blackboard blackboard;
try {
blackboard = _blackboardFactory.createBlackboard(false, true);
} catch (final BlackboardAccessException e) {
if (!s_noBinStorageAvailableLogged) {
_log.warn("Could not get a blackboard with binary storage, using a transient one. "
+ "Attachments won't be accessible by the worker.");
s_noBinStorageAvailableLogged = true;
}
blackboard = _blackboardFactory.createBlackboard(false, false);
}
final Collection<String> recordIds = new LinkedHashSet<String>();
for (int i = 0; i < numberOfParallelRecords; i++) {
record = recordInput.getRecord();
if (record == null) {
break; // for
}
setTaskParameters(record, parameters);
blackboard.setRecord(record);
recordIds.add(record.getId());
}
// process the bunch of records
if (!recordIds.isEmpty() && !taskContext.isCanceled()) {
success = processRecords(taskContext, recordOutput, pipelineName, blackboard, recordIds) || success;
}
} while (record != null && !taskContext.isCanceled());
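// fail the task only if not a single record could be processed; record-specific failures have already been
// logged as warnings by processRecords().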
if (!success) {
throw new ProcessingException("None of the records of task " + taskContext.getTask()
+ " could be processed successfully. Have a look at the task log for details.");
}
}
/** write task parameters to record. */
private void setTaskParameters(final Record record, final Map<String, String> parameters) {
final AnyMap parameterMap = record.getMetadata().getMap(ParameterAccessor.DEFAULT_PARAMETERS_ATTRIBUTE, true);
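// copy all task parameters into the record's parameter map so that the pipeline can read them via ParameterAccessor.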
for (final Map.Entry<String, String> parameter : parameters.entrySet()) {
parameterMap.put(parameter.getKey(), parameter.getValue());
}
}
/**
* process records. If processing throws a recoverable exception, it is passed through so that the task can be
* retried and may succeed then. Non-recoverable exceptions are caught and logged as warnings to the task log.
*
* @return true if processing was successful, false if a non-recoverable exception occurred.
*/
private boolean processRecords(final TaskContext taskContext, final RecordOutput recordOutput,
final String pipelineName, final Blackboard blackboard, final Collection<String> recordIds)
throws ObjectStoreException, IOException, ProcessingException {
try {
final String[] inputIds = recordIds.toArray(new String[recordIds.size()]);
final String[] resultIds = _processor.process(pipelineName, blackboard, inputIds);
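// the pipeline returns the ids of the result records; they may differ from the input ids.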
writeResultRecords(taskContext, recordOutput, blackboard, resultIds);
return true;
} catch (final ProcessingException ex) {
if (ex.isRecoverable()) {
throw ex;
} else {
taskContext.getLog().warn("Failed to process records " + recordIds + ", skipping them.", ex);
return false;
}
} finally {
blackboard.unload(); // TODO: when we use Record-/BinaryStorage someday, we can think about commit() here
}
}
/**
* append the resulting records to the output bulk. Errors on blackboard access are caught and logged as warnings to
* the task log, as they are considered record-specific non-recoverable errors.
*
* @param recordOutput
* where to write the records. Can be null (is optional in worker description)
*/
private void writeResultRecords(final TaskContext taskContext, final RecordOutput recordOutput,
final Blackboard blackboard, final String[] resultIds) throws ObjectStoreException, IOException {
if (resultIds != null && recordOutput != null) {
for (final String resultId : resultIds) {
try {
recordOutput.writeRecord(blackboard.getRecord(resultId));
} catch (final BlackboardAccessException ex) {
taskContext.getLog().warn("Failed to read result record " + resultId + " from blackboard, skipping it.",
ex);
}
}
}
}
/**
* @return value of pipelineName parameter.
* @throws IllegalArgumentException
* if parameter is not set.
*/
private String getPipelineName(final Map<String, String> parameters) {
final String pipelineName = parameters.get(KEY_PIPELINE_NAME);
if (pipelineName == null || pipelineName.isEmpty()) {
throw new IllegalArgumentException("Pipeline name parameter " + KEY_PIPELINE_NAME + " is not set.");
}
if (!_processor.getWorkflowNames().contains(pipelineName)) {
throw new IllegalArgumentException("Configured pipeline " + KEY_PIPELINE_NAME + " doesn't exist.");
}
return pipelineName;
}
/**
* @return value of numberOfParallelRecords parameter, if set and greater than 0, else the default value
* {@link #DEFAULT_NUMBER_OF_PARALLEL_RECORDS}.
*/
private int getNumberOfParallelRecords(final Map<String, String> parameters) {
final String parameterValue = parameters.get(KEY_NUMBER_OF_PARALLEL_RECORDS);
if (parameterValue != null) {
try {
final int numberOfParallelRecords = Integer.parseInt(parameterValue);
if (numberOfParallelRecords > 0) {
return numberOfParallelRecords;
}
} catch (final NumberFormatException ex) {
// not a valid number: fall back to the default value
}
}
return DEFAULT_NUMBER_OF_PARALLEL_RECORDS;
}
/** @return {@value #WORKER_NAME} */
@Override
public String getName() {
return WORKER_NAME;
}
/** set workflow processor reference (used by DS). */
public void setProcessor(final WorkflowProcessor processor) {
_processor = processor;
}
/** remove workflow processor reference (used by DS). */
public void unsetProcessor(final WorkflowProcessor processor) {
if (_processor == processor) {
_processor = null;
}
}
/** set blackboard factory reference (used by DS). */
public void setBlackboardFactory(final BlackboardFactory factory) {
_blackboardFactory = factory;
}
/** remove blackboard factory reference (used by DS). */
public void unsetBlackboardFactory(final BlackboardFactory factory) {
if (_blackboardFactory == factory) {
_blackboardFactory = null;
}
}
}