| /******************************************************************************* |
| * Copyright (c) 2013, Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This program |
| * and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which |
| * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: Andreas Weber (Empolis Information Management GmbH) - initial implementation |
| *******************************************************************************/ |
| package org.eclipse.smila.processing.pipelets; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.blackboard.Blackboard; |
| import org.eclipse.smila.blackboard.BlackboardAccessException; |
| import org.eclipse.smila.datamodel.Any; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.processing.Pipelet; |
| import org.eclipse.smila.processing.ProcessingException; |
| import org.eclipse.smila.processing.parameters.ParameterAccessor; |
| import org.eclipse.smila.processing.util.ProcessingConstants; |
| import org.eclipse.smila.processing.util.ResultCollector; |
| |
| /** |
| * Pipelet to split input record(s) with (multiple) parts in separate records. The parts are stored in a record |
| * attribute whose name is given as parameter for the pipelet. This record attribute's value must be a Sequence of Maps. |
| * |
| * If the partsAttribute parameter is not set or the input records don't have parts, they are returned unchanged. |
| * |
| * @author Andreas Weber |
| */ |
| public class DocumentSplitterPipelet implements Pipelet { |
| |
| /** Configuration property containing the record attribute name where the parts to split are stored. */ |
| public static final String PROP_PARTS_ATTRIBUTE = "partsAttribute"; |
| |
| /** id where the link to the original record is stored in the spilt record. */ |
| public static final String DOCUMENT_ID = "_documentId"; |
| |
| /** separator used to create the record id of the split records. */ |
| public static final String SPLIT_ID_SEPARATOR = "###"; |
| |
| /** local logger. */ |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| /** The configuration. */ |
| private AnyMap _configuration; |
| |
| @Override |
| public void configure(final AnyMap configuration) throws ProcessingException { |
| _configuration = configuration; |
| } |
| |
| @Override |
| public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException { |
| final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _configuration); |
| final ResultCollector resultCollector = |
| new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT); |
| for (final String id : recordIds) { |
| try { |
| paramAccessor.setCurrentRecord(id); |
| final String partsAttribute = paramAccessor.getParameter(PROP_PARTS_ATTRIBUTE, null); |
| final Record record = blackboard.getRecord(id); |
| if (partsAttribute != null && record.getMetadata().containsKey(partsAttribute)) { |
| splitRecord(record, blackboard, partsAttribute, resultCollector); |
| } else { |
| resultCollector.addResult(id); // that'ok, in this case we just return the input record |
| } |
| } catch (final Exception ex) { |
| resultCollector.addFailedResult(id, ex); |
| } |
| } // for |
| return resultCollector.getResultIds(); |
| } |
| |
| /** create split records from original record. */ |
| private void splitRecord(final Record record, final Blackboard blackboard, final String partsAttribute, |
| final ResultCollector resultCollector) throws BlackboardAccessException, ProcessingException { |
| final Any parts = record.getMetadata().get(partsAttribute); |
| if (!parts.isSeq()) { |
| throw new ProcessingException("Parts attribute '" + partsAttribute |
| + "' has invalid value, must be a Sequence, but was: " + parts); |
| } |
| if (parts.asSeq().isEmpty()) { |
| // empty parts attribute: no split, but we remove the parts attribute |
| record.getMetadata().remove(partsAttribute); |
| resultCollector.addResult(record.getId()); |
| } else { |
| for (int i = 0; i < parts.asSeq().size(); i++) { |
| final Any part = parts.asSeq().get(i); |
| if (!part.isMap()) { |
| throw new ProcessingException("Part " + (i + 1) + " + in parts attribute '" + partsAttribute |
| + "' has invalid value, must be a Map, but was: " + part); |
| } |
| final String splitId = record.getId() + SPLIT_ID_SEPARATOR + i; |
| final Record splitRecord = blackboard.getDataFactory().createRecord(splitId); |
| final AnyMap splitMetadata = DataFactory.DEFAULT.cloneAnyMap(record.getMetadata()); |
| splitMetadata.remove(Record.RECORD_ID); |
| splitRecord.getMetadata().putAll(splitMetadata); |
| splitRecord.getMetadata().put(DOCUMENT_ID, record.getId()); |
| splitRecord.getMetadata().remove(partsAttribute); // split record must not contain parts attribute |
| splitRecord.getMetadata().putAll(part.asMap()); // overwrite all attributes from original record |
| blackboard.setRecord(splitRecord); |
| resultCollector.addResult(splitId); |
| } |
| } |
| } |
| } |