/*********************************************************************************************************************
 * Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This
 * program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which
 * accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.feed;

import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.util.RecordMerger;
import org.eclipse.smila.importing.ImportingConstants;
import org.eclipse.smila.importing.ImportingException;
import org.eclipse.smila.importing.util.RecordOutputHandler;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.Inputs;
import org.eclipse.smila.taskworker.input.RecordInput;
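
/**
 * Worker that crawls web feeds (e.g. RSS or Atom): the feed URLs are taken either from the task parameter "feedUrls"
 * or from the records of the optional "linksToCrawl" input slot, and each feed entry is written as a single record
 * to the "crawledRecords" output slot. An illustrative task parameter snippet follows; only the parameter names are
 * defined by this worker, the values (URL, bulk size, property names) are assumptions for the example:
 *
 * <pre>
 * "parameters": {
 *   "feedUrls": [ "http://example.org/news.rss" ],
 *   "maxRecordsPerBulk": 500,
 *   "deltaProperties": [ "Title", "Published" ]
 * }
 * </pre>
 */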
public class FeedCrawlerWorker implements Worker {

  /** Name of the worker, used in worker description and workflows. */
  public static final String NAME = "feedCrawler";

  /** Name of the (optional) input slot containing the links to crawl. */
  public static final String INPUT_SLOT_LINKS_TO_CRAWL = "linksToCrawl";

  /** Name of the output slot containing the crawled records. */
  public static final String OUTPUT_SLOT_CRAWLED_RECORDS = "crawledRecords";

  /** Name of the task parameter that contains the feed URL(s) (may be more than one). */
  public static final String TASK_PARAM_FEED_URL = "feedUrls";

  /** Name of the task parameter setting the maximum number of records in one bulk object. */
  public static final String TASK_PARAM_MAX_RECORDS_PER_BULK = "maxRecordsPerBulk";

  /** Name of the task parameter listing the feed entry properties used for the delta hash. */
  public static final String TASK_PARAM_DELTA_PROPERTIES = "deltaProperties";

  /** Name of the input record attribute containing the feed URL (when using the input slot). */
  public static final String INPUT_ATTRIBUTE_FEED_URL = "httpUrl";

  /** Default: write up to 1000 records to one output bulk. */
  public static final Long MAX_RECORDS_PER_BULK_DEFAULT = 1000L;

  /** Local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  @Override
  public String getName() {
    return NAME;
  }
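
  /**
   * Crawl feeds and write the resulting records to the output: if the "linksToCrawl" input slot contains data
   * objects, the feed URLs are read from the input records, otherwise from the job configuration. Throws an
   * {@link ImportingException} if none of the feed URLs could be crawled successfully.
   */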
  @Override
  public void perform(final TaskContext taskContext) throws Exception {
    final FeedCrawlingContext crawlContext = new FeedCrawlingContext(taskContext);
    final Inputs inputs = taskContext.getInputs();
    final RecordOutputHandler recordOutput =
        new RecordOutputHandler(taskContext.getOutputs(), crawlContext.getMaxRecordsPerBulk(),
            OUTPUT_SLOT_CRAWLED_RECORDS);
    final StringBuilder errorMessage = new StringBuilder();
    boolean success = false;
    if (inputs.getDataObjectCount(INPUT_SLOT_LINKS_TO_CRAWL) == 0) {
      // feed crawler is used without an input bucket -> use feed URLs from job config
      success = crawlConfiguredFeeds(crawlContext, recordOutput, errorMessage);
    } else {
      // feed crawler is used with an input bucket -> use feed URLs from input records
      final RecordInput linksToCrawl = inputs.getAsRecordInput(INPUT_SLOT_LINKS_TO_CRAWL);
      success = crawlInputRecordFeeds(crawlContext, linksToCrawl, recordOutput, errorMessage);
    }
    if (!success) {
      // none of the feed URLs were successfully crawled
      throw new ImportingException(errorMessage.toString());
    }
  }

  /**
   * Crawl feeds based on URLs from records of the input bucket.
   *
   * @return 'true' if at least one feed URL from input records was successfully crawled.
   */
  private boolean crawlInputRecordFeeds(final FeedCrawlingContext crawlContext, final RecordInput linksToCrawl,
      final RecordOutputHandler recordOutput, final StringBuilder errorMessage) throws Exception {
    boolean success = false;
    Record inputRecord = linksToCrawl.getRecord();
    while (inputRecord != null && !crawlContext.getTaskContext().isCanceled()) {
      final String feedUrl = inputRecord.getMetadata().getStringValue(INPUT_ATTRIBUTE_FEED_URL);
      if (feedUrl != null) {
        // success if at least one feed URL was successfully crawled
        success = crawl(crawlContext, feedUrl, recordOutput, inputRecord, errorMessage) || success;
      } else {
        _log.warn("FeedCrawler input record contains no attribute '" + INPUT_ATTRIBUTE_FEED_URL
            + "', which is required for feed crawling. Record was: " + inputRecord);
      }
      inputRecord = linksToCrawl.getRecord();
    }
    return success;
  }

  /**
   * Crawl feeds based on URLs from the job configuration.
   *
   * @return 'true' if at least one configured feed URL was successfully crawled.
   */
  private boolean crawlConfiguredFeeds(final FeedCrawlingContext crawlContext,
      final RecordOutputHandler recordOutput, final StringBuilder errorMessage) throws Exception {
    boolean success = false;
    final Iterator<Any> it = crawlContext.getFeedUrls().iterator();
    while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) {
      final Any url = it.next();
      final String feedUrl = url.toString();
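      // configured feeds have no input record to inherit attributes from, so start from an empty base record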
      final Record baseRecord = DataFactory.DEFAULT.createRecord();
      // success if at least one feed URL was successfully crawled
      success = crawl(crawlContext, feedUrl, recordOutput, baseRecord, errorMessage) || success;
    }
    return success;
  }

  /**
   * Read the given feed, map the feed entries to copies of the given base record and write them to the output.
   *
   * @return 'true' if the given feed URL was successfully crawled.
   */
  private boolean crawl(final FeedCrawlingContext crawlContext, final String feedUrl,
      final RecordOutputHandler recordOutput, final Record baseRecord, final StringBuilder errorMessage)
      throws Exception {
    if (_log.isInfoEnabled()) {
      _log.info("Crawling feed " + feedUrl);
    }
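    // errors while reading a feed fail only this feed URL; the task itself fails only if all feeds fail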
    try {
      final RomeFeedReader feedReader = new RomeFeedReader();
      final Collection<Record> results = feedReader.readFeed(feedUrl);
      final Iterator<Record> it = results.iterator();
      while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) {
        final Record feedRecord = it.next();
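        // copy the base record (it may carry attributes of the input record) and merge the feed entry into the copy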
        final Record resultRecord = DataFactory.DEFAULT.cloneRecord(baseRecord, null);
        RecordMerger.mergeRecords(resultRecord, feedRecord, false, false);
        augmentRecord(crawlContext, feedUrl, resultRecord);
        crawlContext.getMapper().mapNames(resultRecord, FeedProperties.ALL_PROPS);
        recordOutput.writeRecord(resultRecord);
      }
    } catch (final Exception e) {
      final String m = "Error while crawling feed '" + feedUrl + "': " + e.getMessage() + ". ";
      _log.warn(m, e);
      errorMessage.append(m);
      return false; // failure
    }
    return true; // success
  }

  /** Add id, source and delta hash to the given record. */
  private Record augmentRecord(final FeedCrawlingContext crawlContext, final String feedUrl, final Record record)
      throws ImportingException {
    record.setId(createId(crawlContext, record));
    record.setSource(crawlContext.getDataSource());
    final AnyMap metadata = record.getMetadata();
    metadata.put(ImportingConstants.ATTRIBUTE_DELTA_HASH, createDeltaHash(crawlContext, record));
    metadata.put(FeedProperties.FEED_SOURCE_URL, feedUrl);
    return record;
  }

  /** Create the record id from the data source name and the feed entry URI. */
  private String createId(final FeedCrawlingContext crawlContext, final Record record) throws ImportingException {
    final String feedEntryUri = record.getMetadata().getStringValue(FeedProperties.FEED_ENTRY_URI);
    // "How the entry URI maps to a concrete feed type (RSS or Atom) depends on the concrete feed type.
    // This is explained in detail in Rome documentation",
    // see http://wiki.java.net/twiki/bin/view/Javawsxml/Rome05URIMapping
    return crawlContext.getDataSource() + ":" + feedEntryUri;
  }

  /**
   * Create the delta hash for the record by concatenating the values of the configured delta properties. If no delta
   * properties are configured, or none of them has a simple value, a random UUID is returned to force an update.
   */
  private String createDeltaHash(final FeedCrawlingContext crawlContext, final Record record)
      throws ImportingException {
    final Any deltaProps = crawlContext.getDeltaProperties();
    if (deltaProps == null) {
      // no delta properties configured -> force update.
      return UUID.randomUUID().toString();
    }
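    // concatenate the values of all configured delta properties; the hash changes whenever one of the values changes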
    final StringBuilder deltaBuilder = new StringBuilder();
    boolean success = false;
    for (final Any propAny : deltaProps) {
      final String prop = propAny.toString();
      if (record.getMetadata().containsKey(prop)) {
        final Any any = record.getMetadata().getValue(prop);
        if (any.isValue()) {
          deltaBuilder.append(any.asValue().asString());
          success = true; // at least one delta property could be successfully added
        } else {
          _log.debug("Couldn't add property '" + prop
              + "' to delta hash. Property has no simple value, value type is " + any.getValueType() + ".");
        }
      } else {
        _log.debug("Couldn't add property '" + prop + "' to delta hash. Property has no value.");
      }
    }
    if (!success) {
      _log.warn("Couldn't create delta hash from configured properties, no values were set. Forcing update.");
      return UUID.randomUUID().toString(); // force update
    }
    return deltaBuilder.toString();
  }
}