/********************************************************************************************************************* | |
* Copyright (c) 2008, 2013 Empolis Information Management GmbH and brox IT Solutions GmbH. All rights reserved. This | |
* program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 which | |
* accompanies this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
*********************************************************************************************************************/ | |
package org.eclipse.smila.importing.crawler.feed; | |
import java.util.Collection; | |
import java.util.Iterator; | |
import java.util.UUID; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.datamodel.Any; | |
import org.eclipse.smila.datamodel.AnyMap; | |
import org.eclipse.smila.datamodel.DataFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.datamodel.util.RecordMerger; | |
import org.eclipse.smila.importing.ImportingConstants; | |
import org.eclipse.smila.importing.ImportingException; | |
import org.eclipse.smila.importing.util.RecordOutputHandler; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.Worker; | |
import org.eclipse.smila.taskworker.input.Inputs; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
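/**
 * Worker that reads RSS/Atom feeds using the Rome library and produces one output record per feed entry. The feed
 * URLs are taken either from the "feedUrls" task parameter or, if the optional "linksToCrawl" input slot is
 * connected, from the "httpUrl" attribute of the incoming records.
 *
 * <p>
 * A sketch of the task parameters as they might appear in a job definition (the parameter names come from the
 * constants in this class; the values are made-up examples):
 * </p>
 *
 * <pre>
 * "parameters": {
 *   "feedUrls": [ "http://example.com/feed.rss", "http://example.org/atom.xml" ],
 *   "maxRecordsPerBulk": 500,
 *   "deltaProperties": [ "Title", "Published" ]
 * }
 * </pre>
 */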
public class FeedCrawlerWorker implements Worker { | |
/** Name of the worker, used in worker description and workflows. */ | |
public static final String NAME = "feedCrawler"; | |
/** Name of the (optional) input slot containing the links to crawl. */ | |
public static final String INPUT_SLOT_LINKS_TO_CRAWL = "linksToCrawl"; | |
  /** Name of the output slot containing the crawled records. */
  public static final String OUTPUT_SLOT_CRAWLED_RECORDS = "crawledRecords";
  /** Name of the task parameter that contains the feed URL(s); more than one URL can be configured. */
  public static final String TASK_PARAM_FEED_URL = "feedUrls";
  /** Name of the task parameter that limits the number of records written to one output bulk object. */
  public static final String TASK_PARAM_MAX_RECORDS_PER_BULK = "maxRecordsPerBulk";
  /** Name of the task parameter listing the feed entry properties used to compute the delta hash. */
  public static final String TASK_PARAM_DELTA_PROPERTIES = "deltaProperties";
  /** Name of the input record attribute containing the feed URL (used when the input slot is connected). */
  public static final String INPUT_ATTRIBUTE_FEED_URL = "httpUrl";
  /** Default for maxRecordsPerBulk: write up to 1000 records to one output bulk object. */
  public static final Long MAX_RECORDS_PER_BULK_DEFAULT = 1000L;
  /** Local logger. */
  private final Log _log = LogFactory.getLog(getClass());
@Override | |
public String getName() { | |
return NAME; | |
} | |
@Override | |
public void perform(final TaskContext taskContext) throws Exception { | |
final FeedCrawlingContext crawlContext = new FeedCrawlingContext(taskContext); | |
final Inputs inputs = taskContext.getInputs(); | |
final RecordOutputHandler recordOutput = | |
new RecordOutputHandler(taskContext.getOutputs(), crawlContext.getMaxRecordsPerBulk(), | |
OUTPUT_SLOT_CRAWLED_RECORDS); | |
final StringBuilder errorMessage = new StringBuilder(); | |
boolean success = false; | |
if (inputs.getDataObjectCount(INPUT_SLOT_LINKS_TO_CRAWL) == 0) { | |
// feed crawler is used without an input bucket -> use feed URLs from job config | |
success = crawlConfiguredFeeds(crawlContext, recordOutput, errorMessage); | |
} else { | |
// feed crawler is used with an input bucket -> use feed URLs from input records | |
final RecordInput linksToCrawl = inputs.getAsRecordInput(INPUT_SLOT_LINKS_TO_CRAWL); | |
success = crawlInputRecordFeeds(crawlContext, linksToCrawl, recordOutput, errorMessage); | |
} | |
if (!success) { | |
// none of the feed URLs were successfully crawled | |
throw new ImportingException(errorMessage.toString()); | |
} | |
} | |
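  /*
   * A sketch of how this worker might be wired into a workflow definition, here for the variant without an input
   * bucket (the workflow fragment format and the bucket name are assumptions, not defined in this class):
   *
   * "startAction": {
   *   "worker": "feedCrawler",
   *   "output": { "crawledRecords": "crawledRecordsBucket" }
   * }
   */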
  /**
   * Crawls the feeds whose URLs are taken from the records of the input bucket.
   *
   * @return {@code true} if at least one feed URL from the input records was successfully crawled.
   */
private boolean crawlInputRecordFeeds(final FeedCrawlingContext crawlContext, final RecordInput linksToCrawl, | |
final RecordOutputHandler recordOutput, final StringBuilder errorMessage) throws Exception { | |
boolean success = false; | |
Record inputRecord = linksToCrawl.getRecord(); | |
while (inputRecord != null && !crawlContext.getTaskContext().isCanceled()) { | |
final String feedUrl = inputRecord.getMetadata().getStringValue(INPUT_ATTRIBUTE_FEED_URL); | |
if (feedUrl != null) { | |
// success if at least one feed URL was successfully crawled | |
success = crawl(crawlContext, feedUrl, recordOutput, inputRecord, errorMessage) || success; | |
} else { | |
_log.warn("FeedCrawler input record contains no attribute '" + INPUT_ATTRIBUTE_FEED_URL | |
+ "' which is needed for feed crawling. Record was: " + inputRecord); | |
} | |
inputRecord = linksToCrawl.getRecord(); | |
} | |
return success; | |
} | |
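  /*
   * A minimal sketch of an input record as expected by crawlInputRecordFeeds; only the "httpUrl" attribute
   * (INPUT_ATTRIBUTE_FEED_URL) is evaluated here, the record id is a made-up example:
   *
   * { "_recordid": "feeds:record-1", "httpUrl": "http://example.com/feed.rss" }
   */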
  /**
   * Crawls the feeds whose URLs are configured in the job definition (task parameter "feedUrls").
   *
   * @return {@code true} if at least one configured feed URL was successfully crawled.
   */
private boolean crawlConfiguredFeeds(final FeedCrawlingContext crawlContext, | |
final RecordOutputHandler recordOutput, final StringBuilder errorMessage) throws Exception { | |
boolean success = false; | |
final Iterator<Any> it = crawlContext.getFeedUrls().iterator(); | |
while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) { | |
final Any url = it.next(); | |
final String feedUrl = url.toString(); | |
final Record baseRecord = DataFactory.DEFAULT.createRecord(); | |
// success if at least one feed URL was successfully crawled | |
success = crawl(crawlContext, feedUrl, recordOutput, baseRecord, errorMessage) || success; | |
} | |
return success; | |
} | |
  /**
   * Reads the given feed, merges each feed entry into a copy of the given base record, and writes the results to the
   * output.
   *
   * @return {@code true} if the given feed URL was successfully crawled.
   */
private boolean crawl(final FeedCrawlingContext crawlContext, final String feedUrl, | |
final RecordOutputHandler recordOutput, final Record baseRecord, final StringBuilder errorMessage) | |
throws Exception { | |
if (_log.isInfoEnabled()) { | |
_log.info("Crawling feed " + feedUrl); | |
} | |
try { | |
final RomeFeedReader feedReader = new RomeFeedReader(); | |
final Collection<Record> results = feedReader.readFeed(feedUrl); | |
final Iterator<Record> it = results.iterator(); | |
while (it.hasNext() && !crawlContext.getTaskContext().isCanceled()) { | |
final Record feedRecord = it.next(); | |
final Record resultRecord = DataFactory.DEFAULT.cloneRecord(baseRecord, null); | |
RecordMerger.mergeRecords(resultRecord, feedRecord, false, false); | |
augmentRecord(crawlContext, feedUrl, resultRecord); | |
crawlContext.getMapper().mapNames(resultRecord, FeedProperties.ALL_PROPS); | |
recordOutput.writeRecord(resultRecord); | |
} | |
    } catch (final Exception e) {
      final String m = "Error while crawling feed '" + feedUrl + "': " + e.getMessage() + ". ";
      _log.warn(m, e);
      errorMessage.append(m);
      return false; // failure
    }
return true; // success | |
} | |
  /** Adds record id, data source, delta hash, and the feed source URL to the record. */
private Record augmentRecord(final FeedCrawlingContext crawlContext, final String feedUrl, final Record record) | |
throws ImportingException { | |
record.setId(createId(crawlContext, record)); | |
record.setSource(crawlContext.getDataSource()); | |
final AnyMap metadata = record.getMetadata(); | |
metadata.put(ImportingConstants.ATTRIBUTE_DELTA_HASH, createDeltaHash(crawlContext, record)); | |
metadata.put(FeedProperties.FEED_SOURCE_URL, feedUrl); | |
return record; | |
} | |
  /** Creates the record id as {@code <dataSource>:<feedEntryUri>}. */
private String createId(final FeedCrawlingContext crawlContext, final Record record) throws ImportingException { | |
final String feedEntryUri = record.getMetadata().getStringValue(FeedProperties.FEED_ENTRY_URI); | |
// "How the entry URI maps to a concrete feed type (RSS or Atom) depends on | |
// the concrete feed type. This is explained in detail in Rome documentation", | |
// see http://wiki.java.net/twiki/bin/view/Javawsxml/Rome05URIMapping | |
return crawlContext.getDataSource() + ":" + feedEntryUri; | |
} | |
  /**
   * Creates the delta hash for the record by concatenating the values of the configured delta properties. Falls back
   * to a random UUID (forcing an update) if no delta properties are configured or none of them has a simple value.
   */
private String createDeltaHash(final FeedCrawlingContext crawlContext, final Record record) | |
throws ImportingException { | |
final Any deltaProps = crawlContext.getDeltaProperties(); | |
if (deltaProps == null) { | |
// no delta properties configured -> force update. | |
return UUID.randomUUID().toString(); | |
} | |
final StringBuilder deltaBuilder = new StringBuilder(); | |
boolean success = false; | |
    for (final Any propAny : deltaProps) {
      final String prop = propAny.toString();
      if (record.getMetadata().containsKey(prop)) {
        final Any any = record.getMetadata().getValue(prop);
        if (any.isValue()) {
          deltaBuilder.append(any.asValue().asString());
          success = true; // at least one delta property could be added successfully
        } else if (_log.isDebugEnabled()) {
          _log.debug("Couldn't add property '" + prop
            + "' to delta hash: property has no simple value, value type is " + any.getValueType() + ".");
        }
      } else if (_log.isDebugEnabled()) {
        _log.debug("Couldn't add property '" + prop + "' to delta hash: record has no value for this property.");
      }
    }
if (!success) { | |
_log.warn("Couldn't create delta hash from configured properties, no values were set. Forcing update."); | |
return UUID.randomUUID().toString(); // force update | |
} | |
return deltaBuilder.toString(); | |
} | |
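  /*
   * Delta hash example: with deltaProperties = [ "Title", "Published" ] (made-up property names) and a record whose
   * metadata contains Title = "News" and Published = "2013-05-01", createDeltaHash returns "News2013-05-01". If none
   * of the configured properties has a simple value, a random UUID is returned instead, forcing an update on every
   * run.
   */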
} |