blob: 48fafa72f826e705d73b1554d186aafe384acf15 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*********************************************************************************************************************/
package org.eclipse.smila.processing.pipelets;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
import org.eclipse.smila.processing.util.ProcessingConstants;
import org.eclipse.smila.processing.util.ResultCollector;
/**
 * Reads an {@link Any} object from JSON.
 *
 * <p>
 * The JSON text is taken either from an attachment or from a metadata attribute of each processed record, depending on
 * the configured input type. The parsed object is either stored under the configured output attribute or, if no output
 * attribute is configured and the parsed object is a map, merged entry by entry into the record metadata.
 * </p>
 *
 * Copyright (c) 2012 Attensity Europe GmbH
 *
 * @author Tobias Liefke
 */
public class JSONReaderPipelet implements Pipelet {

  /** Name of the property that contains the name of the input attribute or attachment. */
  public static final String INPUT_NAME_PROPERTY = ATransformationPipelet.PROP_INPUT_NAME;

  /** Name of the property that contains the type of the input (attribute/attachment). */
  public static final String INPUT_TYPE_PROPERTY = ATransformationPipelet.PROP_INPUT_TYPE;

  /** Name of the property that contains the name of the output attribute. */
  public static final String OUTPUT_ATTRIBUTE_PROPERTY = "outputAttribute";

  /** The name of the input attribute / attachment. */
  private String _inputName;

  /** The type of the input. */
  private SourceType _inputType;

  /** The output attribute name, or {@code null} if a parsed map is to be merged into the record metadata. */
  private String _outputAttribute;

  /** Our configuration. */
  private AnyMap _config;

  /** Our logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /**
   * Initializes parameters: reads the input name, the input type and the optional output attribute from the given
   * configuration.
   *
   * @param config
   *          the pipelet configuration
   * @throws ProcessingException
   *           if the configured input type is not the name of a {@link SourceType} constant (a required parameter
   *           lookup may presumably throw as well, the accessor is not visible here)
   * @see Pipelet#configure(AnyMap)
   */
  @Override
  public void configure(AnyMap config) throws ProcessingException {
    _config = config;
    final ParameterAccessor paramAccessor = new ParameterAccessor(config);
    _inputName = paramAccessor.getRequiredParameter(INPUT_NAME_PROPERTY);
    final String inputType = paramAccessor.getRequiredParameter(INPUT_TYPE_PROPERTY);
    try {
      _inputType = SourceType.valueOf(inputType);
    } catch (IllegalArgumentException e) {
      // keep the original exception as cause so the stack trace is not lost
      throw new ProcessingException("Illegal value for input type: " + inputType, e);
    }
    _outputAttribute = paramAccessor.getParameter(OUTPUT_ATTRIBUTE_PROPERTY, null);
  }

  /**
   * Parses the JSON input of each given record and writes the result to the record metadata.
   *
   * <p>
   * Any exception raised while handling a single record is handed to the {@link ResultCollector} as a failed result
   * for that record; the loop then continues with the next record.
   * </p>
   *
   * @param blackboard
   *          the blackboard holding the records
   * @param recordIds
   *          the IDs of the records to process
   * @return the result IDs as reported by the {@link ResultCollector}
   * @throws ProcessingException
   *           as declared by the pipelet interface and the result collector
   * @see Pipelet#process(Blackboard, String[])
   */
  @Override
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    final ParameterAccessor paramAccessor = new ParameterAccessor(_config);
    final ResultCollector results =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    final IpcAnyReader reader = new IpcAnyReader();
    for (final String id : recordIds) {
      paramAccessor.setCurrentRecord(id);
      try {
        // Determine input
        Any object = null;
        final AnyMap metadata = blackboard.getMetadata(id);
        if (_inputType == SourceType.ATTACHMENT) {
          final InputStream stream = blackboard.getAttachmentAsStream(id, _inputName);
          if (stream != null) {
            try {
              object = reader.readJsonStream(stream);
            } finally {
              // the stream was opened here, so it is released here, even if parsing failed
              closeQuietly(stream, id);
            }
          }
        } else {
          final Any input = metadata.get(_inputName);
          if (input != null) {
            // only the first element yielded by the input's iterator is parsed
            final Iterator<Any> it = input.iterator();
            if (it.hasNext()) {
              object = reader.readJsonObject(it.next().toString());
            }
          }
        }

        // Write created object
        if (_outputAttribute == null) {
          if (object instanceof AnyMap) {
            // merge the parsed map into the metadata, but never overwrite the record ID
            for (Map.Entry<String, Any> entry : object.asMap().entrySet()) {
              if (!Record.RECORD_ID.equals(entry.getKey())) {
                metadata.put(entry.getKey(), entry.getValue());
              }
            }
          } else if (object != null) {
            // nothing can be merged from a non-map result; make the dropped data visible
            _log.warn("JSON input '" + _inputName + "' of record '" + id
              + "' is not a map and no output attribute is configured, the parsed object is ignored.");
          }
        } else if (object != null) {
          metadata.put(_outputAttribute, object);
        }
        results.addResult(id);
      } catch (Exception e) {
        results.addFailedResult(id, e);
      }
    }
    return results.getResultIds();
  }

  /**
   * Closes the given attachment stream; a failure to close is logged and not propagated, so that an already parsed
   * result is not discarded.
   *
   * @param stream
   *          the stream to close, must not be {@code null}
   * @param id
   *          the ID of the record the stream belongs to, used for the log message only
   */
  private void closeQuietly(final InputStream stream, final String id) {
    try {
      stream.close();
    } catch (IOException e) {
      _log.warn("Could not close attachment '" + _inputName + "' of record '" + id + "'.", e);
    }
  }
}