blob: 4e8066a4e699cfe015faf3f8a66be55de63a3425 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2012 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.processing.util;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.parameters.ParameterAccessor;
/**
* Helper class that can/should be used by pipelets for collecting their processed record results.
*
* Takes into account that error results may cause an exception ("_failOnError" parameter) or may be dropped from
* pipelet result (see dropRecordOnError).
*
* @author aweber
*/
public class ResultCollector {
  /** Needed to access the failOnError parameter. */
  private final ParameterAccessor _paramAccessor;

  /** Logger to use for failed records; may be null, in which case failed records are not logged. */
  private final Log _log;

  /** Whether record ids processed with error are left out of the result ids ({@code true}) or kept. */
  private final boolean _dropRecordOnError;

  /** Record ids that should be used as pipelet result, in the order they were added. */
  private final Collection<String> _results = new ArrayList<>();

  /**
   * Creates an empty collector.
   *
   * @param paramAccessor
   *          of the hosting config, needed to access failOnError parameter; must not be null
   * @param log
   *          the log to which error results are written; may be null to disable logging
   * @param dropRecordOnError
   *          if true, ids of failed records are not added to the result ids; if false, they are added like
   *          successful ones
   * @throws IllegalArgumentException
   *           if paramAccessor is null
   */
  public ResultCollector(final ParameterAccessor paramAccessor, final Log log, final boolean dropRecordOnError) {
    if (paramAccessor == null) {
      throw new IllegalArgumentException("Parameter 'paramAccessor' is null");
    }
    _paramAccessor = paramAccessor;
    _log = log;
    _dropRecordOnError = dropRecordOnError;
  }

  /**
   * Adds the id of a successfully processed record to the result ids.
   *
   * @param recordId
   *          record id of successful result
   */
  public void addResult(final String recordId) {
    _results.add(recordId);
  }

  /**
   * Registers a record whose processing failed. If the failOnError parameter evaluates to true, the given exception
   * is (re)thrown and nothing is added or logged. Otherwise the record id is added to the result ids unless
   * dropRecordOnError was set, and a warning is written to the log (if one was given).
   *
   * @param recordId
   *          record id of failed result
   * @param ex
   *          exception that will be logged resp. thrown if failOnError=true
   * @throws ProcessingException
   *           will be thrown if failOnError=true; a ProcessingException is rethrown as is, any other exception is
   *           wrapped
   */
  public void addFailedResult(final String recordId, final Exception ex) throws ProcessingException {
    // Select the record on the accessor before reading the parameter
    // (presumably so that a record-specific failOnError value is honoured - confirm against ParameterAccessor).
    _paramAccessor.setCurrentRecord(recordId);
    final boolean failOnError = _paramAccessor.getBooleanParameter(ProcessingConstants.KEY_FAIL_ON_ERROR,
      ProcessingConstants.FAIL_ON_ERROR_DEFAULT);
    if (failOnError) {
      if (ex instanceof ProcessingException) {
        throw (ProcessingException) ex;
      }
      throw new ProcessingException(ex);
    }

    String errorMessage = "Error while processing record with id '" + recordId + "'.";
    if (_dropRecordOnError) {
      // Append only the suffix; "+= errorMessage + ..." would repeat the whole message in the log.
      errorMessage += " Record will be removed from pipelet result.";
    } else {
      _results.add(recordId);
    }

    if (_log != null) {
      // Guard against a null exception so that logging itself cannot fail with a NullPointerException.
      final Throwable cause = ex == null ? null : ex.getCause();
      final String detail = ex == null ? null : ex.getMessage();
      _log.warn(String.format("%s cause: %s; message: %s", errorMessage, cause, detail));
    }
  }

  /**
   * @return record ids that should be used as pipelet result, as a new array
   */
  public String[] getResultIds() {
    return _results.toArray(new String[0]);
  }
}