// blob: 261a4603b5ccd2d4bafb97d9d4e2a88b8cb83a95 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
/**
 * Executes an external program once per record using {@link Runtime#exec(String[], String[], File)}.
 * <p>
 * The command line is built from the configured command plus optional parameters (taken from a record attribute or
 * from the configuration). Optionally an attachment or attribute of the record is piped to the standard input of the
 * program, and standard output, error output and exit code can be written back to the record.
 *
 * Copyright (c) 2012 Attensity Europe GmbH
 *
 * @author Tobias Liefke
 */
public class ExecPipelet implements Pipelet {
  /** The name of the property that contains the program (including its path). */
  public static final String COMMAND_PROPERTY = "command";

  /** The name of the property that contains the working directory for the command. */
  public static final String DIRECTORY_PROPERTY = "directory";

  /**
   * The name of the property that contains the parameters to execute (ignored if the content of the parameter attribute
   * exists).
   */
  public static final String PARAMETERS_PROPERTY = "parameters";

  /** The name of the property that contains the attribute name of the parameters to execute. */
  public static final String PARAMETERS_ATTRIBUTE_PROPERTY = "parametersAttribute";

  /** The name of the property that contains the name of the input attribute or attachment for the program. */
  public static final String INPUT_ATTACHMENT_PROPERTY = "inputAttachment";

  /** The name of the property that contains the name of attachment that receives the standard output of the program. */
  public static final String OUTPUT_ATTACHMENT_PROPERTY = "outputAttachment";

  /** The name of the property that contains the name of attachment that receives the error output of the program. */
  public static final String ERROR_ATTACHMENT_PROPERTY = "errorAttachment";

  /** The name of the property that contains the name of attribute that receives the exit code of the program. */
  public static final String EXIT_CODE_ATTRIBUTE_PROPERTY = "exitCodeAttribute";

  /** The name of the property that indicates to mark a record as failed if the program returns an error code. */
  public static final String FAIL_ON_ERROR_PROPERTY = "failOnError";

  /** Charset used to convert between strings and the byte streams of the process. */
  private static final String CHARSET = "ISO-8859-1";

  /** Size of the buffer used to copy bytes between streams. */
  private static final int BUFFER_SIZE = 8192;

  /** Our log file. */
  private final Log _log = LogFactory.getLog(getClass());

  /** The current configuration. */
  private AnyMap _config;

  /** The path to the program to execute. */
  private String _command;

  /**
   * Stores the configuration and reads the required command from it.
   *
   * @see Pipelet#configure(AnyMap)
   */
  @Override
  public void configure(AnyMap configuration) throws ProcessingException {
    this._config = configuration;
    // Initialize the command
    final ParameterAccessor paramAccessor = new ParameterAccessor(configuration);
    _command = paramAccessor.getRequiredParameter(COMMAND_PROPERTY);
  }

