blob: 858d3b41f5bae0a9849e81b066dcb9d9572afbb6 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2013, Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This program
* and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Empolis Information Management GmbH) - initial implementation
*******************************************************************************/
package org.eclipse.smila.processing.pipelets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.blackboard.BlackboardAccessException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
/**
 * Pipelet to split input record(s) with (multiple) parts into separate records. The parts are stored in a record
 * attribute whose name is given as parameter for the pipelet. This record attribute's value must be a Sequence of Maps.
 *
 * For each part a new record is created whose id is the original record id, followed by {@link #SPLIT_ID_SEPARATOR}
 * and the zero-based index of the part. A split record contains a copy of the original record's metadata (without the
 * parts attribute), the id of the original record in attribute {@link #DOCUMENT_ID}, and all attributes of the part,
 * which overwrite equally named attributes copied from the original record.
 *
 * If the partsAttribute parameter is not set or the input records don't have parts, they are returned unchanged.
 *
 * @author Andreas Weber
 */
public class DocumentSplitterPipelet implements Pipelet {

  /** Configuration property containing the record attribute name where the parts to split are stored. */
  public static final String PROP_PARTS_ATTRIBUTE = "partsAttribute";

  /** name of the split record attribute in which the id of the original record is stored. */
  public static final String DOCUMENT_ID = "_documentId";

  /** separator used to create the record id of the split records. */
  public static final String SPLIT_ID_SEPARATOR = "###";

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** The configuration. */
  private AnyMap _configuration;

  /**
   * Stores the pipelet configuration; it is evaluated for each record in {@link #process(Blackboard, String[])}.
   *
   * @param configuration
   *          the pipelet configuration
   */
  @Override
  public void configure(final AnyMap configuration) throws ProcessingException {
    _configuration = configuration;
  }

  /**
   * Splits each input record that contains the configured parts attribute, all other records are passed through.
   *
   * @param blackboard
   *          blackboard to read the input records from and to store the split records on
   * @param recordIds
   *          ids of the records to process
   * @return the ids collected by the result collector: ids of split records and of records passed through
   * @throws ProcessingException
   *           declared by the pipelet interface; errors for a single record are handed to the result collector
   */
  @Override
  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
    final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _configuration);
    final ResultCollector resultCollector =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    for (final String id : recordIds) {
      try {
        // the parameter is read per record, after the accessor has been pointed at the current record
        paramAccessor.setCurrentRecord(id);
        final String partsAttribute = paramAccessor.getParameter(PROP_PARTS_ATTRIBUTE, null);
        final Record record = blackboard.getRecord(id);
        if (partsAttribute != null && record.getMetadata().containsKey(partsAttribute)) {
          splitRecord(record, blackboard, partsAttribute, resultCollector);
        } else {
          resultCollector.addResult(id); // that's ok, in this case we just return the input record
        }
      } catch (final Exception ex) {
        // any failure for a single record is reported to the result collector, the loop continues
        resultCollector.addFailedResult(id, ex);
      }
    } // for
    return resultCollector.getResultIds();
  }

  /**
   * create split records from original record.
   *
   * All parts are validated before the first split record is created, so an invalid part cannot leave a partial set
   * of split records on the blackboard and in the result collector.
   *
   * @param record
   *          the original record, must contain the parts attribute
   * @param blackboard
   *          blackboard on which the split records are created
   * @param partsAttribute
   *          name of the attribute containing the parts
   * @param resultCollector
   *          collector to which the ids of the resulting records are added
   * @throws BlackboardAccessException
   *           error storing a split record on the blackboard
   * @throws ProcessingException
   *           the parts attribute is not a Sequence, or one of its elements is not a Map
   */
  private void splitRecord(final Record record, final Blackboard blackboard, final String partsAttribute,
    final ResultCollector resultCollector) throws BlackboardAccessException, ProcessingException {
    final Any parts = record.getMetadata().get(partsAttribute);
    if (!parts.isSeq()) {
      throw new ProcessingException("Parts attribute '" + partsAttribute
        + "' has invalid value, must be a Sequence, but was: " + parts);
    }
    if (parts.asSeq().isEmpty()) {
      // empty parts attribute: no split, but we remove the parts attribute
      record.getMetadata().remove(partsAttribute);
      resultCollector.addResult(record.getId());
      return;
    }

    // validate everything first: nothing is written unless all parts are usable
    final int partCount = parts.asSeq().size();
    for (int i = 0; i < partCount; i++) {
      final Any part = parts.asSeq().get(i);
      if (!part.isMap()) {
        throw new ProcessingException("Part " + (i + 1) + " in parts attribute '" + partsAttribute
          + "' has invalid value, must be a Map, but was: " + part);
      }
    }

    // metadata shared by all split records: copied once without the record id (the split record has its own)
    // and without the parts attribute, so the parts are not cloned again for every single split record
    final AnyMap commonMetadata = DataFactory.DEFAULT.cloneAnyMap(record.getMetadata());
    commonMetadata.remove(Record.RECORD_ID);
    commonMetadata.remove(partsAttribute);

    final String documentId = record.getId();
    for (int i = 0; i < partCount; i++) {
      final String splitId = documentId + SPLIT_ID_SEPARATOR + i;
      final Record splitRecord = blackboard.getDataFactory().createRecord(splitId);
      final AnyMap splitMetadata = splitRecord.getMetadata();
      // each split record gets its own copy of the shared metadata
      splitMetadata.putAll(DataFactory.DEFAULT.cloneAnyMap(commonMetadata));
      splitMetadata.put(DOCUMENT_ID, documentId);
      splitMetadata.putAll(parts.asSeq().get(i).asMap()); // overwrite all attributes from original record
      blackboard.setRecord(splitRecord);
      resultCollector.addResult(splitId);
    }
  }
}