/*********************************************************************************************************************
 * Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
 * This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;

/** | |
* Executes a program using {@link Runtime#exec(String)}. | |
* | |
* Copyright (c) 2012 Attensity Europe GmbH | |
* | |
* @author Tobias Liefke | |
*/ | |
public class ExecPipelet implements Pipelet { | |
/** The name of the property that contains the program (including its path). */ | |
public static final String COMMAND_PROPERTY = "command"; | |
/** The name of the property that contains the working directory for the command. */ | |
public static final String DIRECTORY_PROPERTY = "directory"; | |
/** | |
* The name of the property that contains the parameters to execute (ignored if the content of the parameter attribute | |
* exists). | |
*/ | |
public static final String PARAMETERS_PROPERTY = "parameters"; | |
/** The name of the property that contains the attribute name of the parameters to execute. */ | |
public static final String PARAMETERS_ATTRIBUTE_PROPERTY = "parametersAttribute"; | |
/** The name of the property that contains the name of the input attribute or attachment for the program. */ | |
public static final String INPUT_ATTACHMENT_PROPERTY = "inputAttachment"; | |
/** The name of the property that contains the name of attachment that receives the standard output of the program. */ | |
public static final String OUTPUT_ATTACHMENT_PROPERTY = "outputAttachment"; | |
/** The name of the property that contains the name of attachment that receives the error output of the program. */ | |
public static final String ERROR_ATTACHMENT_PROPERTY = "errorAttachment"; | |
/** The name of the property that contains the name of attribute that receives the exit code of the program. */ | |
public static final String EXIT_CODE_ATTRIBUTE_PROPERTY = "exitCodeAttribute"; | |
/** The name of the property that indicates to mark a record as failed if the program returns an error code. */ | |
public static final String FAIL_ON_ERROR_PROPERTY = "failOnError"; | |
/** Our log file. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** The current configuration. */ | |
private AnyMap _config; | |
/** The path to the program to execute. */ | |
private String _command; | |
/** | |
* @see Pipelet#configure(AnyMap) | |
*/ | |
@Override | |
public void configure(AnyMap configuration) throws ProcessingException { | |
this._config = configuration; | |
// Initialize the command | |
final ParameterAccessor paramAccessor = new ParameterAccessor(configuration); | |
_command = paramAccessor.getRequiredParameter(COMMAND_PROPERTY); | |
} | |
/** | |
* @see Pipelet#process(Blackboard, String[]) | |
*/ | |
@Override | |
public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException { | |
final ParameterAccessor paramAccessor = new ParameterAccessor(_config); | |
final ResultCollector resultCollector = | |
new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT); | |
// Remember command array, if it is the same for every record | |
String[] cmdArray = { _command }; | |
Any previousParameters = null; | |
// Iterate over all record | |
for (String recordId : recordIds) { | |
try { | |
paramAccessor.setCurrentRecord(recordId); | |
// Find parameters and build command line (if it has changed) | |
final String parametersAttribute = paramAccessor.getParameter(PARAMETERS_ATTRIBUTE_PROPERTY, null); | |
Any parameters = null; | |
if (parametersAttribute != null) { | |
parameters = blackboard.getMetadata(recordId).get(parametersAttribute); | |
} | |
if (parameters == null) { | |
parameters = paramAccessor.getParameterAny(PARAMETERS_PROPERTY); | |
} | |
if (parameters != previousParameters) { | |
if (parameters == null) { | |
cmdArray = new String[] { _command }; | |
} else { | |
final List<String> list = new ArrayList<String>(); | |
list.add(_command); | |
if (parameters.isValue()) { | |
for (final StringTokenizer tokenizer = new StringTokenizer(_command); tokenizer.hasMoreTokens();) { | |
list.add(tokenizer.nextToken()); | |
} | |
} else if (parameters.isSeq()) { | |
list.addAll(parameters.asSeq().asStrings()); | |
} | |
cmdArray = list.toArray(new String[list.size()]); | |
} | |
previousParameters = parameters; | |
} | |
// Resolve working directory | |
final String directory = paramAccessor.getParameter(DIRECTORY_PROPERTY, null); | |
final File wd = directory == null ? null : new File(directory); | |
// Execute program | |
final Process process = Runtime.getRuntime().exec(cmdArray, null, wd); | |
// Attach input to the program (if any) | |
sendInput(blackboard, paramAccessor, recordId, process); | |
// Attach output to the blackboard (if required) and wait for the end of the process | |
receiveOutput(blackboard, paramAccessor, recordId, process, cmdArray); | |
resultCollector.addResult(recordId); | |
} catch (final Exception e) { | |
resultCollector.addFailedResult(recordId, e instanceof ProcessingException ? e : new ProcessingException( | |
"Error in ExecPipelet processing id " + recordId, e)); | |
} | |
} | |
return recordIds; | |
} | |
/** | |
* Read the output of the process and wait for its end. | |
*/ | |
private void receiveOutput(Blackboard blackboard, final ParameterAccessor paramAccessor, String recordId, | |
final Process process, String[] cmdArray) throws InterruptedException, BlackboardAccessException, | |
UnsupportedEncodingException, ProcessingException { | |
// Read all the standard output (or dump it - if it is not required) | |
final ProcessStreamThread outputThread = | |
new ProcessStreamThread(process.getInputStream(), paramAccessor.getParameter(OUTPUT_ATTACHMENT_PROPERTY, | |
null)); | |
outputThread.start(); | |
// Read all the error output | |
final ProcessStreamThread errorThread = | |
new ProcessStreamThread(process.getErrorStream(), paramAccessor.getParameter(ERROR_ATTACHMENT_PROPERTY, | |
null)); | |
// Check if we need to capture the error output | |
final Any failOnError = paramAccessor.getParameterAny(FAIL_ON_ERROR_PROPERTY); | |
if (failOnError != null) { | |
// Store the error output for any error messages | |
if (errorThread._outputAttachment == null) { | |
errorThread._out = new ByteArrayOutputStream(); | |
} | |
} | |
errorThread.start(); | |
// Wait until the process has finished | |
final int exitValue = process.waitFor(); | |
final String exitCodeAttribute = paramAccessor.getParameter(EXIT_CODE_ATTRIBUTE_PROPERTY, null); | |
if (exitCodeAttribute != null) { | |
blackboard.getMetadata(recordId).put(exitCodeAttribute, exitValue); | |
} | |
// Save all requested attachments | |
outputThread.save(blackboard, recordId); | |
errorThread.save(blackboard, recordId); | |
// Check if we need to fail if the program has a specific exit code | |
if (contains(failOnError, exitValue)) { | |
String message = "Execution of\n " + Arrays.toString(cmdArray) + "\nfailed with exit code " + exitValue; | |
if (errorThread._out instanceof ByteArrayOutputStream) { | |
errorThread.waitFor(); | |
final ByteArrayOutputStream out = (ByteArrayOutputStream) errorThread._out; | |
if (out.size() > 0) { | |
message += "\nMessage:\n" + new String(out.toByteArray(), "ISO-8859-1"); | |
} | |
} | |
throw new ProcessingException(message); | |
} | |
} | |
/** | |
* Attach the provided input (if any) to the program. | |
*/ | |
private void sendInput(Blackboard blackboard, final ParameterAccessor paramAccessor, String recordId, | |
final Process process) throws BlackboardAccessException, UnsupportedEncodingException { | |
final String inputAttribute = paramAccessor.getParameter(INPUT_ATTACHMENT_PROPERTY, null); | |
if (inputAttribute != null) { | |
final InputStream in = blackboard.getAttachmentAsStream(recordId, inputAttribute); | |
if (in != null) { | |
new ProcessStreamThread(in, process.getOutputStream()).start(); | |
} else { | |
final String input = blackboard.getMetadata(recordId).getStringValue(inputAttribute); | |
if (input != null) { | |
new ProcessStreamThread(new ByteArrayInputStream(input.getBytes("ISO-8859-1")), | |
process.getOutputStream()).start(); | |
} | |
} | |
} | |
} | |
/** | |
* Checks if the given parameter contains the given value, either as explicit number, or in a list of values. | |
*/ | |
private boolean contains(final Any parameter, final int value) { | |
if (parameter == null) { | |
return false; | |
} | |
if (parameter.isBoolean()) { | |
return parameter.asValue().asBoolean() && value != 0; | |
} | |
for (Any entry : parameter) { | |
if (entry.isNumber()) { | |
if (entry.asValue().asLong() == value) { | |
return true; | |
} | |
} else if (entry.isString() && contains(entry.toString(), value)) { | |
return true; | |
} | |
} | |
return false; | |
} | |
/** | |
* Checks if the given range contains the given value. | |
*/ | |
private boolean contains(final String range, final int value) { | |
final int dash = range.indexOf('-'); | |
if (dash < 0) { | |
return Integer.parseInt(range) == value; | |
} else { | |
final int min = dash == 0 ? Integer.MIN_VALUE : Integer.parseInt(range.substring(0, dash)); | |
final int max = dash + 1 == range.length() ? Integer.MAX_VALUE : Integer.parseInt(range.substring(dash + 1)); | |
return value >= min && value <= max; | |
} | |
} | |
/** Used to read from/write to the streams of the process in extra threads. */ | |
private final class ProcessStreamThread extends Thread { | |
/** The stream to read from. */ | |
private final InputStream _in; | |
/** The stream to write to. */ | |
private OutputStream _out; | |
/** The name of the attachment that will receive the found bytes. */ | |
private final String _outputAttachment; | |
/** | |
* Creates a new instance of ProcessStreamThread. | |
* | |
* @param in | |
* contains the standard/error output of the process | |
* @param outputAttachment | |
* the name of the attachment on the blackboard that receives the bytes at the end | |
*/ | |
private ProcessStreamThread(InputStream in, String outputAttachment) { | |
this._in = in; | |
this._outputAttachment = outputAttachment; | |
if (outputAttachment != null) { | |
this._out = new ByteArrayOutputStream(); | |
} else { | |
this._out = null; | |
} | |
} | |
/** | |
* Creates a new instance of ProcessStreamThread. | |
* | |
* @param in | |
* the stream that contains bytes | |
* @param out | |
* the stream that receives the bytes | |
*/ | |
private ProcessStreamThread(InputStream in, OutputStream out) { | |
this._in = in; | |
this._out = out; | |
this._outputAttachment = null; | |
} | |
@Override | |
public void run() { | |
try { | |
for (int i = _in.read(); i >= 0; i = _in.read()) { | |
if (_out != null) { | |
_out.write(i); | |
} | |
} | |
} catch (IOException e) { | |
// Ignore and stop | |
_log.warn("Error during evaluation of the output of an executed proces.", e); | |
} | |
} | |
/** | |
* Save the recorded bytes to the configured attachment (if any). | |
* | |
* @param blackboard | |
* the current blackboard | |
* @param recordId | |
* the current record id | |
* @throws BlackboardAccessException | |
* if the blackboard throws one during access | |
*/ | |
public void save(Blackboard blackboard, String recordId) throws BlackboardAccessException { | |
if (_outputAttachment != null) { | |
waitFor(); | |
blackboard.setAttachment(recordId, _outputAttachment, ((ByteArrayOutputStream) _out).toByteArray()); | |
} | |
} | |
/** | |
* Wait until this thread is finished. | |
*/ | |
private void waitFor() { | |
try { | |
this.join(); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
} |