  /**
   * Executes the configured program once for every given record.
   *
   * @param blackboard
   *          the blackboard that holds the records
   * @param recordIds
   *          the ids of the records to process
   * @return the ids collected by the {@link ResultCollector}
   * @throws ProcessingException
   *           if the result collector decides to propagate a failure
   * @see Pipelet#process(Blackboard, String[])
   */
  @Override
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    // NOTE(review): the accessor is created without the blackboard - confirm that this is intended.
    final ParameterAccessor paramAccessor = new ParameterAccessor(_config);
    final ResultCollector resultCollector =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    // Remember command array, if it is the same for every record
    String[] cmdArray = { _command };
    Any previousParameters = null;
    // Iterate over all records
    for (final String recordId : recordIds) {
      try {
        paramAccessor.setCurrentRecord(recordId);
        // Find parameters: the record attribute wins over the configured parameters
        final String parametersAttribute = paramAccessor.getParameter(PARAMETERS_ATTRIBUTE_PROPERTY, null);
        Any parameters = null;
        if (parametersAttribute != null) {
          parameters = blackboard.getMetadata(recordId).get(parametersAttribute);
        }
        if (parameters == null) {
          parameters = paramAccessor.getParameterAny(PARAMETERS_PROPERTY);
        }
        // Rebuild the command line only if the parameter object has changed
        if (parameters != previousParameters) {
          cmdArray = buildCommandArray(parameters);
          previousParameters = parameters;
        }
        // Resolve working directory (null means: inherit the one of the current process)
        final String directory = paramAccessor.getParameter(DIRECTORY_PROPERTY, null);
        final File wd = directory == null ? null : new File(directory);
        // Execute program
        final Process process = Runtime.getRuntime().exec(cmdArray, null, wd);
        // Attach input to the program (if any)
        sendInput(blackboard, paramAccessor, recordId, process);
        // Attach output to the blackboard (if required) and wait for the end of the process
        receiveOutput(blackboard, paramAccessor, recordId, process, cmdArray);
        resultCollector.addResult(recordId);
      } catch (final Exception e) {
        resultCollector.addFailedResult(recordId, e instanceof ProcessingException ? e : new ProcessingException(
          "Error in ExecPipelet processing id " + recordId, e));
      }
    }
    // Return the collected ids instead of the input, so that the collector's handling of failed records is honoured
    return resultCollector.getResultIds();
  }

  /**
   * Builds the command line from the configured command and the given parameters.
   *
   * @param parameters
   *          a single value (split at whitespace into arguments), a sequence (one argument per element) or
   *          <code>null</code> for no arguments
   * @return the command followed by its arguments
   */
  private String[] buildCommandArray(final Any parameters) {
    final List<String> commandLine = new ArrayList<String>();
    commandLine.add(_command);
    if (parameters != null) {
      if (parameters.isValue()) {
        // Tokenize the parameter string - not the command itself
        final StringTokenizer tokenizer = new StringTokenizer(parameters.asValue().asString());
        while (tokenizer.hasMoreTokens()) {
          commandLine.add(tokenizer.nextToken());
        }
      } else if (parameters.isSeq()) {
        commandLine.addAll(parameters.asSeq().asStrings());
      }
    }
    return commandLine.toArray(new String[commandLine.size()]);
  }

  /**
   * Read the output of the process and wait for its end.
   * <p>
   * Standard and error output are consumed in separate threads, so that the process cannot block on a full pipe. The
   * exit code and the captured outputs are stored in the record, if configured.
   *
   * @throws InterruptedException
   *           if the current thread is interrupted while waiting for the process (the process is destroyed then)
   * @throws ProcessingException
   *           if the exit code matches the "failOnError" configuration
   */
  private void receiveOutput(Blackboard blackboard, final ParameterAccessor paramAccessor, String recordId,
    final Process process, String[] cmdArray) throws InterruptedException, BlackboardAccessException,
    UnsupportedEncodingException, ProcessingException {
    // Read all the standard output (or dump it - if it is not required)
    final ProcessStreamThread outputThread =
      new ProcessStreamThread(process.getInputStream(), paramAccessor.getParameter(OUTPUT_ATTACHMENT_PROPERTY,
        null));
    outputThread.start();
    // Read all the error output
    final ProcessStreamThread errorThread =
      new ProcessStreamThread(process.getErrorStream(), paramAccessor.getParameter(ERROR_ATTACHMENT_PROPERTY,
        null));
    // If we may have to fail, capture the error output for the error message - even without an attachment
    final Any failOnError = paramAccessor.getParameterAny(FAIL_ON_ERROR_PROPERTY);
    if (failOnError != null && errorThread._outputAttachment == null) {
      // Must happen before start(), so that the thread sees the buffer
      errorThread._out = new ByteArrayOutputStream();
    }
    errorThread.start();
    // Wait until the process has finished
    final int exitValue;
    try {
      exitValue = process.waitFor();
    } catch (final InterruptedException e) {
      // Do not leave an orphaned process behind
      process.destroy();
      throw e;
    }
    final String exitCodeAttribute = paramAccessor.getParameter(EXIT_CODE_ATTRIBUTE_PROPERTY, null);
    if (exitCodeAttribute != null) {
      blackboard.getMetadata(recordId).put(exitCodeAttribute, exitValue);
    }
    // Save all requested attachments
    outputThread.save(blackboard, recordId);
    errorThread.save(blackboard, recordId);
    // Check if we need to fail if the program has a specific exit code
    if (contains(failOnError, exitValue)) {
      String message = "Execution of\n " + Arrays.toString(cmdArray) + "\nfailed with exit code " + exitValue;
      if (errorThread._out instanceof ByteArrayOutputStream) {
        // Ensure that the complete error output has been read
        errorThread.waitFor();
        final ByteArrayOutputStream out = (ByteArrayOutputStream) errorThread._out;
        if (out.size() > 0) {
          message += "\nMessage:\n" + new String(out.toByteArray(), CHARSET);
        }
      }
      throw new ProcessingException(message);
    }
  }

  /**
   * Attach the provided input (if any) to the program.
   * <p>
   * The input is taken from the attachment with the configured name or - if the attachment stream is
   * <code>null</code> - from the attribute with that name. The standard input of the process is always closed at the
   * end (directly, if there is no input), so that programs reading up to the end of their input terminate.
   */
  private void sendInput(Blackboard blackboard, final ParameterAccessor paramAccessor, String recordId,
    final Process process) throws BlackboardAccessException, UnsupportedEncodingException {
    final String inputAttribute = paramAccessor.getParameter(INPUT_ATTACHMENT_PROPERTY, null);
    InputStream in = null;
    if (inputAttribute != null) {
      in = blackboard.getAttachmentAsStream(recordId, inputAttribute);
      if (in == null) {
        final String input = blackboard.getMetadata(recordId).getStringValue(inputAttribute);
        if (input != null) {
          in = new ByteArrayInputStream(input.getBytes(CHARSET));
        }
      }
    }
    if (in != null) {
      // The thread closes both streams as soon as all bytes are copied
      new ProcessStreamThread(in, process.getOutputStream()).start();
    } else {
      // No input: signal "end of input" to the process right away
      closeQuietly(process.getOutputStream());
    }
  }

  /**
   * Closes the given stream, a failure is only logged.
   *
   * @param stream
   *          the stream to close, may be <code>null</code>
   */
  private void closeQuietly(final Closeable stream) {
    if (stream != null) {
      try {
        stream.close();
      } catch (final IOException e) {
        // E.g. the process has already terminated and the pipe is broken - nothing we can do about it
        _log.debug("Could not close a stream of an executed process.", e);
      }
    }
  }

  /**
   * Checks if the given parameter contains the given value, either as explicit number, or in a list of values.
   * <p>
   * A boolean parameter matches every non-zero value if it is <code>true</code>. All other parameters are iterated:
   * numbers are compared directly, strings are interpreted as number or range (see {@link #contains(String, int)}).
   *
   * @param parameter
   *          the configured value(s), may be <code>null</code>
   * @param value
   *          the exit code to check
   * @return <code>true</code> if the value matches the parameter
   */
  private boolean contains(final Any parameter, final int value) {
    if (parameter == null) {
      return false;
    }
    if (parameter.isBoolean()) {
      return parameter.asValue().asBoolean() && value != 0;
    }
    for (final Any entry : parameter) {
      if (entry.isNumber()) {
        if (entry.asValue().asLong() == value) {
          return true;
        }
      } else if (entry.isString() && contains(entry.toString(), value)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Checks if the given range contains the given value.
   * <p>
   * The range is either a single number ("3") or two bounds separated by a dash ("1-5"). A missing bound is open
   * ("-5" is everything up to 5, "2-" is everything from 2). Consequently a negative number can't be expressed here.
   *
   * @param range
   *          the textual range
   * @param value
   *          the value to check
   * @return <code>true</code> if the value is within the (inclusive) range
   * @throws NumberFormatException
   *           if a bound is not a valid integer
   */
  private boolean contains(final String range, final int value) {
    final int dash = range.indexOf('-');
    if (dash < 0) {
      return Integer.parseInt(range) == value;
    } else {
      final int min = dash == 0 ? Integer.MIN_VALUE : Integer.parseInt(range.substring(0, dash));
      final int max = dash + 1 == range.length() ? Integer.MAX_VALUE : Integer.parseInt(range.substring(dash + 1));
      return value >= min && value <= max;
    }
  }

  /** Used to read from/write to the streams of the process in extra threads. */
  private final class ProcessStreamThread extends Thread {
    /** The stream to read from. */
    private final InputStream _in;

    /** The stream to write to, <code>null</code> to discard all bytes. Must not be changed after {@link #start()}. */
    private OutputStream _out;

    /** The name of the attachment that will receive the found bytes. */
    private final String _outputAttachment;

    /**
     * Creates a new instance of ProcessStreamThread.
     *
     * @param in
     *          contains the standard/error output of the process
     * @param outputAttachment
     *          the name of the attachment on the blackboard that receives the bytes at the end, <code>null</code> to
     *          discard the bytes
     */
    private ProcessStreamThread(InputStream in, String outputAttachment) {
      this._in = in;
      this._outputAttachment = outputAttachment;
      if (outputAttachment != null) {
        this._out = new ByteArrayOutputStream();
      } else {
        this._out = null;
      }
    }

    /**
     * Creates a new instance of ProcessStreamThread.
     *
     * @param in
     *          the stream that contains bytes
     * @param out
     *          the stream that receives the bytes
     */
    private ProcessStreamThread(InputStream in, OutputStream out) {
      this._in = in;
      this._out = out;
      this._outputAttachment = null;
    }

    /**
     * Copies all bytes from the input to the output (if any) and closes both streams afterwards.
     */
    @Override
    public void run() {
      try {
        final byte[] buffer = new byte[BUFFER_SIZE];
        for (int count = _in.read(buffer); count >= 0; count = _in.read(buffer)) {
          if (_out != null) {
            _out.write(buffer, 0, count);
          }
        }
      } catch (IOException e) {
        // Ignore and stop
        _log.warn("Error during evaluation of the output of an executed proces.", e);
      } finally {
        closeQuietly(_in);
        // Flushes buffered bytes and signals "end of input" if this is the standard input of the process.
        // Closing a ByteArrayOutputStream has no effect, its content is still available afterwards.
        closeQuietly(_out);
      }
    }

    /**
     * Save the recorded bytes to the configured attachment (if any).
     *
     * @param blackboard
     *          the current blackboard
     * @param recordId
     *          the current record id
     * @throws BlackboardAccessException
     *           if the blackboard throws one during access
     */
    public void save(Blackboard blackboard, String recordId) throws BlackboardAccessException {
      if (_outputAttachment != null) {
        waitFor();
        blackboard.setAttachment(recordId, _outputAttachment, ((ByteArrayOutputStream) _out).toByteArray());
      }
    }

    /**
     * Wait until this thread is finished.
     */
    private void waitFor() {
      try {
        this.join();
      } catch (InterruptedException e) {
        // Keep the interrupt state for the caller
        Thread.currentThread().interrupt();
      }
    }
  }
}