/********************************************************************************************************************* | |
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. | |
* This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 | |
* which accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
*********************************************************************************************************************/ | |
package org.eclipse.smila.importing.crawler.jdbc; | |
import java.io.IOException; | |
import java.sql.SQLException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.datamodel.Value; | |
import org.eclipse.smila.datamodel.util.RecordMerger; | |
import org.eclipse.smila.jdbc.JdbcAccessService; | |
import org.eclipse.smila.jdbc.SqlExecutor; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.Worker; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
import org.eclipse.smila.taskworker.output.RecordOutput; | |
public class JdbcFetcherWorker implements Worker { | |
/** Name of the worker, used in worker description and workflows. */ | |
public static final String NAME = "jdbcFetcher"; | |
/** sql statement used for fetching. */ | |
public static final String TASK_PARAM_FETCH_SQL = "fetchSql"; | |
/** attribute containing the values to use as parameters in the fetch sql (Prepared)statement. */ | |
public static final String TASK_PARAM_FETCH_PARAM_ATTRIBUTES = "fetchParameterAttributes"; | |
/** input slot name. */ | |
private static final String INPUT_SLOT = "recordsToFetch"; | |
/** output slot name. */ | |
private static final String OUTPUT_SLOT = "fetchedRecords"; | |
private final Log _log = LogFactory.getLog(getClass()); | |
private JdbcAccessService _jdbcAccess; | |
@Override | |
public String getName() { | |
return NAME; | |
} | |
@Override | |
public void perform(final TaskContext taskContext) throws Exception { | |
final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT); | |
final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT); | |
final String fetchSql = taskContext.getTaskParameters().getStringValue(TASK_PARAM_FETCH_SQL); | |
Record record = null; | |
if (fetchSql != null && fetchSql.trim().length() != 0) { | |
final JdbcCrawlingContext crawlContext = new JdbcCrawlingContext(taskContext); | |
final List<String> fetchParams = getFetchParameters(crawlContext); | |
try (SqlExecutor sqlExecutor = | |
_jdbcAccess.executePrepared(crawlContext.getDbUrl(), crawlContext.getDbProperties(), fetchSql, | |
crawlContext.getMaxAttachmentSize(), crawlContext.getMessages())) { | |
do { | |
record = recordInput.getRecord(); | |
if (record != null) { | |
augmentRecord(crawlContext, fetchParams, sqlExecutor, record); | |
recordOutput.writeRecord(record); | |
} | |
} while (record != null && !taskContext.isCanceled()); | |
} | |
} else { // no fetch sql -> just copy records to output. | |
copyRecords(taskContext, recordInput, recordOutput); | |
} | |
} | |
private void augmentRecord(final JdbcCrawlingContext crawlContext, final List<String> fetchParams, | |
final SqlExecutor sqlExecutor, final Record record) throws SQLException, IOException { | |
final List<Value> preparedStmtParams = new ArrayList<>(); | |
for (final String attribute : fetchParams) { | |
final Any paramValue = record.getMetadata().get(attribute); | |
if (paramValue == null) { | |
_log.info("Record contains no value for attribute '" + attribute + "'. Skip fetching additional data."); | |
return; | |
} | |
if (!paramValue.isValue()) { | |
_log.warn("Value '" + paramValue + "' of attribute '" + attribute | |
+ "' cannot be used for fetching. Skip fetching additional data."); | |
return; | |
} | |
preparedStmtParams.add((Value) paramValue); | |
} | |
final Record rec = sqlExecutor.executeAndMerge(preparedStmtParams); | |
if (rec != null) { | |
// map selected column names to attributes and add them to result record | |
crawlContext.getMapper().mapNames(rec, null); | |
RecordMerger.mergeRecords(record, rec, false, false); | |
} else { | |
_log.info("Fetch SQL statement '" + sqlExecutor.getSql() + "' delivered no result for parameters: " | |
+ preparedStmtParams + ". Originally crawled record will be used in output."); | |
} | |
} | |
private List<String> getFetchParameters(final JdbcCrawlingContext crawlContext) { | |
final List<String> fetchParams = new ArrayList<String>(); // params used in 'fetchSql' PreparedStatement | |
final Any fetchParameterAttributes = | |
crawlContext.getTaskContext().getTaskParameters().get(TASK_PARAM_FETCH_PARAM_ATTRIBUTES); | |
if (fetchParameterAttributes == null) { | |
// mapped attributes for id columns are used as fetch parameters if 'fetchParameterAttributes' is not set. | |
final List<String> mappedIdAtts = getMappedIdAttributes(crawlContext); | |
fetchParams.addAll(mappedIdAtts); | |
} else { | |
for (final Any p : fetchParameterAttributes) { | |
fetchParams.add(p.asValue().asString()); | |
} | |
} | |
return fetchParams; | |
} | |
/** @return list with attribute names mapped for id columns. */ | |
private List<String> getMappedIdAttributes(final JdbcCrawlingContext context) { | |
final List<String> mappedAtts = new ArrayList<String>(); | |
final Any idColumns = context.getIdColumns(); | |
for (final Any columnAny : idColumns) { | |
final String idCol = columnAny.asValue().asString(); | |
// if idCol is not mapped this throws an exception: | |
final List<String> mappedAtt = context.getMapper().get(idCol); | |
mappedAtts.add(mappedAtt.get(0)); // we want only one of the mapped attributes | |
} | |
return mappedAtts; | |
} | |
private void copyRecords(final TaskContext taskContext, final RecordInput recordInput, | |
final RecordOutput recordOutput) throws ObjectStoreException, IOException { | |
Record record; | |
do { | |
record = recordInput.getRecord(); | |
if (record != null) { | |
recordOutput.writeRecord(record); | |
} | |
} while (record != null && !taskContext.isCanceled()); | |
} | |
/** DS service reference bind method. */ | |
public void setJdbcAccessService(final JdbcAccessService jdbcAccess) { | |
_jdbcAccess = jdbcAccess; | |
} | |
/** DS service reference unbind method. */ | |
public void unsetJdbcAccessService(final JdbcAccessService jdbcAccess) { | |
if (_jdbcAccess == jdbcAccess) { | |
_jdbcAccess = null; | |
} | |
} | |
} |