/***********************************************************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Daniel Stucky (empolis GmbH) - initial API and implementation Andreas Weber (Attensity Europe GmbH) -
 * data model simplification
 **********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets.xmlprocessing;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.bulkbuilder.BulkbuilderException;
import org.eclipse.smila.bulkbuilder.BulkbuilderService;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.validation.InvalidRecordException;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.MissingParameterException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.pipelets.ATransformationPipelet;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
import org.eclipse.smila.utils.service.ServiceUtils;
import org.eclipse.smila.utils.xml.stax.SimpleTagExtractor;
import org.eclipse.smila.utils.xml.stax.XmlSnippetHandler;
import org.eclipse.smila.utils.xml.stax.XmlSnippetSplitter;
/** | |
* Pipelet that splits a XML stream into multiple xml snippets. For each snippet a new Record is created where the XML | |
* snippet is stored in either an attribute or attachment. The created records are not returned as a PipeletResult (this | |
* is just the same as the incoming RecordIds) but are directly send to the ConnectivityManager and are routed once more | |
* to the Queue. | |
* | |
* On each created record the Annotation <tt>MessageProperties</tt> is set with the key value pair | |
* <tt>isXmlSnippet</tt>=<tt>true</tt>. This can be used in Listener rules to select for XML snippets to process. | |
* | |
* The possible properties are: | |
* <ul> | |
* <li>beginTagName: the name of the tag to start the xml snippet</li> | |
* <li>beginTagNamespace: the (optional) namespace of the tag to start the xml snippet</li> | |
* <li>endTagName: the name of the tag to end the xml snippet</li> | |
* <li>endTagNamespace: the (optional) namespace of the tag to end the xml snippet</li> | |
* <li>isEndClosingTag: boolean flag if the endTagName is a closing tag (true) or not (false)</li> | |
* <li>keyTagName: the name of the tag used to create a record id</li> | |
* <li>maxBufferSize: the maximum size of the internal record buffer (optional, default is 20)</li> | |
* <li>inputName: name of the Attribute/Attachment to read the XML Document from.</li> | |
* <li>outputName: name of the Attribute/Attachment to store the extracted value in</li> | |
* <li>inputType: the type (Attribute or Attachment of the inputName). An input Attribute is not interpreted as content | |
* but as a file path or an URL to the XML document</li> | |
* <li>outputType: the type (Attribute or Attachment of the outputName)</li> | |
* <li>xmlSnippetJobName: destination job for snippets.</li> | |
* </ul> | |
*/ | |
public class XmlSplitterPipelet extends ATransformationPipelet { | |
/** Constant for the property beginTagName. */ | |
public static final String PROP_BEGIN_TAG_NAME = "beginTagName"; | |
/** Constant for the property beginTagNamespace. */ | |
public static final String PROP_BEGIN_TAG_NAMESPACE = "beginTagNamespace"; | |
/** Constant for the property endTagName. */ | |
public static final String PROP_END_TAG_NAME = "endTagName"; | |
/** Constant for the property endTagNamespace. */ | |
public static final String PROP_END_TAG_NAMESPACE = "endTagNamespace"; | |
/** Constant for the property keyTagName. */ | |
public static final String PROP_KEY_TAG_NAME = "keyTagName"; | |
/** Constant for the property maxBufferSize. */ | |
public static final String PROP_MAX_BUFFER_SIZE = "maxBufferSize"; | |
/** Constant for the property idSeparator. */ | |
public static final String PROP_ID_SEPARATOR = "idSeparator"; | |
/** Constant for the configuration property for the jobName to submit split records to. */ | |
public static final String PROP_JOB_NAME = "xmlSnippetJobName"; | |
/** Name of 'isXmlSnippet' internal attribute. */ | |
public static final String ATTRIBUTE_IS_XML_SNIPPET = "__isXmlSnippet"; | |
/** default id separator. */ | |
public static final String DEFAULT_ID_SEPARATOR = "#"; | |
/** Constant for the default max buffer size (20). */ | |
public static final int DEFAULT_MAX_BUFFER_SIZE = 20; | |
/** SimpleTagExtractor instance to extract key values. */ | |
private final SimpleTagExtractor _extractor = new SimpleTagExtractor(true); | |
/** Reference to the BulkbuilderService. */ | |
private BulkbuilderService _bulkBuilder; | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException { | |
if (recordIds == null) { | |
return recordIds; | |
} | |
// IDs of records that have already been splitted -> not processed again, but returned as output IDs. | |
final List<String> snippetRecordIds = new ArrayList<String>(recordIds.length); | |
final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _config); | |
final ResultCollector resultCollector = | |
new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT); | |
for (final String id : recordIds) { | |
try { | |
final Any isSnippet = blackboard.getMetadata(id).get(ATTRIBUTE_IS_XML_SNIPPET); | |
if (isSnippet != null && isSnippet.isBoolean() && isSnippet.asValue().asBoolean()) { | |
snippetRecordIds.add(id); | |
} else { | |
paramAccessor.setCurrentRecord(id); | |
final String beginTagName = paramAccessor.getRequiredParameter(PROP_BEGIN_TAG_NAME); | |
final String beginTagNamespace = paramAccessor.getParameter(PROP_BEGIN_TAG_NAMESPACE, ""); | |
final String endTagName = paramAccessor.getParameter(PROP_END_TAG_NAME, beginTagName); | |
final String endTagNamespace = paramAccessor.getParameter(PROP_END_TAG_NAMESPACE, beginTagNamespace); | |
final String keyTagName = paramAccessor.getRequiredParameter(PROP_KEY_TAG_NAME); | |
final String idSeparator = paramAccessor.getParameter(PROP_ID_SEPARATOR, DEFAULT_ID_SEPARATOR); | |
final int bufferSize = paramAccessor.getIntParameter(PROP_MAX_BUFFER_SIZE, DEFAULT_MAX_BUFFER_SIZE); | |
final String jobName = paramAccessor.getRequiredParameter(PROP_JOB_NAME); | |
final QName beginTag = new QName(beginTagNamespace, beginTagName); | |
final QName endTag = new QName(endTagNamespace, endTagName); | |
final InternalHandler snippetHandler = | |
new InternalHandler(keyTagName, idSeparator, jobName, bufferSize, paramAccessor); | |
final XmlSnippetSplitter splitter = new XmlSnippetSplitter(snippetHandler, beginTag, endTag); | |
final InputStream inputStream = getXmlInputStream(blackboard, id, paramAccessor); | |
snippetHandler.setCurrentRecord(id, blackboard.getRecord(id).getSource()); | |
splitter.read(inputStream); | |
snippetHandler.flushRecordBuffer(); | |
if (_log.isInfoEnabled()) { | |
_log.info("Created " + snippetHandler.getRecordCount() + " records from processing record " + id); | |
} | |
} | |
} catch (final Exception e) { | |
resultCollector.addFailedResult(id, e); | |
} | |
} // for | |
return snippetRecordIds.toArray(new String[snippetRecordIds.size()]); | |
} | |
/** get XML input Stream. */ | |
private InputStream getXmlInputStream(final Blackboard blackboard, final String id, | |
final ParameterAccessor paramAccessor) throws IOException, BlackboardAccessException, MissingParameterException { | |
InputStream inputStream = null; | |
if (isReadFromAttribute(getInputType(paramAccessor))) { | |
inputStream = loadExternalInputStream(readStringInput(blackboard, id, paramAccessor)); | |
} else { | |
inputStream = blackboard.getAttachmentAsStream(id, getInputName(paramAccessor)); | |
} | |
return inputStream; | |
} | |
/** Resolve the BulkbuilderService lazily. */ | |
private BulkbuilderService getBulkbuilderService() throws ProcessingException { | |
if (_bulkBuilder == null) { | |
try { | |
_bulkBuilder = ServiceUtils.getService(BulkbuilderService.class); | |
} catch (final InterruptedException e) { | |
throw new ProcessingException("Could not retrieve BulkbuilderService.", e); | |
} | |
} | |
return _bulkBuilder; | |
} | |
/** | |
* Sets the BulkbuilderService for testing purposes. | |
* | |
* @param bulkBuilder | |
* The new BulkbuilderService to set. | |
*/ | |
public void setBulkbuilderService(final BulkbuilderService bulkBuilder) { | |
_bulkBuilder = bulkBuilder; | |
} | |
/** | |
* Get the external InputStream to the given url or file path. | |
* | |
* @param attrtibuteValue | |
* the attrtibuteValue denoting an URL or file path | |
* @return a InputStream or null | |
* @throws IOException | |
* if any error occurs | |
*/ | |
private InputStream loadExternalInputStream(final String attrtibuteValue) throws IOException { | |
InputStream stream = null; | |
if (attrtibuteValue != null && attrtibuteValue.trim().length() > 0) { | |
if (attrtibuteValue.startsWith("file")) { | |
final URL url = new URL(attrtibuteValue); | |
stream = new FileInputStream(url.getAuthority() + url.getPath()); | |
} else if (attrtibuteValue.startsWith("http")) { | |
final URL url = new URL(attrtibuteValue); | |
final HttpClient httpClient = new HttpClient(); | |
final GetMethod getMethod = new GetMethod(url.toString()); | |
httpClient.executeMethod(getMethod); | |
stream = getMethod.getResponseBodyAsStream(); | |
} else { | |
stream = new FileInputStream(attrtibuteValue); | |
} | |
} // if | |
return stream; | |
} | |
/** | |
* Internal XmlSnippetHandler implementation to handle the snippets, create id and record objects and send them to | |
* Connectivity. | |
*/ | |
class InternalHandler implements XmlSnippetHandler { | |
/** The currently processed Id. used to generate fragment id objects. */ | |
private String _currentId; | |
/** The currently processed source. used to generate fragment record snippet. */ | |
private String _source; | |
/** Counts the total number of created records. */ | |
private int _recordCounter; | |
/** Counts the number of invokes of handleSnippet() for the _currentId. */ | |
private int _countById; | |
/** the name of the tag used to create a record id. */ | |
private final String _keyTagName; | |
/** string to use for attaching the fragment count to the base ID. */ | |
private final String _idSeparator; | |
/** destination job. */ | |
private final String _jobName; | |
/** number of records to collect before pushing them to ConnectivityManager. */ | |
private final int _bufferSize; | |
/** buffer for records to push to ConnectivityManager. */ | |
private final List<Record> _recordBuffer = new ArrayList<Record>(); | |
/** parameter accessor needed to access the output parameters. */ | |
private final ParameterAccessor _paramAccessor; | |
/** | |
* Create instance. | |
* | |
* @param keyTagName | |
* the name of the tag used to create a record id | |
* @param idSeparator | |
* string to use for attaching the fragment count to the base ID | |
* @param jobName | |
* destination job | |
* @param bufferSize | |
* number of records to collect before pushing them to ConnectivityManager | |
* @param paramAccessor | |
* parameter accessor needed to access the output parameters | |
*/ | |
public InternalHandler(final String keyTagName, final String idSeparator, final String jobName, | |
final int bufferSize, final ParameterAccessor paramAccessor) { | |
super(); | |
_keyTagName = keyTagName; | |
_idSeparator = idSeparator; | |
_jobName = jobName; | |
_bufferSize = bufferSize; | |
_paramAccessor = paramAccessor; | |
} | |
/** | |
* Set the current record id and source used for snippet record creation. | |
* | |
* @param id | |
* the current Id. | |
* @param source | |
* the current source. | |
*/ | |
void setCurrentRecord(final String id, final String source) { | |
_currentId = id; | |
_source = source; | |
_countById = 0; | |
} | |
/** | |
* Returns the number of created records. | |
* | |
* @return the number of created records | |
*/ | |
int getRecordCount() { | |
return _recordCounter; | |
} | |
/** | |
* Flushes the record buffer if it is not empty. | |
* | |
* @throws ProcessingException | |
* if any error occurs | |
* @throws BulkbuilderException | |
* if any error occurs | |
* @throws InvalidRecordException | |
* if any error occurs | |
* @throws InterruptedException | |
* if any error occurs | |
*/ | |
public void flushRecordBuffer() throws ProcessingException, BulkbuilderException, InvalidRecordException, | |
InterruptedException { | |
if (!_recordBuffer.isEmpty()) { | |
try { | |
for (final Record record : _recordBuffer) { | |
getBulkbuilderService().addRecord(_jobName, record); | |
} | |
} finally { | |
_recordBuffer.clear(); | |
} | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void handleSnippet(final byte[] snippet) { | |
_countById++; | |
String snippetId = null; | |
try { | |
final List<String> keys = _extractor.getTags(_keyTagName, new ByteArrayInputStream(snippet)); | |
if (!keys.isEmpty()) { | |
snippetId = _currentId + _idSeparator + keys.get(0); | |
final Record record = DataFactory.DEFAULT.createRecord(snippetId, _source); | |
if (isStoreInAttribute(getOutputType(_paramAccessor))) { | |
record.getMetadata().put(getOutputName(_paramAccessor), new String(snippet, ENCODING_CHARSET)); | |
} else { | |
record.setAttachment(getOutputName(_paramAccessor), snippet); | |
} | |
record.getMetadata().put(ATTRIBUTE_IS_XML_SNIPPET, true); | |
_recordCounter++; | |
_recordBuffer.add(record); | |
if (_recordBuffer.size() > _bufferSize) { | |
flushRecordBuffer(); | |
} | |
} else { | |
if (_log.isWarnEnabled()) { | |
_log.warn("could not find tag " + _keyTagName + " in snippet number " + _countById + " of record " | |
+ _currentId); | |
} | |
if (_log.isTraceEnabled()) { | |
_log.trace("snippet content: " + new String(snippet, Charset.forName("UTF-8"))); | |
} | |
} | |
} catch (final RuntimeException | ProcessingException | BulkbuilderException | InvalidRecordException | |
| InterruptedException | XMLStreamException e) { | |
if (_log.isErrorEnabled()) { | |
_log.error("error creating record for xml snippet number " + _countById + " with id " + snippetId | |
+ " of record " + _currentId, e); | |
} | |
if (_log.isTraceEnabled()) { | |
_log.trace("snippet content: " + new String(snippet, Charset.forName("UTF-8"))); | |
} | |
} | |
} | |
} // InternalHandler | |
} |