| /********************************************************************************************************************* |
| * Copyright (c) 2008, 2012 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the |
| * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this |
| * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.processing.worker; |
| |
| import java.io.IOException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.blackboard.Blackboard; |
| import org.eclipse.smila.blackboard.BlackboardAccessException; |
| import org.eclipse.smila.blackboard.BlackboardFactory; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.InvalidValueTypeException; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.objectstore.ObjectStoreException; |
| import org.eclipse.smila.processing.ProcessingException; |
| import org.eclipse.smila.processing.util.ProcessingConstants; |
| import org.eclipse.smila.taskworker.TaskContext; |
| import org.eclipse.smila.taskworker.Worker; |
| import org.eclipse.smila.taskworker.input.RecordInput; |
| import org.eclipse.smila.taskworker.output.RecordOutput; |
| |
| /** |
| * Common stuff for PipeletProcessorWorker and PipelineProcessorWorker. |
| */ |
| public abstract class ProcessingWorker implements Worker { |
| |
| /** the workers input slot name . */ |
| public static final String INPUT_SLOT_NAME = "input"; |
| |
| /** the workers output slot name . */ |
| public static final String OUTPUT_SLOT_NAME = "output"; |
| |
| /** name for "keep attachments in memory" parameter. */ |
| public static final String KEY_KEEPATTACHMENTSINMEMORY = "keepAttachmentsInMemory"; |
| |
| /** name for "write attachments to output" parameter. */ |
| public static final String KEY_WRITEATTACHMENTSTOOUTPUT = "writeAttachmentsToOutput"; |
| |
| /** flag if the blackboard warning has already been logged to prevent excessive logging. */ |
| static boolean s_noBinStorageAvailableLogged; |
| |
| /** protected log. */ |
| protected final Log _log = LogFactory.getLog(getClass()); |
| |
| /** blackboard factory. Set by DS */ |
| protected BlackboardFactory _blackboardFactory; |
| |
| /** use bin storage? */ |
| protected boolean _binStorageAvailable = true; |
| |
| /** |
| * @param parameters |
| * task parameters, converted to an AnyMap |
| * @param recordInput |
| * input bulk |
| * @param recordOutput |
| * output bulk, can be null |
| * @param taskContext |
| * task context |
| * @return true if at least one record was processed successfully. |
| * @throws Exception |
| * bulk could not be processed |
| */ |
| public abstract boolean perform(AnyMap parameters, RecordInput recordInput, RecordOutput recordOutput, |
| TaskContext taskContext) throws Exception; |
| |
| @Override |
| public void perform(final TaskContext taskContext) throws Exception { |
| final AnyMap parameters = getTaskParameters(taskContext); |
| final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT_NAME); |
| final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT_NAME); |
| _binStorageAvailable = true; |
| final boolean success = perform(parameters, recordInput, recordOutput, taskContext); |
| if (!success) { |
| throw new ProcessingException("None of the records of task " + taskContext.getTask() |
| + " could be successfully processed, have a look at the log for details."); |
| } |
| } |
| |
| /** create AnyMap with task parameters. */ |
| private AnyMap getTaskParameters(final TaskContext taskContext) { |
| final AnyMap parameterMap = DataFactory.DEFAULT.cloneAnyMap(taskContext.getTask().getParameters()); |
| if (parameterMap.get(ProcessingConstants.KEY_FAIL_ON_ERROR) == null) { |
| parameterMap.put(ProcessingConstants.KEY_FAIL_ON_ERROR, "false"); |
| } |
| return parameterMap; |
| } |
| |
| /** |
| * append the resulting records to the bulk. Errors on blackboard access are catched and logged as warnings to the |
| * task log, as they are considered as record-specific non-recoverable errors. In any case, the blackboard is emptied |
| * afterwards and attachments should be removed from binary storage (if used). |
| * |
| * @param recordOutput |
| * where to write the records. Can be null (is optional in worker description) |
| */ |
| protected void writeResultRecords(final Blackboard blackboard, final String[] resultIds, |
| final RecordOutput recordOutput, final TaskContext taskContext) throws ObjectStoreException, IOException { |
| final boolean withAttachments = shouldWriteAttachments(taskContext); |
| if (resultIds != null && recordOutput != null) { |
| for (final String resultId : resultIds) { |
| try { |
| final Record record = blackboard.getRecordCopy(resultId, withAttachments); |
| recordOutput.writeRecord(record); |
| } catch (final BlackboardAccessException ex) { |
| taskContext.getLog().warn("Failed to read result record " + resultId + " from blackboard, skipping it.", |
| ex); |
| } |
| } |
| } |
| } |
| |
| protected void cleanupBlackboard(final Blackboard blackboard, final TaskContext taskContext) { |
| try { |
| blackboard.removeAll(); |
| blackboard.commit(); |
| } catch (final BlackboardAccessException ex) { |
| taskContext.getLog().warn( |
| "Error while cleaning up blackboard. Attachments may be leftover in BinaryStorage.", ex); |
| } |
| } |
| |
| /** get parameter {@link #KEY_WRITEATTACHMENTSTOOUTPUT}, default value is true. */ |
| private boolean shouldWriteAttachments(final TaskContext taskContext) { |
| final Boolean writeAttachments = |
| taskContext.getTask().getParameters().getBooleanValue(KEY_WRITEATTACHMENTSTOOUTPUT); |
| return writeAttachments == null ? true : writeAttachments; |
| } |
| |
| /** Creates the blackboard to hold the records during processing */ |
| protected Blackboard getBlackboard(final TaskContext taskContext) { |
| try { |
| return _blackboardFactory.createBlackboard(false, useBinaryStorage(taskContext)); |
| } catch (final BlackboardAccessException e) { |
| return getTransientBlackboard(); |
| } |
| } |
| |
| /** create a pure transient blackboard, and keep in mind that we don't currently have a bin storage available. */ |
| private Blackboard getTransientBlackboard() { |
| _binStorageAvailable = false; |
| if (!s_noBinStorageAvailableLogged) { |
| _log.warn("Could not get a blackboard with binary storage, using a transient one. " |
| + "Attachments won't be accessible by the worker."); |
| s_noBinStorageAvailableLogged = true; |
| } |
| return _blackboardFactory.createTransientBlackboard(); |
| } |
| |
| /** |
| * evaluate {@value #KEY_KEEPATTACHMENTSINMEMORY} parameter to determine if attachments should be written to |
| * binstorage. If we already know that there is no binstorage available, the parameter does not have an effect. If the |
| * paramater is not set, the method returns "false" |
| */ |
| private boolean useBinaryStorage(final TaskContext taskContext) { |
| if (_binStorageAvailable) { |
| try { |
| final Boolean keepInMemory = |
| taskContext.getTask().getParameters().getBooleanValue(KEY_KEEPATTACHMENTSINMEMORY); |
| if (keepInMemory != null) { |
| return !keepInMemory; |
| } |
| } catch (final InvalidValueTypeException ex) { |
| ; // ignore. |
| } |
| } |
| return false; // default: don't use binstorage because attachments should be kept in memory |
| } |
| |
| /** set blackboard factory reference (used by DS). */ |
| public void setBlackboardFactory(final BlackboardFactory factory) { |
| _blackboardFactory = factory; |
| } |
| |
| /** remove blackboard factory reference (used by DS). */ |
| public void unsetBlackboardFactory(final BlackboardFactory factory) { |
| if (_blackboardFactory == factory) { |
| _blackboardFactory = null; |
| } |
| } |
| } |