blob: 35802e56a95276b54f8c1ec974162f2654e8f547 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved.
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.datamodel.util.RecordMerger;
import org.eclipse.smila.jdbc.JdbcAccessService;
import org.eclipse.smila.jdbc.SqlExecutor;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
 * Worker that reads records from its input slot, optionally augments each record with additional data selected by a
 * configurable "fetch" SQL statement, and writes the records to its output slot.
 * <p>
 * The fetch SQL is passed to {@link JdbcAccessService#executePrepared}; the statement parameter values are taken from
 * the record metadata attributes named by the task parameter {@value #TASK_PARAM_FETCH_PARAM_ATTRIBUTES}, or, if that
 * parameter is not set, from the attributes mapped to the crawler's id columns. If no fetch SQL is configured, records
 * are copied to the output unchanged.
 * </p>
 */
public class JdbcFetcherWorker implements Worker {
  /** Name of the worker, used in worker description and workflows. */
  public static final String NAME = "jdbcFetcher";

  /** sql statement used for fetching. */
  public static final String TASK_PARAM_FETCH_SQL = "fetchSql";

  /** attribute containing the values to use as parameters in the fetch sql (Prepared)statement. */
  public static final String TASK_PARAM_FETCH_PARAM_ATTRIBUTES = "fetchParameterAttributes";

  /** input slot name. */
  private static final String INPUT_SLOT = "recordsToFetch";

  /** output slot name. */
  private static final String OUTPUT_SLOT = "fetchedRecords";

  /** logger of this worker. */
  private final Log _log = LogFactory.getLog(getClass());

  /** JDBC access service, set/unset via the DS bind/unbind methods below. */
  private JdbcAccessService _jdbcAccess;

  /** @return the worker name {@value #NAME}. */
  @Override
  public String getName() {
    return NAME;
  }

  /**
   * Processes all input records of the task.
   * <p>
   * If the task parameter {@value #TASK_PARAM_FETCH_SQL} contains a non-blank statement, each record is augmented with
   * the fetched data before it is written; otherwise the records are copied to the output unchanged. Processing stops
   * after the current record once the task is canceled.
   * </p>
   *
   * @param taskContext
   *          context of the current task.
   * @throws Exception
   *           on errors reading input records, accessing the database, or writing output records.
   */
  @Override
  public void perform(final TaskContext taskContext) throws Exception {
    final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT);
    final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT);
    final String fetchSql = taskContext.getTaskParameters().getStringValue(TASK_PARAM_FETCH_SQL);
    if (fetchSql == null || fetchSql.trim().isEmpty()) {
      // no fetch sql -> just copy records to output.
      copyRecords(taskContext, recordInput, recordOutput);
      return;
    }
    final JdbcCrawlingContext crawlContext = new JdbcCrawlingContext(taskContext);
    final List<String> fetchParams = getFetchParameters(crawlContext);
    // the executor is created once per task, reused for every record, and closed by try-with-resources.
    try (SqlExecutor sqlExecutor =
      _jdbcAccess.executePrepared(crawlContext.getDbUrl(), crawlContext.getDbProperties(), fetchSql,
        crawlContext.getMaxAttachmentSize(), crawlContext.getMessages())) {
      Record record;
      do {
        record = recordInput.getRecord();
        if (record != null) {
          augmentRecord(crawlContext, fetchParams, sqlExecutor, record);
          recordOutput.writeRecord(record);
        }
      } while (record != null && !taskContext.isCanceled());
    }
  }

  /**
   * Executes the fetch statement for a single record and merges the selected data into it.
   * <p>
   * The statement parameters are the values of the record's metadata attributes named in {@code fetchParams}, in that
   * order. If one of these attributes is missing or is not a plain {@link Value}, the fetch is skipped and the record
   * stays unchanged. If the statement delivers no result, the record also stays unchanged.
   * </p>
   *
   * @param crawlContext
   *          crawling context providing the column-to-attribute mapper.
   * @param fetchParams
   *          names of the record attributes used as statement parameters.
   * @param sqlExecutor
   *          executor for the fetch statement.
   * @param record
   *          the record to augment; modified in place.
   * @throws SQLException
   *           on database errors.
   * @throws IOException
   *           as declared by the executor.
   */
  private void augmentRecord(final JdbcCrawlingContext crawlContext, final List<String> fetchParams,
    final SqlExecutor sqlExecutor, final Record record) throws SQLException, IOException {
    final List<Value> preparedStmtParams = new ArrayList<>();
    for (final String attribute : fetchParams) {
      final Any paramValue = record.getMetadata().get(attribute);
      if (paramValue == null) {
        _log.info("Record contains no value for attribute '" + attribute + "'. Skip fetching additional data.");
        return;
      }
      if (!paramValue.isValue()) {
        _log.warn("Value '" + paramValue + "' of attribute '" + attribute
          + "' cannot be used for fetching. Skip fetching additional data.");
        return;
      }
      preparedStmtParams.add((Value) paramValue);
    }
    final Record rec = sqlExecutor.executeAndMerge(preparedStmtParams);
    if (rec != null) {
      // map selected column names to attributes and add them to result record
      crawlContext.getMapper().mapNames(rec, null);
      RecordMerger.mergeRecords(record, rec, false, false);
    } else {
      _log.info("Fetch SQL statement '" + sqlExecutor.getSql() + "' delivered no result for parameters: "
        + preparedStmtParams + ". Originally crawled record will be used in output.");
    }
  }

  /**
   * Determines the names of the record attributes whose values are used as fetch statement parameters.
   *
   * @param crawlContext
   *          crawling context of the current task.
   * @return the attribute names from task parameter {@value #TASK_PARAM_FETCH_PARAM_ATTRIBUTES} if it is set,
   *         otherwise the attributes mapped to the id columns.
   */
  private List<String> getFetchParameters(final JdbcCrawlingContext crawlContext) {
    final List<String> fetchParams = new ArrayList<String>(); // params used in 'fetchSql' PreparedStatement
    final Any fetchParameterAttributes =
      crawlContext.getTaskContext().getTaskParameters().get(TASK_PARAM_FETCH_PARAM_ATTRIBUTES);
    if (fetchParameterAttributes == null) {
      // mapped attributes for id columns are used as fetch parameters if 'fetchParameterAttributes' is not set.
      fetchParams.addAll(getMappedIdAttributes(crawlContext));
    } else {
      for (final Any p : fetchParameterAttributes) {
        fetchParams.add(p.asValue().asString());
      }
    }
    return fetchParams;
  }

  /**
   * Collects the attribute names mapped for the id columns. Only the first mapped attribute of each column is used.
   *
   * @param context
   *          crawling context providing id columns and mapper.
   * @return list with attribute names mapped for id columns.
   * @throws IllegalArgumentException
   *           if an id column has no mapped attribute.
   */
  private List<String> getMappedIdAttributes(final JdbcCrawlingContext context) {
    final List<String> mappedAtts = new ArrayList<String>();
    final Any idColumns = context.getIdColumns();
    for (final Any columnAny : idColumns) {
      final String idCol = columnAny.asValue().asString();
      // NOTE(review): depending on the mapper implementation, get() may already throw for unmapped columns; the
      // explicit check below replaces an otherwise anonymous NullPointerException/IndexOutOfBoundsException.
      final List<String> mappedAtt = context.getMapper().get(idCol);
      if (mappedAtt == null || mappedAtt.isEmpty()) {
        throw new IllegalArgumentException("Id column '" + idCol
          + "' is not mapped to an attribute and cannot be used as fetch parameter. Map the column or set task parameter '"
          + TASK_PARAM_FETCH_PARAM_ATTRIBUTES + "'.");
      }
      mappedAtts.add(mappedAtt.get(0)); // we want only one of the mapped attributes
    }
    return mappedAtts;
  }

  /**
   * Copies input records to the output unchanged, stopping after the current record once the task is canceled.
   *
   * @param taskContext
   *          context of the current task, checked for cancellation.
   * @param recordInput
   *          source of records.
   * @param recordOutput
   *          target for records.
   * @throws ObjectStoreException
   *           on object store errors.
   * @throws IOException
   *           on I/O errors.
   */
  private void copyRecords(final TaskContext taskContext, final RecordInput recordInput,
    final RecordOutput recordOutput) throws ObjectStoreException, IOException {
    Record record;
    do {
      record = recordInput.getRecord();
      if (record != null) {
        recordOutput.writeRecord(record);
      }
    } while (record != null && !taskContext.isCanceled());
  }

  /** DS service reference bind method. */
  public void setJdbcAccessService(final JdbcAccessService jdbcAccess) {
    _jdbcAccess = jdbcAccess;
  }

  /** DS service reference unbind method; clears the reference only if it is the currently bound service. */
  public void unsetJdbcAccessService(final JdbcAccessService jdbcAccess) {
    if (_jdbcAccess == jdbcAccess) {
      _jdbcAccess = null;
    }
  }
}