/*********************************************************************************************************************
 * Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
 * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
 * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.jdbc;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.Value;
import org.eclipse.smila.importing.DeltaImportStrategy;
import org.eclipse.smila.importing.ImportingConstants;
import org.eclipse.smila.importing.ImportingException;
import org.eclipse.smila.importing.util.RecordOutputHandler;
import org.eclipse.smila.jdbc.JdbcAccessService;
import org.eclipse.smila.jdbc.SqlExecutor;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.Inputs;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.utils.digest.DigestHelper;

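/**
 * Worker that crawls a database via JDBC: the records produced by a configurable SQL statement are written to the
 * "crawledRecords" output slot, augmented with a record ID, the data source name and (unless disabled) a delta hash.
 * If a split increment and a split-limits SQL statement are configured, the crawl is partitioned: the worker first
 * produces "splitsToCrawl" records, which are then crawled in parallel follow-up tasks.
 *
 * <p>
 * A minimal sketch of the task parameters (hypothetical values, see the TASK_PARAM_* constants below):
 *
 * <pre>
 * "dbUrl": "jdbc:derby:memory:crawldb",
 * "dbProps": { "user": "...", "password": "..." },
 * "crawlSql": "SELECT * FROM persons",
 * "idColumns": ["FIRSTNAME", "LASTNAME"],
 * "maxRecordsPerBulk": 1000
 * </pre>
 */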
public class JdbcCrawlerWorker implements Worker {

  /** Name of the worker, used in worker description and workflows. */
  public static final String NAME = "jdbcCrawler";

  /** Name of the output slot taking the crawled records to be processed in ETL. */
  public static final String OUTPUT_SLOT_CRAWLED_RECORDS = "crawledRecords";

  /** Name of the input slot taking the splits to be crawled (optional). */
  public static final String INPUT_SLOT_SPLITS_TO_CRAWL = "splitsToCrawl";

  /** Name of the output slot taking the splits to be crawled (optional). */
  public static final String OUTPUT_SLOT_SPLITS_TO_CRAWL = INPUT_SLOT_SPLITS_TO_CRAWL;

  /** Maximum number of records in one bulk object. */
  public static final String TASK_PARAM_MAX_RECORDS_PER_BULK = "maxRecordsPerBulk";

  /** Database URL. */
  public static final String TASK_PARAM_DB_URL = "dbUrl";

  /** Database connection properties, e.g. user and password. */
  public static final String TASK_PARAM_DB_PROPS = "dbProps";

  /** SQL statement used to crawl the database. */
  public static final String TASK_PARAM_CRAWL_SQL = "crawlSql";
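  // A hypothetical example: a plain statement such as "SELECT * FROM persons", or, for split crawling, a statement
  // taking the split limits as parameters, e.g. "SELECT * FROM persons WHERE id BETWEEN ? AND ?".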

  /** Columns used to build the record IDs. */
  public static final String TASK_PARAM_ID_COLUMNS = "idColumns";

  /** Columns used to compute the delta hash. */
  public static final String TASK_PARAM_DELTA_COLUMNS = "deltaColumns";

  /** Maximum size of binary content (BLOB etc.) in bytes. */
  public static final String TASK_PARAM_MAX_ATTACHMENT_SIZE = "maxAttachmentSize";

  /** Increment for split database crawling. */
  public static final String TASK_PARAM_SPLIT_INCREMENT = "splitIncrement";

  /** SQL statement to determine the split limits for split database crawling. */
  public static final String TASK_PARAM_SPLIT_LIMITS_SQL = "splitLimitsSql";
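  // A hypothetical example: the statement must return the split limits in fields named MIN and MAX, e.g.
  // "SELECT MIN(id) AS MIN, MAX(id) AS MAX FROM persons".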

  /** Default: write up to 1000 records to one bulk. */
  public static final Long MAX_RECORDS_PER_BULK_DEFAULT = 1000L;

  /** Default maximum size of binary content: 1 GB. */
  public static final Long MAX_ATTACHMENT_SIZE_DEFAULT = 1000L * 1000L * 1000L;

  /** Separator used for ID and delta hash generation. */
  private static final String COLUMN_SEPARATOR = "-";

  /** Name of the MIN field in the splitLimitsSql result. */
  private static final String SPLIT_MIN = "MIN";

  /** Name of the MAX field in the splitLimitsSql result. */
  private static final String SPLIT_MAX = "MAX";

  private final Log _log = LogFactory.getLog(getClass());

  /** JDBC access service, injected via declarative services. */
  private JdbcAccessService _jdbcAccess;

  @Override
  public String getName() {
    return NAME;
  }

  @Override
  public void perform(final TaskContext taskContext) throws Exception {
    try {
      final Inputs inputs = taskContext.getInputs();
      final JdbcCrawlingContext crawlContext = new JdbcCrawlingContext(taskContext);
      if (!inputs.getSlotNames().contains(INPUT_SLOT_SPLITS_TO_CRAWL)
          || inputs.getDataObjectCount(INPUT_SLOT_SPLITS_TO_CRAWL) == 0) {
        // no splits input connected or no splits present: initiate the crawl.
        initiateCrawling(crawlContext);
      } else {
        // splits input present: crawl the splits described by the input records.
        final RecordInput splitsToCrawl = inputs.getAsRecordInput(INPUT_SLOT_SPLITS_TO_CRAWL);
        crawlSplits(splitsToCrawl, crawlContext);
      }
    } catch (final Exception e) {
      _log.error(e);
      throw e;
    }
  }

  /** DS service reference bind method. */
  public void setJdbcAccessService(final JdbcAccessService jdbcAccess) {
    _jdbcAccess = jdbcAccess;
  }

  /** DS service reference unbind method. */
  public void unsetJdbcAccessService(final JdbcAccessService jdbcAccess) {
    if (_jdbcAccess == jdbcAccess) {
      _jdbcAccess = null;
    }
  }

  /**
   * Initiates the crawling. If no splitting is configured, the database is crawled directly; if splitting is
   * configured, splitsToCrawl records are created and sent to the splits output slot.
   */
  private void initiateCrawling(final JdbcCrawlingContext crawlContext) throws Exception {
    // is this a split database crawl?
    final AnyMap splitIntervalMetaData = determineSplitInterval(crawlContext);
    Long min = null;
    Long max = null;
    if (splitIntervalMetaData != null) {
      min = splitIntervalMetaData.getLongValue(SPLIT_MIN);
      max = splitIntervalMetaData.getLongValue(SPLIT_MAX);
    }
    if (min != null && max != null) {
      // create and send split records.
      createAndSendSplitRecords(crawlContext, min, max);
    } else {
      // start crawling directly, without split limits.
      crawl(crawlContext, null);
    }
  }

  /** Creates the splitsToCrawl records, with one record in each bulk for maximum parallelization. */
  private void createAndSendSplitRecords(final JdbcCrawlingContext crawlContext, final Long min, final Long max)
      throws IOException, ObjectStoreException {
    final RecordOutputHandler splitsToCrawlBulks =
        new RecordOutputHandler(crawlContext.getTaskContext().getOutputs(), 1, OUTPUT_SLOT_SPLITS_TO_CRAWL);
    long splitStart = min;
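    // Example (assumed values): min=1, max=250, splitIncrement=100 yields the splits
    // [1,100], [101,200] and [201,250], each sent as a single-record bulk.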
    do {
      final long splitEnd = Math.min(max, splitStart + crawlContext.getSplitIncrement() - 1);
      final Record splitsToCrawlRecord = DataFactory.DEFAULT.createRecord();
      // send only one split per record.
      splitsToCrawlRecord.getMetadata().put(SPLIT_MIN, splitStart);
      splitsToCrawlRecord.getMetadata().put(SPLIT_MAX, splitEnd);
      splitsToCrawlBulks.writeRecord(splitsToCrawlRecord);
      splitStart = splitEnd + 1;
    } while (splitStart <= max);
  }

  /** Determines the split limits (MIN/MAX) by executing the configured splitLimitsSql statement, if any. */
  private AnyMap determineSplitInterval(final JdbcCrawlingContext crawlContext) throws SQLException, IOException {
    if (crawlContext.getSplitLimitsSql() != null) {
      final Collection<Record> results =
          _jdbcAccess.executeSql(crawlContext.getDbUrl(), crawlContext.getDbProperties(),
              crawlContext.getSplitLimitsSql(), 0, crawlContext.getMessages());
      // the first record must contain the MIN and MAX values.
      if (results != null && !results.isEmpty()) {
        return results.iterator().next().getMetadata();
      }
    }
    return null;
  }

  /** Crawls the splits described by the records of the given input, until the task is canceled. */
  private void crawlSplits(final RecordInput splitsToCrawl, final JdbcCrawlingContext crawlContext)
      throws Exception {
    Record record = splitsToCrawl.getRecord();
    while (record != null && !crawlContext.getTaskContext().isCanceled()) {
      final List<Value> splitValues = new ArrayList<>();
      splitValues.add(record.getMetadata().getValue(SPLIT_MIN));
      splitValues.add(record.getMetadata().getValue(SPLIT_MAX));
      crawl(crawlContext, splitValues);
      record = splitsToCrawl.getRecord();
    }
  }

  /**
   * Crawls the complete database or only a single split.
   *
   * @param splitValues
   *          the split values (MIN/MAX). If null or containing less than two values, the complete database is
   *          crawled, i.e. no parameters are applied to the crawl SQL statement. Otherwise these values are applied
   *          as parameters to the crawl SQL statement.
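   *          For example (hypothetical statement): with a crawlSql of "SELECT * FROM persons WHERE id BETWEEN ? AND ?",
   *          the MIN and MAX values are bound to the two parameters.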
   */
  private void crawl(final JdbcCrawlingContext crawlContext, final List<Value> splitValues) throws Exception {
    final RecordOutputHandler recordBulks =
        new RecordOutputHandler(crawlContext.getTaskContext().getOutputs(), crawlContext.getMaxRecordsPerBulk(),
            OUTPUT_SLOT_CRAWLED_RECORDS);
    final Collection<Record> results;
    if (splitValues == null || splitValues.size() < 2) {
      // no splitting needed: execute the crawl statement without parameters.
      results =
          _jdbcAccess.executeSql(crawlContext.getDbUrl(), crawlContext.getDbProperties(), crawlContext.getCrawlSql(),
              crawlContext.getMaxAttachmentSize(), crawlContext.getMessages());
    } else {
      // in this case the crawlSql must take the min/max split values as parameters.
      try (final SqlExecutor sqlExecutor =
          _jdbcAccess.executePrepared(crawlContext.getDbUrl(), crawlContext.getDbProperties(),
              crawlContext.getCrawlSql(), crawlContext.getMaxAttachmentSize(), crawlContext.getMessages())) {
        results = sqlExecutor.execute(splitValues);
      }
    }
    final Iterator<Record> it = results.iterator();
    while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) {
      final Record record = it.next();
      augmentRecord(record, crawlContext);
      crawlContext.getMapper().mapNames(record, getColumnNames(record));
      recordBulks.writeRecord(record);
    }
  }

  /** Augments a crawled record with the record ID, the data source name and the delta hash. */
  private Record augmentRecord(final Record record, final JdbcCrawlingContext crawlContext)
      throws ImportingException {
    record.setId(createId(crawlContext, record));
    record.setSource(crawlContext.getDataSource());
    addDeltaHashToRecord(record, crawlContext);
    return record;
  }

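  /**
   * Creates the record ID from the data source name and the values of the configured ID columns, e.g. (hypothetical
   * values) data source "persons" with ID columns FIRSTNAME and LASTNAME yields an ID like "persons:John-Doe".
   */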
  private String createId(final JdbcCrawlingContext crawlContext, final Record record) throws ImportingException {
    return crawlContext.getDataSource() + ":" + concatColumnValues(crawlContext.getIdColumns(), record);
  }

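  /**
   * Adds the delta hash attribute to the record unless delta import is disabled. If delta columns are configured,
   * the hash is computed from their values; otherwise the columns of the attribute mapping are used.
   */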
  private void addDeltaHashToRecord(final Record record, final JdbcCrawlingContext crawlContext)
      throws ImportingException {
    final DeltaImportStrategy deltaStrategy =
        getDeltaImportStrategy(crawlContext.getTaskContext().getTaskParameters());
    if (deltaStrategy != DeltaImportStrategy.DISABLED) {
      final Any deltaColumns = crawlContext.getDeltaColumns();
      if (deltaColumns == null) {
        // no delta columns configured: fall back to the columns of the attribute mapping.
        final Any mappingColumns = crawlContext.getMapper().getMappingKeys();
        if (!mappingColumns.isEmpty()) {
          record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH,
              concatColumnValues(mappingColumns, record));
        }
      } else if (!deltaColumns.isEmpty()) {
        record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH, concatColumnValues(deltaColumns, record));
      }
    }
  }

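  /**
   * Concatenates the values of the given columns, separated by "-", for use as record ID or delta hash. Values
   * longer than 100 characters and binary (attachment) values are replaced by their digest, missing columns by the
   * literal "NULL". E.g. (hypothetical values) columns FIRSTNAME and LASTNAME with LASTNAME unset yield "John-NULL".
   */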
  private String concatColumnValues(final Any columnNames, final Record record) throws ImportingException {
    final StringBuilder idBuffer = new StringBuilder();
    if (columnNames != null) {
      final AnyMap metadata = record.getMetadata();
      for (final Any column : columnNames) {
        final String columnName = column.asValue().asString();
        if (metadata.containsKey(columnName)) {
          String value = metadata.getStringValue(columnName);
          if (value.length() > 100) {
            // long values are replaced by their digest to keep IDs and hashes short.
            value = DigestHelper.calculateDigest(value);
          }
          idBuffer.append(value);
        } else if (record.hasAttachment(columnName)) {
          idBuffer.append(DigestHelper.calculateDigest(record.getAttachmentAsBytes(columnName)));
        } else {
          idBuffer.append("NULL");
        }
        idBuffer.append(COLUMN_SEPARATOR);
      }
      if (idBuffer.length() > 0) {
        idBuffer.deleteCharAt(idBuffer.length() - 1); // remove the trailing column separator.
      }
    }
    return idBuffer.toString();
  }

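  /**
   * Collects the column names of a result record: all metadata keys except the record ID, source and delta hash
   * attributes, plus the attachment names.
   */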
  private Set<String> getColumnNames(final Record resultRecord) {
    final Set<String> columnNames = new HashSet<>(resultRecord.getMetadata().keySet());
    // remove keys with special meanings.
    columnNames.remove(Record.RECORD_ID);
    columnNames.remove(Record.SOURCE);
    columnNames.remove(ImportingConstants.ATTRIBUTE_DELTA_HASH);
    final Iterator<String> it = resultRecord.getAttachmentNames();
    while (it.hasNext()) {
      columnNames.add(it.next());
    }
    return columnNames;
  }

  /**
   * Gets the deltaImportStrategy parameter from the task parameters. Falls back to {@link DeltaImportStrategy#FULL}
   * if the parameter is not set or its value is invalid.
   */
  private static DeltaImportStrategy getDeltaImportStrategy(final AnyMap taskParameters) {
    DeltaImportStrategy usage = DeltaImportStrategy.FULL;
    final String paramValue = taskParameters.getStringValue(DeltaImportStrategy.TASK_PARAM);
    if (paramValue != null) {
      try {
        usage = DeltaImportStrategy.valueOf(paramValue.toUpperCase(Locale.ENGLISH));
      } catch (final Exception ex) {
        // ignore invalid value and keep the default.
      }
    }
    return usage;
  }
}