blob: ed792ef2d810a9b6e2d1a43a0e2097087eb0a941 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation Juergen Schumacher (empolis GmbH) -
* enhancements Andreas Weber (Attensity Europe GmbH) - data model simplification
**********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Collection;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.Any.ValueType;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.AnySeq;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.MissingParameterException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.utils.file.EncodingHelper;
/**
* Abstract base class for transformation pipelets. The general properties are:
* <ul>
* <li>inputName: name of the Attribute/Attachment to apply the transformation to</li>
* <li>outputName: name of the Attribute/Attachment to store the transformation in</li>
* <li>inputType: the type (Attribute or Attachment) of the inputName</li>
* <li>outputType: the type (Attribute or Attachment) of the outputName</li>
* </ul>
*/
public abstract class ATransformationPipelet implements Pipelet {
/**
* Name of the required parameter holding the type of the input: Attribute/Attachment. The parameter value is
* converted to a {@link SourceType} by {@link #getInputType(ParameterAccessor)}.
*/
public static final String PROP_INPUT_TYPE = "inputType";
/**
* Name of the required parameter holding the type of the output: Attribute/Attachment. The parameter value is
* converted to a {@link SourceType} by {@link #getOutputType(ParameterAccessor)}.
*/
public static final String PROP_OUTPUT_TYPE = "outputType";
/**
* Name of the optional parameter holding the {@link ValueType} to use when a result is stored in an attribute.
* The parameter is read by {@link #getOutputValueType(ParameterAccessor)}, which returns null if it is not set;
* the storeResult methods for String and InputStream results then fall back to {@link ValueType#STRING}.
*/
public static final String PROP_OUTPUT_VALUE_TYPE = "outputValueType";
/**
* Name of the required parameter holding the name of the input Attribute/Attachment.
*/
public static final String PROP_INPUT_NAME = "inputName";
/**
* Name of the required parameter holding the name of the output Attribute/Attachment.
*/
public static final String PROP_OUTPUT_NAME = "outputName";
/**
* Name of the encoding used when converting between strings and bytes, e.g. for storing string results as
* attachments.
*/
public static final String ENCODING_ATTACHMENT = "utf-8";
/**
* Charset object for {@link #ENCODING_ATTACHMENT}. Also used as the fallback when no charset can be detected in
* attachment data.
*/
public static final Charset ENCODING_CHARSET = Charset.forName(ENCODING_ATTACHMENT);
/**
* Local logger. Note that it is created for this base class, not for the concrete subclass.
*/
protected final Log _log = LogFactory.getLog(ATransformationPipelet.class);
/** The configuration as passed to {@link #configure(AnyMap)}; null until configure has been called. */
protected AnyMap _config;
/**
 * Remembers the given pipelet configuration in {@link #_config}.
 *
 * @param configuration
 *          the pipelet configuration
 * @throws ProcessingException
 *           declared by the signature; this implementation does not throw it
 */
@Override
public void configure(final AnyMap configuration) throws ProcessingException {
  this._config = configuration;
}
/**
 * Resolves the type of the input (Attribute or Attachment) from the required {@value #PROP_INPUT_TYPE}
 * parameter.
 *
 * @param paramAccessor
 *          accessor used to look up the parameter
 * @return the {@link SourceType} constant named by the parameter value
 * @throws MissingParameterException
 *           propagated from the required-parameter lookup
 */
public SourceType getInputType(final ParameterAccessor paramAccessor) throws MissingParameterException {
  final String typeName = paramAccessor.getRequiredParameter(PROP_INPUT_TYPE);
  return SourceType.valueOf(typeName);
}
/**
 * Resolves the name of the input Attribute/Attachment from the required {@value #PROP_INPUT_NAME} parameter.
 *
 * @param paramAccessor
 *          accessor used to look up the parameter
 * @return input name
 * @throws MissingParameterException
 *           propagated from the required-parameter lookup
 */
public String getInputName(final ParameterAccessor paramAccessor) throws MissingParameterException {
  final String inputName = paramAccessor.getRequiredParameter(PROP_INPUT_NAME);
  return inputName;
}
/**
 * Resolves the type of the output (Attribute or Attachment) from the required {@value #PROP_OUTPUT_TYPE}
 * parameter.
 *
 * @param paramAccessor
 *          accessor used to look up the parameter
 * @return the {@link SourceType} constant named by the parameter value
 * @throws MissingParameterException
 *           propagated from the required-parameter lookup
 */
public SourceType getOutputType(final ParameterAccessor paramAccessor) throws MissingParameterException {
  final String typeName = paramAccessor.getRequiredParameter(PROP_OUTPUT_TYPE);
  return SourceType.valueOf(typeName);
}
/**
 * Resolves the value type for attribute results from the optional {@value #PROP_OUTPUT_VALUE_TYPE} parameter.
 *
 * @param paramAccessor
 *          accessor used to look up the parameter
 * @return the {@link ValueType} constant named by the parameter value, or null if the parameter is not set
 * @throws MissingParameterException
 *           declared by the signature
 */
public ValueType getOutputValueType(final ParameterAccessor paramAccessor) throws MissingParameterException {
  final String typeName = paramAccessor.getParameter(PROP_OUTPUT_VALUE_TYPE, null);
  return typeName == null ? null : ValueType.valueOf(typeName);
}
/**
 * Resolves the name of the output Attribute/Attachment from the required {@value #PROP_OUTPUT_NAME} parameter.
 *
 * @param paramAccessor
 *          accessor used to look up the parameter
 * @return output name
 * @throws MissingParameterException
 *           propagated from the required-parameter lookup
 */
public String getOutputName(final ParameterAccessor paramAccessor) throws MissingParameterException {
  final String outputName = paramAccessor.getRequiredParameter(PROP_OUTPUT_NAME);
  return outputName;
}
/**
 * Tells whether the input is to be read from an Attribute.
 *
 * @param inputType
 *          the configured input type, may be null
 * @return true if inputType is {@link SourceType#ATTRIBUTE}, false otherwise (including null)
 */
public boolean isReadFromAttribute(final SourceType inputType) {
  return inputType == SourceType.ATTRIBUTE;
}
/**
 * Tells whether the output is to be stored in an Attribute.
 *
 * @param outputType
 *          the configured output type, may be null
 * @return true if outputType is {@link SourceType#ATTRIBUTE}, false otherwise (including null)
 */
public boolean isStoreInAttribute(final SourceType outputType) {
  return outputType == SourceType.ATTRIBUTE;
}
/**
 * Stores a result string on the blackboard, either as an attribute or as an attachment, depending on the
 * configured output type.
 * <p>
 * For attributes the string is parsed into a value of the configured output value type ({@link ValueType#STRING}
 * if none is configured). For attachments the string is written encoded as {@link #ENCODING_ATTACHMENT}.
 *
 * @param blackboard
 *          blackboard
 * @param id
 *          record id
 * @param result
 *          result string
 * @param paramAccessor
 *          accessor used to look up output type, output name and output value type
 * @throws ProcessingException
 *           if the output parameters cannot be resolved or the result cannot be stored; any exception raised
 *           while storing is wrapped as cause
 */
protected void storeResult(final Blackboard blackboard, final String id, final String result,
  final ParameterAccessor paramAccessor) throws ProcessingException {
  final boolean toAttribute = isStoreInAttribute(getOutputType(paramAccessor));
  if (!toAttribute) {
    // attachment output: hand the encoded string to the blackboard as a stream
    try {
      final InputStream encoded = IOUtils.toInputStream(result, ENCODING_ATTACHMENT);
      blackboard.setAttachmentFromStream(id, getOutputName(paramAccessor), encoded);
    } catch (final Exception e) {
      throw new ProcessingException("Could not set attachment " + getOutputName(paramAccessor) + " of record "
        + id, e);
    }
    return;
  }
  // attribute output: convert the string to the requested value type first
  try {
    ValueType valueType = getOutputValueType(paramAccessor);
    if (valueType == null) {
      valueType = ValueType.STRING;
    }
    final Value parsed = blackboard.getDataFactory().parseFromString(result, valueType);
    blackboard.getMetadata(id).put(getOutputName(paramAccessor), parsed);
  } catch (final Exception e) {
    throw new ProcessingException("Could not set attribute " + getOutputName(paramAccessor) + " of record "
      + id, e);
  }
}
/**
 * Stores a result read from a stream on the blackboard, either as an attribute or as an attachment, depending on
 * the configured output type.
 * <p>
 * For attributes the stream is decoded with {@link #ENCODING_CHARSET}, read completely, closed, and the text is
 * parsed into a value of the configured output value type ({@link ValueType#STRING} if none is configured). For
 * attachments the stream is passed to the blackboard as is.
 *
 * @param blackboard
 *          blackboard
 * @param id
 *          record id
 * @param resultStream
 *          result stream
 * @param paramAccessor
 *          accessor used to look up output type, output name and output value type
 * @throws ProcessingException
 *           if the output parameters cannot be resolved or the result cannot be stored; any exception raised
 *           while reading or storing is wrapped as cause
 */
protected void storeResult(final Blackboard blackboard, final String id, final InputStream resultStream,
  final ParameterAccessor paramAccessor) throws ProcessingException {
  if (isStoreInAttribute(getOutputType(paramAccessor))) {
    // The reader is opened before anything else that may fail, so that the stream is closed on every path,
    // including an invalid output value type.
    try (Reader reader = new InputStreamReader(resultStream, ENCODING_CHARSET)) {
      final ValueType outputValueType = getOutputValueType(paramAccessor);
      final String text = IOUtils.toString(reader);
      final Value value =
        blackboard.getDataFactory().parseFromString(text,
          outputValueType == null ? ValueType.STRING : outputValueType);
      blackboard.getMetadata(id).put(getOutputName(paramAccessor), value);
    } catch (final Exception e) {
      throw new ProcessingException("Could not set attribute " + getOutputName(paramAccessor) + " of record "
        + id, e);
    }
  } else {
    try {
      // NOTE(review): the stream is not closed here; presumably the blackboard consumes it - confirm who is
      // responsible for closing it.
      blackboard.setAttachmentFromStream(id, getOutputName(paramAccessor), resultStream);
    } catch (final Exception e) {
      throw new ProcessingException("Could not set attachment " + getOutputName(paramAccessor) + " of record "
        + id, e);
    }
  }
}
/**
 * Stores a collection of result strings on the blackboard. An empty collection stores nothing.
 * <p>
 * For attribute output a single result is stored as a single value and several results are stored as a sequence;
 * the output value type parameter is not consulted here. For attachment output only the first result of the
 * collection is stored.
 *
 * @param blackboard
 *          blackboard
 * @param id
 *          record id
 * @param results
 *          result strings
 * @param paramAccessor
 *          accessor used to look up output type and output name
 * @throws ProcessingException
 *           if the output parameters cannot be resolved or storing an attachment fails
 * @throws BlackboardAccessException
 *           if the record metadata cannot be accessed
 */
protected void storeResults(final Blackboard blackboard, final String id, final Collection<String> results,
  final ParameterAccessor paramAccessor) throws ProcessingException, BlackboardAccessException {
  if (results.isEmpty()) {
    return;
  }
  if (!isStoreInAttribute(getOutputType(paramAccessor))) {
    // an attachment can hold only one result: the first one wins
    storeResult(blackboard, id, results.iterator().next(), paramAccessor);
    return;
  }
  if (results.size() > 1) {
    final AnySeq sequence = DataFactory.DEFAULT.createAnySeq();
    for (final String entry : results) {
      sequence.add(entry);
    }
    blackboard.getMetadata(id).put(getOutputName(paramAccessor), sequence);
    return;
  }
  final AnyMap metadata = blackboard.getMetadata(id);
  final String attributeName = getOutputName(paramAccessor);
  metadata.put(attributeName, results.iterator().next());
}
/**
 * Stores a result byte[] on the blackboard, either as an attribute or as an attachment, depending on the
 * configured output type.
 * <p>
 * For attributes the bytes are decoded with {@link #ENCODING_CHARSET} and stored as a string; the output value
 * type parameter is not consulted here. For attachments the bytes are stored unchanged.
 *
 * @param blackboard
 *          the Blackboard
 * @param id
 *          the Id of the record
 * @param bytes
 *          the byte[] to save
 * @param paramAccessor
 *          accessor used to look up output type and output name
 * @throws MissingParameterException
 *           propagated from the lookup of output type or output name
 * @throws BlackboardAccessException
 *           if the blackboard cannot be accessed
 */
protected void storeResult(final Blackboard blackboard, final String id, final byte[] bytes,
  final ParameterAccessor paramAccessor) throws MissingParameterException, BlackboardAccessException {
  final SourceType outputType = getOutputType(paramAccessor);
  if (!isStoreInAttribute(outputType)) {
    blackboard.setAttachment(id, getOutputName(paramAccessor), bytes);
    return;
  }
  final AnyMap metadata = blackboard.getMetadata(id);
  final String attributeName = getOutputName(paramAccessor);
  metadata.put(attributeName, new String(bytes, ENCODING_CHARSET));
}
/**
 * Reads input data from the Blackboard as byte[].
 * <p>
 * For attribute input the string form of the attribute value is encoded with {@link #ENCODING_CHARSET}. For
 * attachment input the attachment bytes are returned unchanged.
 *
 * @param blackboard
 *          the Blackboard
 * @param id
 *          the Id of the record
 * @param paramAccessor
 *          accessor used to look up input type and input name
 * @return the input bytes, or null if the attribute is missing, is not a single value or has no string form, or
 *         if the record has no such attachment
 * @throws BlackboardAccessException
 *           if the blackboard cannot be accessed
 * @throws MissingParameterException
 *           propagated from the lookup of input type or input name
 */
protected byte[] readInput(final Blackboard blackboard, final String id, final ParameterAccessor paramAccessor)
  throws BlackboardAccessException, MissingParameterException {
  if (!isReadFromAttribute(getInputType(paramAccessor))) {
    final String attachmentName = getInputName(paramAccessor);
    if (blackboard.hasAttachment(id, attachmentName)) {
      return blackboard.getAttachmentAsBytes(id, attachmentName);
    }
    return null;
  }
  final Any attribute = blackboard.getMetadata(id).get(getInputName(paramAccessor));
  if (attribute == null || !attribute.isValue()) {
    return null;
  }
  final String text = ((Value) attribute).asString();
  return text == null ? null : text.getBytes(ENCODING_CHARSET);
}
/**
 * Reads input data from the Blackboard as a String.
 * <p>
 * For attribute input the string form of the attribute value is returned. For attachment input the attachment
 * bytes are decoded with the charset found by {@link #detectCharset(byte[])}.
 *
 * @param blackboard
 *          the Blackboard
 * @param id
 *          the Id of the record
 * @param paramAccessor
 *          accessor used to look up input type and input name
 * @return the input string, or null if the attribute is missing or is not a single value, or if the attachment
 *         is missing or empty
 * @throws BlackboardAccessException
 *           if the blackboard cannot be accessed
 * @throws MissingParameterException
 *           propagated from the lookup of input type or input name
 */
protected String readStringInput(final Blackboard blackboard, final String id,
  final ParameterAccessor paramAccessor) throws BlackboardAccessException, MissingParameterException {
  if (isReadFromAttribute(getInputType(paramAccessor))) {
    final Any attribute = blackboard.getMetadata(id).get(getInputName(paramAccessor));
    if (attribute == null || !attribute.isValue()) {
      return null;
    }
    return ((Value) attribute).asString();
  }
  final String attachmentName = getInputName(paramAccessor);
  if (!blackboard.hasAttachment(id, attachmentName)) {
    return null;
  }
  final byte[] content = blackboard.getAttachmentAsBytes(id, attachmentName);
  if (content == null || content.length == 0) {
    return null;
  }
  return new String(content, detectCharset(content));
}
/**
 * Detects the charset of the passed data by asking {@link EncodingHelper#getEncoding(byte[])} for an encoding
 * name and resolving that name to a {@link Charset}.
 * <p>
 * Detection is best effort: if no encoding is detected, or detection fails, or the detected name is illegal or
 * not supported, {@link #ENCODING_CHARSET} is returned. Failures are logged at debug level instead of being
 * dropped silently.
 *
 * @param bytes
 *          data for which charset is detected
 * @return charset if detected or default charset if not detected or the detection failed
 */
private Charset detectCharset(final byte[] bytes) {
  try {
    final String detectedEncoding = EncodingHelper.getEncoding(bytes);
    if (detectedEncoding != null) {
      return Charset.forName(detectedEncoding);
    }
  } catch (final IOException | UnsupportedCharsetException | IllegalCharsetNameException ex) {
    // deliberately not rethrown: the caller still gets a usable charset, but the reason is traceable
    if (_log.isDebugEnabled()) {
      _log.debug("Could not detect charset of input data, using default charset " + ENCODING_CHARSET, ex);
    }
  }
  return ENCODING_CHARSET;
}
/**
 * Reads input data from the Blackboard as InputStream.
 * <p>
 * For attribute input the stream delivers the string form of the attribute value encoded with
 * {@link #ENCODING_CHARSET}. For attachment input the attachment stream of the blackboard is returned.
 *
 * @param blackboard
 *          the Blackboard
 * @param id
 *          the Id of the record
 * @param paramAccessor
 *          accessor used to look up input type and input name
 * @return an InputStream, or null if the attribute is missing, is not a single value or has no string form, or
 *         if the record has no such attachment
 * @throws BlackboardAccessException
 *           if the blackboard cannot be accessed
 * @throws MissingParameterException
 *           propagated from the lookup of input type or input name
 */
protected InputStream getInputStream(final Blackboard blackboard, final String id,
  final ParameterAccessor paramAccessor) throws BlackboardAccessException, MissingParameterException {
  if (!isReadFromAttribute(getInputType(paramAccessor))) {
    final String attachmentName = getInputName(paramAccessor);
    if (blackboard.hasAttachment(id, attachmentName)) {
      return blackboard.getAttachmentAsStream(id, attachmentName);
    }
    return null;
  }
  final Any attribute = blackboard.getMetadata(id).get(getInputName(paramAccessor));
  if (attribute == null || !attribute.isValue()) {
    return null;
  }
  final String text = ((Value) attribute).asString();
  if (text == null) {
    return null;
  }
  return new ByteArrayInputStream(text.getBytes(ENCODING_CHARSET));
}
}