blob: b383a53262f6ed2ab755379ff18f3903c59b9d55 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2012 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.processing.worker;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.blackboard.BlackboardFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.InvalidValueTypeException;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
* Common stuff for PipeletProcessorWorker and PipelineProcessorWorker.
*/
public abstract class ProcessingWorker implements Worker {
/** the workers input slot name . */
public static final String INPUT_SLOT_NAME = "input";
/** the workers output slot name . */
public static final String OUTPUT_SLOT_NAME = "output";
/** name for "keep attachments in memory" parameter. */
public static final String KEY_KEEPATTACHMENTSINMEMORY = "keepAttachmentsInMemory";
/** name for "write attachments to output" parameter. */
public static final String KEY_WRITEATTACHMENTSTOOUTPUT = "writeAttachmentsToOutput";
/** flag if the blackboard warning has already been logged to prevent excessive logging. */
static boolean s_noBinStorageAvailableLogged;
/** protected log. */
protected final Log _log = LogFactory.getLog(getClass());
/** blackboard factory. Set by DS */
protected BlackboardFactory _blackboardFactory;
/** use bin storage? */
protected boolean _binStorageAvailable = true;
/**
* @param parameters
* task parameters, converted to an AnyMap
* @param recordInput
* input bulk
* @param recordOutput
* output bulk, can be null
* @param taskContext
* task context
* @return true if at least one record was processed successfully.
* @throws Exception
* bulk could not be processed
*/
public abstract boolean perform(AnyMap parameters, RecordInput recordInput, RecordOutput recordOutput,
TaskContext taskContext) throws Exception;
@Override
public void perform(final TaskContext taskContext) throws Exception {
final AnyMap parameters = getTaskParameters(taskContext);
final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT_NAME);
final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT_NAME);
_binStorageAvailable = true;
final boolean success = perform(parameters, recordInput, recordOutput, taskContext);
if (!success) {
throw new ProcessingException("None of the records of task " + taskContext.getTask()
+ " could be successfully processed, have a look at the log for details.");
}
}
/** create AnyMap with task parameters. */
private AnyMap getTaskParameters(final TaskContext taskContext) {
final AnyMap parameterMap = DataFactory.DEFAULT.cloneAnyMap(taskContext.getTask().getParameters());
if (parameterMap.get(ProcessingConstants.KEY_FAIL_ON_ERROR) == null) {
parameterMap.put(ProcessingConstants.KEY_FAIL_ON_ERROR, "false");
}
return parameterMap;
}
/**
* append the resulting records to the bulk. Errors on blackboard access are catched and logged as warnings to the
* task log, as they are considered as record-specific non-recoverable errors. In any case, the blackboard is emptied
* afterwards and attachments should be removed from binary storage (if used).
*
* @param recordOutput
* where to write the records. Can be null (is optional in worker description)
*/
protected void writeResultRecords(final Blackboard blackboard, final String[] resultIds,
final RecordOutput recordOutput, final TaskContext taskContext) throws ObjectStoreException, IOException {
final boolean withAttachments = shouldWriteAttachments(taskContext);
if (resultIds != null && recordOutput != null) {
for (final String resultId : resultIds) {
try {
final Record record = blackboard.getRecordCopy(resultId, withAttachments);
recordOutput.writeRecord(record);
} catch (final BlackboardAccessException ex) {
taskContext.getLog().warn("Failed to read result record " + resultId + " from blackboard, skipping it.",
ex);
}
}
}
}
protected void cleanupBlackboard(final Blackboard blackboard, final TaskContext taskContext) {
try {
blackboard.removeAll();
blackboard.commit();
} catch (final BlackboardAccessException ex) {
taskContext.getLog().warn(
"Error while cleaning up blackboard. Attachments may be leftover in BinaryStorage.", ex);
}
}
/** get parameter {@link #KEY_WRITEATTACHMENTSTOOUTPUT}, default value is true. */
private boolean shouldWriteAttachments(final TaskContext taskContext) {
final Boolean writeAttachments =
taskContext.getTask().getParameters().getBooleanValue(KEY_WRITEATTACHMENTSTOOUTPUT);
return writeAttachments == null ? true : writeAttachments;
}
/** Creates the blackboard to hold the records during processing */
protected Blackboard getBlackboard(final TaskContext taskContext) {
try {
return _blackboardFactory.createBlackboard(false, useBinaryStorage(taskContext));
} catch (final BlackboardAccessException e) {
return getTransientBlackboard();
}
}
/** create a pure transient blackboard, and keep in mind that we don't currently have a bin storage available. */
private Blackboard getTransientBlackboard() {
_binStorageAvailable = false;
if (!s_noBinStorageAvailableLogged) {
_log.warn("Could not get a blackboard with binary storage, using a transient one. "
+ "Attachments won't be accessible by the worker.");
s_noBinStorageAvailableLogged = true;
}
return _blackboardFactory.createTransientBlackboard();
}
/**
* evaluate {@value #KEY_KEEPATTACHMENTSINMEMORY} parameter to determine if attachments should be written to
* binstorage. If we already know that there is no binstorage available, the parameter does not have an effect. If the
* paramater is not set, the method returns "false"
*/
private boolean useBinaryStorage(final TaskContext taskContext) {
if (_binStorageAvailable) {
try {
final Boolean keepInMemory =
taskContext.getTask().getParameters().getBooleanValue(KEY_KEEPATTACHMENTSINMEMORY);
if (keepInMemory != null) {
return !keepInMemory;
}
} catch (final InvalidValueTypeException ex) {
; // ignore.
}
}
return false; // default: don't use binstorage because attachments should be kept in memory
}
/** set blackboard factory reference (used by DS). */
public void setBlackboardFactory(final BlackboardFactory factory) {
_blackboardFactory = factory;
}
/** remove blackboard factory reference (used by DS). */
public void unsetBlackboardFactory(final BlackboardFactory factory) {
if (_blackboardFactory == factory) {
_blackboardFactory = null;
}
}
}