blob: e1d1dff76cb7125385d07db8386ced52b17d7d59 [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation Andreas Weber (Attensity Europe GmbH) -
* data model simplification
**********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets.xmlprocessing;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.bulkbuilder.BulkbuilderException;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.validation.InvalidRecordException;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.MissingParameterException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.pipelets.ATransformationPipelet;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
import org.eclipse.smila.utils.service.ServiceUtils;
import org.eclipse.smila.utils.xml.stax.SimpleTagExtractor;
import org.eclipse.smila.utils.xml.stax.XmlSnippetHandler;
import org.eclipse.smila.utils.xml.stax.XmlSnippetSplitter;
/**
 * Pipelet that splits an XML stream into multiple XML snippets. For each snippet a new Record is created where the XML
 * snippet is stored in either an attribute or attachment. The created records are not returned as a PipeletResult (this
 * is just the same as the incoming RecordIds) but are directly sent to the ConnectivityManager and are routed once more
 * to the Queue.
*
* On each created record the Annotation <tt>MessageProperties</tt> is set with the key value pair
* <tt>isXmlSnippet</tt>=<tt>true</tt>. This can be used in Listener rules to select for XML snippets to process.
*
* The possible properties are:
* <ul>
* <li>beginTagName: the name of the tag to start the xml snippet</li>
* <li>beginTagNamespace: the (optional) namespace of the tag to start the xml snippet</li>
* <li>endTagName: the name of the tag to end the xml snippet</li>
* <li>endTagNamespace: the (optional) namespace of the tag to end the xml snippet</li>
* <li>isEndClosingTag: boolean flag if the endTagName is a closing tag (true) or not (false)</li>
* <li>keyTagName: the name of the tag used to create a record id</li>
* <li>maxBufferSize: the maximum size of the internal record buffer (optional, default is 20)</li>
* <li>inputName: name of the Attribute/Attachment to read the XML Document from.</li>
* <li>outputName: name of the Attribute/Attachment to store the extracted value in</li>
 * <li>inputType: the type (Attribute or Attachment of the inputName). An input Attribute is not interpreted as content
 * but as a file path or a URL to the XML document</li>
* <li>outputType: the type (Attribute or Attachment of the outputName)</li>
* <li>xmlSnippetJobName: destination job for snippets.</li>
* </ul>
*/
public class XmlSplitterPipelet extends ATransformationPipelet {

  /** Constant for the property beginTagName. */
  public static final String PROP_BEGIN_TAG_NAME = "beginTagName";

  /** Constant for the property beginTagNamespace. */
  public static final String PROP_BEGIN_TAG_NAMESPACE = "beginTagNamespace";

  /** Constant for the property endTagName. */
  public static final String PROP_END_TAG_NAME = "endTagName";

  /** Constant for the property endTagNamespace. */
  public static final String PROP_END_TAG_NAMESPACE = "endTagNamespace";

  /** Constant for the property keyTagName. */
  public static final String PROP_KEY_TAG_NAME = "keyTagName";

  /** Constant for the property maxBufferSize. */
  public static final String PROP_MAX_BUFFER_SIZE = "maxBufferSize";

  /** Constant for the property idSeparator. */
  public static final String PROP_ID_SEPARATOR = "idSeparator";

  /** Constant for the configuration property for the jobName to submit split records to. */
  public static final String PROP_JOB_NAME = "xmlSnippetJobName";

  /** Name of the internal marker attribute that flags a record as an already-created XML snippet. */
  public static final String ATTRIBUTE_IS_XML_SNIPPET = "__isXmlSnippet";

  /** Default separator between the base record id and the snippet key. */
  public static final String DEFAULT_ID_SEPARATOR = "#";

  /** Constant for the default max buffer size (20). */
  public static final int DEFAULT_MAX_BUFFER_SIZE = 20;

  /** SimpleTagExtractor instance to extract key values. */
  private final SimpleTagExtractor _extractor = new SimpleTagExtractor(true);

  /** Reference to the BulkbuilderService, resolved lazily in {@link #getBulkbuilderService()}. */
  private BulkbuilderService _bulkBuilder;

  /**
   * Splits the XML content of each incoming record into snippets and submits one record per snippet to the configured
   * bulkbuilder job. Records that carry the {@link #ATTRIBUTE_IS_XML_SNIPPET} marker are themselves snippets created
   * by this pipelet; they are not split again but returned as result ids.
   *
   * {@inheritDoc}
   */
  @Override
  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
    if (recordIds == null) {
      return recordIds;
    }
    // IDs of records that have already been split -> not processed again, but returned as output IDs.
    final List<String> snippetRecordIds = new ArrayList<String>(recordIds.length);
    final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _config);
    final ResultCollector resultCollector =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    for (final String id : recordIds) {
      try {
        final Any isSnippet = blackboard.getMetadata(id).get(ATTRIBUTE_IS_XML_SNIPPET);
        if (isSnippet != null && isSnippet.isBoolean() && isSnippet.asValue().asBoolean()) {
          // the record is itself a snippet created earlier by this pipelet -> just pass its id through.
          snippetRecordIds.add(id);
        } else {
          paramAccessor.setCurrentRecord(id);
          final String beginTagName = paramAccessor.getRequiredParameter(PROP_BEGIN_TAG_NAME);
          final String beginTagNamespace = paramAccessor.getParameter(PROP_BEGIN_TAG_NAMESPACE, "");
          // by default a snippet ends at the tag it began with.
          final String endTagName = paramAccessor.getParameter(PROP_END_TAG_NAME, beginTagName);
          final String endTagNamespace = paramAccessor.getParameter(PROP_END_TAG_NAMESPACE, beginTagNamespace);
          final String keyTagName = paramAccessor.getRequiredParameter(PROP_KEY_TAG_NAME);
          final String idSeparator = paramAccessor.getParameter(PROP_ID_SEPARATOR, DEFAULT_ID_SEPARATOR);
          final int bufferSize = paramAccessor.getIntParameter(PROP_MAX_BUFFER_SIZE, DEFAULT_MAX_BUFFER_SIZE);
          final String jobName = paramAccessor.getRequiredParameter(PROP_JOB_NAME);
          final QName beginTag = new QName(beginTagNamespace, beginTagName);
          final QName endTag = new QName(endTagNamespace, endTagName);
          final InternalHandler snippetHandler =
            new InternalHandler(keyTagName, idSeparator, jobName, bufferSize, paramAccessor);
          final XmlSnippetSplitter splitter = new XmlSnippetSplitter(snippetHandler, beginTag, endTag);
          final InputStream inputStream = getXmlInputStream(blackboard, id, paramAccessor);
          try {
            snippetHandler.setCurrentRecord(id, blackboard.getRecord(id).getSource());
            splitter.read(inputStream);
            snippetHandler.flushRecordBuffer();
          } finally {
            // BUGFIX: the stream was never closed before, leaking file handles / HTTP connections.
            if (inputStream != null) {
              inputStream.close();
            }
          }
          if (_log.isInfoEnabled()) {
            _log.info("Created " + snippetHandler.getRecordCount() + " records from processing record " + id);
          }
        }
      } catch (final Exception e) {
        resultCollector.addFailedResult(id, e);
      }
    } // for
    return snippetRecordIds.toArray(new String[snippetRecordIds.size()]);
  }

  /**
   * Gets the XML input stream for the given record: either the content of the configured input attachment, or an
   * external stream resolved from a file path / URL stored in the configured input attribute.
   *
   * @param blackboard
   *          the blackboard holding the record
   * @param id
   *          the record id
   * @param paramAccessor
   *          accessor for the input parameters
   * @return the stream to read the XML document from (may be null if an input attribute is empty)
   * @throws IOException
   *           if the external stream cannot be opened
   * @throws BlackboardAccessException
   *           if the attribute/attachment cannot be read
   * @throws MissingParameterException
   *           if a required input parameter is missing
   */
  private InputStream getXmlInputStream(final Blackboard blackboard, final String id,
    final ParameterAccessor paramAccessor) throws IOException, BlackboardAccessException, MissingParameterException {
    InputStream inputStream = null;
    if (isReadFromAttribute(getInputType(paramAccessor))) {
      // the attribute value is not the content itself but a file path or URL pointing to it.
      inputStream = loadExternalInputStream(readStringInput(blackboard, id, paramAccessor));
    } else {
      inputStream = blackboard.getAttachmentAsStream(id, getInputName(paramAccessor));
    }
    return inputStream;
  }

  /**
   * Resolves the BulkbuilderService lazily.
   *
   * @return the BulkbuilderService
   * @throws ProcessingException
   *           if the service lookup is interrupted
   */
  private BulkbuilderService getBulkbuilderService() throws ProcessingException {
    if (_bulkBuilder == null) {
      try {
        _bulkBuilder = ServiceUtils.getService(BulkbuilderService.class);
      } catch (final InterruptedException e) {
        // BUGFIX: restore the interrupt status before translating the exception.
        Thread.currentThread().interrupt();
        throw new ProcessingException("Could not retrieve BulkbuilderService.", e);
      }
    }
    return _bulkBuilder;
  }

  /**
   * Sets the BulkbuilderService for testing purposes.
   *
   * @param bulkBuilder
   *          The new BulkbuilderService to set.
   */
  public void setBulkbuilderService(final BulkbuilderService bulkBuilder) {
    _bulkBuilder = bulkBuilder;
  }

  /**
   * Get the external InputStream to the given URL or file path.
   *
   * @param attributeValue
   *          the attribute value denoting a URL or file path
   * @return an InputStream, or null if the value is null or blank
   * @throws IOException
   *           if any error occurs
   */
  private InputStream loadExternalInputStream(final String attributeValue) throws IOException {
    InputStream stream = null;
    if (attributeValue != null && attributeValue.trim().length() > 0) {
      // BUGFIX: check complete scheme prefixes so plain file names like "filedata.xml" or "httpdump.xml"
      // are not misinterpreted as URLs.
      if (attributeValue.startsWith("file:")) {
        final URL url = new URL(attributeValue);
        stream = new FileInputStream(url.getAuthority() + url.getPath());
      } else if (attributeValue.startsWith("http:") || attributeValue.startsWith("https:")) {
        final URL url = new URL(attributeValue);
        final HttpClient httpClient = new HttpClient();
        final GetMethod getMethod = new GetMethod(url.toString());
        httpClient.executeMethod(getMethod);
        // NOTE(review): the HTTP connection is only released when the returned stream is fully read and
        // closed by the caller - confirm against commons-httpclient connection management.
        stream = getMethod.getResponseBodyAsStream();
      } else {
        // no known scheme: interpret the value as a plain local file path.
        stream = new FileInputStream(attributeValue);
      }
    } // if
    return stream;
  }

  /**
   * Internal XmlSnippetHandler implementation to handle the snippets, create id and record objects and push them to
   * the configured bulkbuilder job.
   */
  class InternalHandler implements XmlSnippetHandler {
    /** The currently processed record id, used as base for the snippet record ids. */
    private String _currentId;

    /** The source of the currently processed record, copied to the snippet records. */
    private String _source;

    /** Counts the total number of created records. */
    private int _recordCounter;

    /** Counts the number of invocations of handleSnippet() for the _currentId. */
    private int _countById;

    /** the name of the tag used to create a record id. */
    private final String _keyTagName;

    /** string to use for attaching the fragment count to the base ID. */
    private final String _idSeparator;

    /** destination job. */
    private final String _jobName;

    /** number of records to collect before pushing them to the bulkbuilder. */
    private final int _bufferSize;

    /** buffer for records to push to the bulkbuilder. */
    private final List<Record> _recordBuffer = new ArrayList<Record>();

    /** parameter accessor needed to access the output parameters. */
    private final ParameterAccessor _paramAccessor;

    /**
     * Create instance.
     *
     * @param keyTagName
     *          the name of the tag used to create a record id
     * @param idSeparator
     *          string to use for attaching the fragment count to the base ID
     * @param jobName
     *          destination job
     * @param bufferSize
     *          number of records to collect before pushing them to the bulkbuilder
     * @param paramAccessor
     *          parameter accessor needed to access the output parameters
     */
    public InternalHandler(final String keyTagName, final String idSeparator, final String jobName,
      final int bufferSize, final ParameterAccessor paramAccessor) {
      super();
      _keyTagName = keyTagName;
      _idSeparator = idSeparator;
      _jobName = jobName;
      _bufferSize = bufferSize;
      _paramAccessor = paramAccessor;
    }

    /**
     * Set the current record id and source used for snippet record creation.
     *
     * @param id
     *          the current Id.
     * @param source
     *          the current source.
     */
    void setCurrentRecord(final String id, final String source) {
      _currentId = id;
      _source = source;
      _countById = 0;
    }

    /**
     * Returns the number of created records.
     *
     * @return the number of created records
     */
    int getRecordCount() {
      return _recordCounter;
    }

    /**
     * Flushes the record buffer if it is not empty.
     *
     * @throws ProcessingException
     *           if any error occurs
     * @throws BulkbuilderException
     *           if any error occurs
     * @throws InvalidRecordException
     *           if any error occurs
     * @throws InterruptedException
     *           if any error occurs
     */
    public void flushRecordBuffer() throws ProcessingException, BulkbuilderException, InvalidRecordException,
      InterruptedException {
      if (!_recordBuffer.isEmpty()) {
        try {
          for (final Record record : _recordBuffer) {
            getBulkbuilderService().addRecord(_jobName, record);
          }
        } finally {
          // clear even on error, so a record that failed to submit is not submitted twice.
          _recordBuffer.clear();
        }
      }
    }

    /**
     * Creates a snippet record (id = currentId + separator + first key-tag value) and buffers it for submission.
     * Snippets without a key tag are skipped with a warning. Errors are logged per snippet, not propagated, so a
     * single broken snippet does not abort the whole split.
     *
     * {@inheritDoc}
     */
    @Override
    public void handleSnippet(final byte[] snippet) {
      _countById++;
      String snippetId = null;
      try {
        final List<String> keys = _extractor.getTags(_keyTagName, new ByteArrayInputStream(snippet));
        if (!keys.isEmpty()) {
          snippetId = _currentId + _idSeparator + keys.get(0);
          final Record record = DataFactory.DEFAULT.createRecord(snippetId, _source);
          if (isStoreInAttribute(getOutputType(_paramAccessor))) {
            record.getMetadata().put(getOutputName(_paramAccessor), new String(snippet, ENCODING_CHARSET));
          } else {
            record.setAttachment(getOutputName(_paramAccessor), snippet);
          }
          // mark the record so it is not split again when it passes this pipelet another time.
          record.getMetadata().put(ATTRIBUTE_IS_XML_SNIPPET, true);
          _recordCounter++;
          _recordBuffer.add(record);
          // BUGFIX: flush when the configured maximum is reached, not one record later (was '>').
          if (_recordBuffer.size() >= _bufferSize) {
            flushRecordBuffer();
          }
        } else {
          if (_log.isWarnEnabled()) {
            _log.warn("could not find tag " + _keyTagName + " in snippet number " + _countById + " of record "
              + _currentId);
          }
          if (_log.isTraceEnabled()) {
            _log.trace("snippet content: " + new String(snippet, StandardCharsets.UTF_8));
          }
        }
      } catch (final RuntimeException | ProcessingException | BulkbuilderException | InvalidRecordException
          | InterruptedException | XMLStreamException e) {
        if (e instanceof InterruptedException) {
          // BUGFIX: restore the interrupt status instead of swallowing it completely.
          Thread.currentThread().interrupt();
        }
        if (_log.isErrorEnabled()) {
          _log.error("error creating record for xml snippet number " + _countById + " with id " + snippetId
            + " of record " + _currentId, e);
        }
        if (_log.isTraceEnabled()) {
          _log.trace("snippet content: " + new String(snippet, StandardCharsets.UTF_8));
        }
      }
    }
  } // InternalHandler
}