| /********************************************************************************************************************** |
| * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the |
| * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this |
| * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: Drazen Cindric (Attensity Europe GmbH) - initial implementation |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.processing.worker; |
| |
| import org.eclipse.smila.blackboard.Blackboard; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.processing.Pipelet; |
| import org.eclipse.smila.processing.PipeletTracker; |
| import org.eclipse.smila.processing.ProcessingException; |
| import org.eclipse.smila.taskworker.TaskContext; |
| import org.eclipse.smila.taskworker.input.RecordInput; |
| import org.eclipse.smila.taskworker.output.RecordOutput; |
| |
| /** |
| * A worker that is able to execute a single pipelet directly, without pipeline overhead. |
| */ |
| public class PipeletProcessorWorker extends ProcessingWorker { |
| |
| /** worker's name. */ |
| public static final String WORKER_NAME = "pipeletProcessor"; |
| |
| /** key for the pipelets name. */ |
| public static final String KEY_PIPELET_NAME = "pipeletName"; |
| |
| /** pipelet tracker. */ |
| private PipeletTracker _pipeletTracker; |
| |
| /** @return {@value #WORKER_NAME} */ |
| @Override |
| public String getName() { |
| return WORKER_NAME; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean perform(final AnyMap parameters, final RecordInput recordInput, final RecordOutput recordOutput, |
| final TaskContext taskContext) throws Exception { |
| final Pipelet pipelet = createPipeletInstance(parameters); |
| Record record = null; |
| boolean success = false; // to check if at least some records were processed successful |
| do { |
| record = recordInput.getRecord(); |
| if (record != null && !taskContext.isCanceled()) { |
| final Blackboard blackboard = getBlackboard(taskContext); |
| try { |
| blackboard.setRecord(record); |
| success |= processRecord(blackboard, record.getId(), pipelet, recordOutput, taskContext); |
| } finally { |
| cleanupBlackboard(blackboard, taskContext); |
| } |
| } |
| } while (record != null && !taskContext.isCanceled()); |
| return success; |
| } |
| |
| /** create instance of pipelet and configure it. */ |
| private Pipelet createPipeletInstance(final AnyMap parameters) throws ProcessingException, |
| InstantiationException, IllegalAccessException { |
| final String pipeletName = getPipeletName(parameters); |
| final Class<? extends Pipelet> pipeletClass = _pipeletTracker.getRegisteredPipelets().get(pipeletName); |
| if (pipeletClass == null) { |
| throw new ProcessingException("Pipelet '" + pipeletName + "' is not available currently.", false); |
| } |
| final Pipelet pipelet = pipeletClass.newInstance(); |
| pipelet.configure(parameters); |
| return pipelet; |
| } |
| |
| /** |
| * process records. If processing throws a recoverable exception it is passed through so that the task can be retried |
| * and may succeed then. Non-recoverable exceptions are catched and logged as warnings to the task log. |
| * |
| * @return true, if processing was successful, false if a non-recoverable exception occurred. |
| */ |
| private boolean processRecord(final Blackboard blackboard, final String recordId, final Pipelet pipelet, |
| final RecordOutput recordOutput, final TaskContext taskContext) throws Exception { |
| try { |
| final String[] resultIds = pipelet.process(blackboard, new String[] { recordId }); |
| writeResultRecords(blackboard, resultIds, recordOutput, taskContext); |
| return true; |
| } catch (final ProcessingException ex) { |
| if (ex.isRecoverable()) { |
| throw ex; |
| } else { |
| taskContext.getLog().warn("Failed to process record " + recordId + ", skipping it.", ex); |
| return false; |
| } |
| } |
| } |
| |
| /** |
| * @return value of pipeletName parameter. |
| * @throws IllegalArgumentException |
| * if parameter is not set. |
| */ |
| private String getPipeletName(final AnyMap parameters) { |
| final String pipeletName = parameters.getStringValue(KEY_PIPELET_NAME); |
| if (pipeletName == null || pipeletName.isEmpty()) { |
| throw new IllegalArgumentException("Pipeline name parameter '" + KEY_PIPELET_NAME + "' is not set."); |
| } |
| if (_pipeletTracker.getRegisteredPipelets().get(pipeletName) == null) { |
| throw new IllegalArgumentException("Configured pipelet '" + pipeletName + "' doesn't exist."); |
| } |
| return pipeletName; |
| } |
| |
| /** set pipelet tracker reference (used by DS). */ |
| public void setPipeletTracker(final PipeletTracker pipeletTracker) { |
| _pipeletTracker = pipeletTracker; |
| } |
| |
| /** remove pipelet tracker reference (used by DS). */ |
| public void unsetPipeletTracker(final PipeletTracker pipeletTracker) { |
| if (_pipeletTracker == pipeletTracker) { |
| _pipeletTracker = null; |
| } |
| } |
| |
| } |