blob: db5f316286f87a144e6b27cba227fc0074716147 [file] [log] [blame]
/**********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Drazen Cindric (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.processing.worker;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.PipeletTracker;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
* A worker that is able to execute a single pipelet directly, without pipeline overhead.
*/
public class PipeletProcessorWorker extends ProcessingWorker {
/** worker's name. */
public static final String WORKER_NAME = "pipeletProcessor";
/** key for the pipelets name. */
public static final String KEY_PIPELET_NAME = "pipeletName";
/** pipelet tracker. */
private PipeletTracker _pipeletTracker;
/** @return {@value #WORKER_NAME} */
@Override
public String getName() {
return WORKER_NAME;
}
/** {@inheritDoc} */
@Override
public boolean perform(final AnyMap parameters, final RecordInput recordInput, final RecordOutput recordOutput,
final TaskContext taskContext) throws Exception {
final Pipelet pipelet = createPipeletInstance(parameters);
Record record = null;
boolean success = false; // to check if at least some records were processed successful
do {
record = recordInput.getRecord();
if (record != null && !taskContext.isCanceled()) {
final Blackboard blackboard = getBlackboard(taskContext);
try {
blackboard.setRecord(record);
success |= processRecord(blackboard, record.getId(), pipelet, recordOutput, taskContext);
} finally {
cleanupBlackboard(blackboard, taskContext);
}
}
} while (record != null && !taskContext.isCanceled());
return success;
}
/** create instance of pipelet and configure it. */
private Pipelet createPipeletInstance(final AnyMap parameters) throws ProcessingException,
InstantiationException, IllegalAccessException {
final String pipeletName = getPipeletName(parameters);
final Class<? extends Pipelet> pipeletClass = _pipeletTracker.getRegisteredPipelets().get(pipeletName);
if (pipeletClass == null) {
throw new ProcessingException("Pipelet '" + pipeletName + "' is not available currently.", false);
}
final Pipelet pipelet = pipeletClass.newInstance();
pipelet.configure(parameters);
return pipelet;
}
/**
* process records. If processing throws a recoverable exception it is passed through so that the task can be retried
* and may succeed then. Non-recoverable exceptions are catched and logged as warnings to the task log.
*
* @return true, if processing was successful, false if a non-recoverable exception occurred.
*/
private boolean processRecord(final Blackboard blackboard, final String recordId, final Pipelet pipelet,
final RecordOutput recordOutput, final TaskContext taskContext) throws Exception {
try {
final String[] resultIds = pipelet.process(blackboard, new String[] { recordId });
writeResultRecords(blackboard, resultIds, recordOutput, taskContext);
return true;
} catch (final ProcessingException ex) {
if (ex.isRecoverable()) {
throw ex;
} else {
taskContext.getLog().warn("Failed to process record " + recordId + ", skipping it.", ex);
return false;
}
}
}
/**
* @return value of pipeletName parameter.
* @throws IllegalArgumentException
* if parameter is not set.
*/
private String getPipeletName(final AnyMap parameters) {
final String pipeletName = parameters.getStringValue(KEY_PIPELET_NAME);
if (pipeletName == null || pipeletName.isEmpty()) {
throw new IllegalArgumentException("Pipeline name parameter '" + KEY_PIPELET_NAME + "' is not set.");
}
if (_pipeletTracker.getRegisteredPipelets().get(pipeletName) == null) {
throw new IllegalArgumentException("Configured pipelet '" + pipeletName + "' doesn't exist.");
}
return pipeletName;
}
/** set pipelet tracker reference (used by DS). */
public void setPipeletTracker(final PipeletTracker pipeletTracker) {
_pipeletTracker = pipeletTracker;
}
/** remove pipelet tracker reference (used by DS). */
public void unsetPipeletTracker(final PipeletTracker pipeletTracker) {
if (_pipeletTracker == pipeletTracker) {
_pipeletTracker = null;
}
}
}