/********************************************************************************************************************* | |
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. | |
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 | |
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
*********************************************************************************************************************/ | |
package org.eclipse.smila.processing.pipelets; | |
import java.io.InputStream; | |
import java.util.Iterator; | |
import java.util.Map; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.blackboard.Blackboard; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.datamodel.ipc.IpcAnyReader; | |
import org.eclipse.smila.processing.Pipelet; | |
import org.eclipse.smila.processing.ProcessingException; | |
import org.eclipse.smila.processing.parameters.ParameterAccessor; | |
import org.eclipse.smila.processing.util.ProcessingConstants; | |
import org.eclipse.smila.processing.util.ResultCollector; | |
/** | |
* Reads an {@link Any} object from JSON. | |
* | |
* Copyright (c) 2012 Attensity Europe GmbH | |
* | |
* @author Tobias Liefke | |
*/ | |
public class JSONReaderPipelet implements Pipelet { | |
/** Name of the property that contains the name of the input attribute or attachment. */ | |
public static final String INPUT_NAME_PROPERTY = ATransformationPipelet.PROP_INPUT_NAME; | |
/** Name of the property that contains the type of the input (attribute/attachment). */ | |
public static final String INPUT_TYPE_PROPERTY = ATransformationPipelet.PROP_INPUT_TYPE; | |
/** Name of the property that contains the name of the output attribute. */ | |
public static final String OUTPUT_ATTRIBUTE_PROPERTY = "outputAttribute"; | |
/** The name of the input attribute / attachment. */ | |
private String _inputName; | |
/** The type of the input. */ | |
private SourceType _inputType; | |
/** The output attribute name. */ | |
private String _outputAttribute; | |
/** Our configuration. */ | |
private AnyMap _config; | |
/** Our logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** | |
* Initializes parameters. | |
* | |
* @see Pipelet#configure(AnyMap) | |
*/ | |
@Override | |
public void configure(AnyMap config) throws ProcessingException { | |
this._config = config; | |
final ParameterAccessor paramAccessor = new ParameterAccessor(config); | |
_inputName = paramAccessor.getRequiredParameter(INPUT_NAME_PROPERTY); | |
final String inputType = paramAccessor.getRequiredParameter(INPUT_TYPE_PROPERTY); | |
try { | |
this._inputType = SourceType.valueOf(inputType); | |
} catch (IllegalArgumentException e) { | |
throw new ProcessingException("Illegal value for input type: " + inputType); | |
} | |
_outputAttribute = paramAccessor.getParameter(OUTPUT_ATTRIBUTE_PROPERTY, null); | |
} | |
/** | |
* @see Pipelet#process(Blackboard, String[]) | |
*/ | |
@Override | |
public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException { | |
final ParameterAccessor paramAccessor = new ParameterAccessor(_config); | |
final ResultCollector results = | |
new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT); | |
final IpcAnyReader reader = new IpcAnyReader(); | |
for (final String id : recordIds) { | |
paramAccessor.setCurrentRecord(id); | |
try { | |
// Determine input | |
Any object = null; | |
final AnyMap metadata = blackboard.getMetadata(id); | |
if (_inputType == SourceType.ATTACHMENT) { | |
final InputStream stream = blackboard.getAttachmentAsStream(id, _inputName); | |
if (stream != null) { | |
object = reader.readJsonStream(stream); | |
} | |
} else { | |
final Any input = metadata.get(_inputName); | |
if (input != null) { | |
final Iterator<Any> it = input.iterator(); | |
if (it.hasNext()) { | |
object = reader.readJsonObject(it.next().toString()); | |
} | |
} | |
} | |
// Write created object | |
if (_outputAttribute == null) { | |
if (object instanceof AnyMap) { | |
for (Map.Entry<String, Any> entry : object.asMap().entrySet()) { | |
if (!Record.RECORD_ID.equals(entry.getKey())) { | |
metadata.put(entry.getKey(), entry.getValue()); | |
} | |
} | |
} | |
} else if (object != null) { | |
metadata.put(_outputAttribute, object); | |
} | |
results.addResult(id); | |
} catch (Exception e) { | |
results.addFailedResult(id, e); | |
} | |
} | |
return results.getResultIds(); | |
} | |
} |