/*********************************************************************************************************************
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
* program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.importing.DeltaImportStrategy;
import org.eclipse.smila.importing.ImportingConstants;
import org.eclipse.smila.importing.ImportingException;
import org.eclipse.smila.importing.util.RecordOutputHandler;
import org.eclipse.smila.jdbc.JdbcAccessService;
import org.eclipse.smila.jdbc.SqlExecutor;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.Inputs;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.utils.digest.DigestHelper;
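/**
* Worker that crawls a database via JDBC. It executes a configurable SQL statement and writes the result rows as
* records to the crawledRecords output slot. If splitIncrement and splitLimitsSql are configured, the crawl is
* divided into ranges that are written to the splitsToCrawl slot and crawled by parallel follow-up tasks of this
* worker. Illustrative sketch of typical task parameters (URL, statement and values are examples, not defaults):
*
* <pre>
* "dbUrl": "jdbc:derby:memory:crawldb",   // illustrative JDBC URL
* "crawlSql": "SELECT * FROM PERSONS",    // illustrative crawl statement
* "idColumns": [ "ID" ],                  // columns used to build the record ID
* "maxRecordsPerBulk": 1000
* </pre>
*/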
public class JdbcCrawlerWorker implements Worker {
/** Name of the worker, used in worker description and workflows. */
public static final String NAME = "jdbcCrawler";
/** name of output slot taking the crawled records to be processed in ETL. */
public static final String OUTPUT_SLOT_CRAWLED_RECORDS = "crawledRecords";
/** name of input slot taking the splits to be crawled (optional). */
public static final String INPUT_SLOT_SPLITS_TO_CRAWL = "splitsToCrawl";
/** name of output slot taking the splits to be crawled (optional). */
public static final String OUTPUT_SLOT_SPLITS_TO_CRAWL = INPUT_SLOT_SPLITS_TO_CRAWL;
/** Maximum number of records in one bulk object. */
public static final String TASK_PARAM_MAX_RECORDS_PER_BULK = "maxRecordsPerBulk";
/** database URL. */
public static final String TASK_PARAM_DB_URL = "dbUrl";
/** database properties, e.g. user and password. */
public static final String TASK_PARAM_DB_PROPS = "dbProps";
/** crawl SQL statement. */
public static final String TASK_PARAM_CRAWL_SQL = "crawlSql";
/** columns used to build the record ID. */
public static final String TASK_PARAM_ID_COLUMNS = "idColumns";
/** columns used to build the delta hash. */
public static final String TASK_PARAM_DELTA_COLUMNS = "deltaColumns";
/** max. size of binary content (BLOB etc.) in bytes. */
public static final String TASK_PARAM_MAX_ATTACHMENT_SIZE = "maxAttachmentSize";
/** increment for split database crawling. */
public static final String TASK_PARAM_SPLIT_INCREMENT = "splitIncrement";
/** SQL statement to determine the split limits for split database crawling. */
public static final String TASK_PARAM_SPLIT_LIMITS_SQL = "splitLimitsSql";
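// Illustrative sketch of split crawling (the table and column names below are assumptions, not part of the worker):
// a split limits statement like "SELECT MIN(ID) AS MIN, MAX(ID) AS MAX FROM PERSONS" yields the crawl range, and a
// crawl statement like "SELECT * FROM PERSONS WHERE ID BETWEEN ? AND ?" receives the limits of each split as
// parameters. With MIN=1, MAX=2500 and splitIncrement=1000 the created splits are [1,1000], [1001,2000] and
// [2001,2500].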
/** default: write up to 1000 records to one output bulk. */
public static final Long MAX_RECORDS_PER_BULK_DEFAULT = 1000L;
/** default max size of binary content is 1 GB. */
public static final Long MAX_ATTACHMENT_SIZE_DEFAULT = 1000L * 1000L * 1000L;
/** separator used for ID and delta hash generation. */
private static final String COLUMN_SEPARATOR = "-";
/** name for MIN field in splitSql result. */
private static final String SPLIT_MIN = "MIN";
/** name for MAX field in splitSql result. */
private static final String SPLIT_MAX = "MAX";
private final Log _log = LogFactory.getLog(getClass());
private JdbcAccessService _jdbcAccess;
@Override
public String getName() {
return NAME;
}
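/** reads split records from the optional splitsToCrawl input slot if present, otherwise initiates a new crawl. */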
@Override
public void perform(final TaskContext taskContext) throws Exception {
try {
final Inputs inputs = taskContext.getInputs();
final JdbcCrawlingContext crawlContext = new JdbcCrawlingContext(taskContext);
if (!inputs.getSlotNames().contains(INPUT_SLOT_SPLITS_TO_CRAWL)
|| inputs.getDataObjectCount(INPUT_SLOT_SPLITS_TO_CRAWL) == 0) {
initiateCrawling(crawlContext);
} else {
final RecordInput splitsToCrawl = inputs.getAsRecordInput(INPUT_SLOT_SPLITS_TO_CRAWL);
crawlSplits(splitsToCrawl, crawlContext);
}
} catch (final Exception e) {
_log.error(e);
throw e;
}
}
/** DS service reference bind method. */
public void setJdbcAccessService(final JdbcAccessService jdbcAccess) {
_jdbcAccess = jdbcAccess;
}
/** DS service reference unbind method. */
public void unsetJdbcAccessService(final JdbcAccessService jdbcAccess) {
if (_jdbcAccess == jdbcAccess) {
_jdbcAccess = null;
}
}
/**
* initiates crawling. If no splitting is configured, the DB is crawled directly. If splitting is configured,
* splitsToCrawl records will be created and sent.
*/
private void initiateCrawling(final JdbcCrawlingContext crawlContext) throws Exception {
// is this a split database crawl?
final AnyMap splitIntervalMetaData = determineSplitInterval(crawlContext);
Long min = null;
Long max = null;
if (splitIntervalMetaData != null) {
min = splitIntervalMetaData.getLongValue(SPLIT_MIN);
max = splitIntervalMetaData.getLongValue(SPLIT_MAX);
}
if (min != null && max != null) {
// create and send split records.
createAndSendSplitRecords(crawlContext, min, max);
} else {
// start crawling directly with no split limits.
crawl(crawlContext, null);
}
}
/** creates the splitsToCrawl records with one record in each bulk for maximum parallelization. */
private void createAndSendSplitRecords(final JdbcCrawlingContext crawlContext, final Long min, final Long max)
throws IOException, ObjectStoreException {
final RecordOutputHandler splitsToCrawlBulks =
new RecordOutputHandler(crawlContext.getTaskContext().getOutputs(), 1, OUTPUT_SLOT_SPLITS_TO_CRAWL);
long splitStart = min;
do {
final long splitEnd = Math.min(max, splitStart + crawlContext.getSplitIncrement() - 1);
final Record splitsToCrawlRecord = DataFactory.DEFAULT.createRecord();
// send only one split per record.
splitsToCrawlRecord.getMetadata().put(SPLIT_MIN, splitStart);
splitsToCrawlRecord.getMetadata().put(SPLIT_MAX, splitEnd);
splitsToCrawlBulks.writeRecord(splitsToCrawlRecord);
splitStart = splitEnd + 1;
} while (splitStart <= max);
}
/** determines the split limits. */
private AnyMap determineSplitInterval(final JdbcCrawlingContext crawlContext) throws SQLException, IOException {
if (crawlContext.getSplitLimitsSql() != null) {
final Collection<Record> results =
_jdbcAccess.executeSql(crawlContext.getDbUrl(), crawlContext.getDbProperties(),
crawlContext.getSplitLimitsSql(), 0, crawlContext.getMessages());
// the first record must contain MIN and MAX values.
if (results != null && results.size() > 0) {
return results.iterator().next().getMetadata();
}
}
return null;
}
/** crawls the splits read from the input bulk. */
private void crawlSplits(final RecordInput splitsToCrawl, final JdbcCrawlingContext crawlContext)
throws Exception {
Record record = splitsToCrawl.getRecord();
while (record != null && !crawlContext.getTaskContext().isCanceled()) {
final List<Value> splitValues = new ArrayList<>();
splitValues.add(record.getMetadata().getValue(SPLIT_MIN));
splitValues.add(record.getMetadata().getValue(SPLIT_MAX));
crawl(crawlContext, splitValues);
record = splitsToCrawl.getRecord();
}
}
/**
* crawls the complete DB or only a split.
*
* @param splitValues
* the split values (MIN/MAX). If null or containing fewer than two values, the complete DB is crawled (i.e. no
* parameters are applied to the crawl SQL statement); otherwise these values are applied as parameters to the
* crawl SQL statement.
*/
private void crawl(final JdbcCrawlingContext crawlContext, final List<Value> splitValues) throws Exception {
final RecordOutputHandler recordBulks =
new RecordOutputHandler(crawlContext.getTaskContext().getOutputs(), crawlContext.getMaxRecordsPerBulk(),
OUTPUT_SLOT_CRAWLED_RECORDS);
final Collection<Record> results;
if (splitValues == null || splitValues.size() < 2) {
// no splitting needed...
results =
_jdbcAccess.executeSql(crawlContext.getDbUrl(), crawlContext.getDbProperties(), crawlContext.getCrawlSql(),
crawlContext.getMaxAttachmentSize(), crawlContext.getMessages());
} else {
// in that case, the crawlSql must be parameterizable with min/max values...
try (final SqlExecutor sqlExecutor =
_jdbcAccess.executePrepared(crawlContext.getDbUrl(), crawlContext.getDbProperties(),
crawlContext.getCrawlSql(), crawlContext.getMaxAttachmentSize(), crawlContext.getMessages())) {
results = sqlExecutor.execute(splitValues);
}
}
final Iterator<Record> it = results.iterator();
while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) {
final Record record = it.next();
augmentRecord(record, crawlContext);
crawlContext.getMapper().mapNames(record, getColumnNames(record));
recordBulks.writeRecord(record);
}
}
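/** sets the record ID and the data source and adds the delta hash attribute. */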
private Record augmentRecord(final Record record, final JdbcCrawlingContext crawlContext)
throws ImportingException {
record.setId(createId(crawlContext, record));
record.setSource(crawlContext.getDataSource());
addDeltaHashToRecord(record, crawlContext);
return record;
}
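/**
* creates the record ID from the data source name and the values of the configured ID columns, e.g. (with
* illustrative values) "mydb:42-JohnDoe" for a data source "mydb" and idColumns ID and NAME.
*/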
private String createId(final JdbcCrawlingContext crawlContext, final Record record) throws ImportingException {
return crawlContext.getDataSource() + ":" + concatColumnValues(crawlContext.getIdColumns(), record);
}
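/**
* adds the delta hash attribute, built from the configured delta columns or, if none are configured, from all
* mapped columns. Nothing is added if delta import is disabled or no suitable columns are available.
*/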
private void addDeltaHashToRecord(final Record record, final JdbcCrawlingContext crawlContext)
throws ImportingException {
final DeltaImportStrategy deltaStrategy =
getDeltaImportStrategy(crawlContext.getTaskContext().getTaskParameters());
if (deltaStrategy != DeltaImportStrategy.DISABLED) {
final Any deltaColumns = crawlContext.getDeltaColumns();
if (deltaColumns == null) {
final Any mappingColumns = crawlContext.getMapper().getMappingKeys();
if (!mappingColumns.isEmpty()) {
record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH,
concatColumnValues(mappingColumns, record));
}
} else if (!deltaColumns.isEmpty()) {
record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH, concatColumnValues(deltaColumns, record));
}
}
}
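/**
* concatenates the values of the given columns, separated by "-". Values longer than 100 characters and
* attachment values are replaced by their digest, missing columns contribute "NULL".
*/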
private String concatColumnValues(final Any columnNames, final Record record) throws ImportingException {
final StringBuilder idBuffer = new StringBuilder();
if (columnNames != null) {
final AnyMap metadata = record.getMetadata();
for (final Any column : columnNames) {
final String columnName = column.asValue().asString();
if (metadata.containsKey(columnName)) {
String value = metadata.getStringValue(columnName);
if (value.length() > 100) {
value = DigestHelper.calculateDigest(value);
}
idBuffer.append(value);
} else if (record.hasAttachment(columnName)) {
idBuffer.append(DigestHelper.calculateDigest(record.getAttachmentAsBytes(columnName)));
} else {
idBuffer.append("NULL");
}
idBuffer.append(COLUMN_SEPARATOR);
}
if (idBuffer.length() > 0) {
idBuffer.deleteCharAt(idBuffer.length() - 1); // remove last added column separator
}
}
return idBuffer.toString();
}
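/**
* collects the metadata attribute names and attachment names of the result record, without the technical record
* ID, source and delta hash attributes.
*/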
private Set<String> getColumnNames(final Record resultRecord) {
final Set<String> columnNames = new HashSet<>();
columnNames.addAll(resultRecord.getMetadata().keySet());
// remove keys with special meanings
columnNames.remove(Record.RECORD_ID);
columnNames.remove(Record.SOURCE);
columnNames.remove(ImportingConstants.ATTRIBUTE_DELTA_HASH);
final Iterator<String> it = resultRecord.getAttachmentNames();
while (it.hasNext()) {
columnNames.add(it.next());
}
return columnNames;
}
/**
* get deltaImportStrategy parameter from task parameters. Fall back to {@link DeltaImportStrategy#FULL} if parameter
* is not set or value is invalid.
*/
private static DeltaImportStrategy getDeltaImportStrategy(final AnyMap taskParameters) {
DeltaImportStrategy usage = DeltaImportStrategy.FULL;
final String paramValue = taskParameters.getStringValue(DeltaImportStrategy.TASK_PARAM);
if (paramValue != null) {
try {
usage = DeltaImportStrategy.valueOf(paramValue.toUpperCase(Locale.ENGLISH));
} catch (final Exception ex) {
// ignore invalid values and fall back to FULL.
}
}
return usage;
}
}