/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Juergen Schumacher (Attensity Europe GmbH) - initial API and implementation
*******************************************************************************/
package org.eclipse.smila.importing.crawler.web;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.client.RedirectException;
import org.eclipse.smila.datamodel.Any;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.importing.ImportingConstants;
import org.eclipse.smila.importing.VisitedLinksException;
import org.eclipse.smila.importing.VisitedLinksService;
import org.eclipse.smila.importing.compounds.CompoundExtractor;
import org.eclipse.smila.importing.crawler.web.WebCrawlerConstants.ErrorHandling;
import org.eclipse.smila.importing.crawler.web.filter.FilterConfiguration;
import org.eclipse.smila.importing.crawler.web.utils.UriHelper;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.Inputs;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.Outputs;
import org.eclipse.smila.taskworker.output.RecordOutput;
import org.eclipse.smila.utils.MaybeRecoverableException;
/** Worker for Web crawling. */
public class WebCrawlerWorker implements Worker {
/** Name of the worker, used in worker description and workflows. */
public static final String NAME = "webCrawler";
/** name of input slot containing the links to crawl. */
public static final String INPUT_SLOT_LINKS_TO_CRAWL = "linksToCrawl";
/** name of output slot containing the links to crawl. */
public static final String OUTPUT_SLOT_LINKS_TO_CRAWL = "linksToCrawl";
/** name of output slot containing the crawled records. */
public static final String OUTPUT_SLOT_CRAWLED_RECORDS = "crawledRecords";
/**
* maximum number of records to write into a single output bulk. Usually the worker produces one record output bulk
* per link input bulk anyway, because each input link yields at most one output record.
*/
private static final int RECORDS_PER_BULK = 100;
/** dummy input bulk Id used in initial crawl task for marking links as visited. */
private static final String BULK_ID_FOR_INITIAL_TASK = "initial";
/** reference to VisitedLinks service. */
private VisitedLinksService _visitedLinks;
/** reference to Fetcher service. */
private Fetcher _fetcher;
/** reference to LinkExtractor service. */
private LinkExtractor _linkExtractor;
/** reference to LinkFilter service. */
private LinkFilter _linkFilter;
/** reference to RecordProducer service. */
private RecordProducer _recordProducer;
/** local logger. */
private final Log _log = LogFactory.getLog(getClass());
/** the compound extractor. */
private CompoundExtractor _compoundExtractor;
/**
* holds the current output bulks and creates new output bulks when the specified number of links or records has been
* written.
*/
private static final class RecordOutputHandler {
/** task context outputs manager. */
private final Outputs _outputs;
/** maximum number of links to write to a single bulk. */
private final int _linksPerBulk;
/** maximum number of records to write to a single bulk. */
private final int _recordsPerBulk;
/** current linksToCrawl bulk. */
private RecordOutput _linksToCrawl;
/** index of current linksToCrawl bulk. */
private int _linksToCrawlBulkIndex;
/** current crawledRecords bulk. */
private RecordOutput _crawledRecords;
/** index of current crawledRecords bulk. */
private int _crawledRecordsBulkIndex;
/** initialize instance for processing of one task. */
private RecordOutputHandler(final Outputs outputs, final int linksPerBulk, final int recordsPerBulk) {
_outputs = outputs;
_linksPerBulk = linksPerBulk;
_recordsPerBulk = recordsPerBulk;
}
/** write a record to the linksToCrawl bulk, start a new bulk if necessary. */
private void addLinkToCrawl(final Record record) throws WebCrawlerException {
try {
if (_linksToCrawl == null) {
_linksToCrawl = _outputs.getAsRecordOutput(OUTPUT_SLOT_LINKS_TO_CRAWL);
} else if (_linksToCrawl.getRecordCount() >= _linksPerBulk) {
_linksToCrawl.commit();
_linksToCrawlBulkIndex++;
_linksToCrawl = _outputs.getAsRecordOutput(OUTPUT_SLOT_LINKS_TO_CRAWL, _linksToCrawlBulkIndex);
}
_linksToCrawl.writeRecord(record);
} catch (final Exception ex) {
throw new WebCrawlerException("Error writing to linksToCrawl bulk", ex);
}
}
/** apply mapping and write the mapped record to the crawledRecords bulk, start a new bulk if necessary. */
private void mapAndAddCrawledRecord(final Record record, final WebCrawlingContext context)
throws WebCrawlerException {
try {
if (_crawledRecords == null) {
_crawledRecords = _outputs.getAsRecordOutput(OUTPUT_SLOT_CRAWLED_RECORDS);
} else if (_crawledRecords.getRecordCount() >= _recordsPerBulk) {
_crawledRecords.commit();
_crawledRecordsBulkIndex++;
_crawledRecords = _outputs.getAsRecordOutput(OUTPUT_SLOT_CRAWLED_RECORDS, _crawledRecordsBulkIndex);
}
// map attribute names only now: earlier steps (fetching, link extraction) rely on the original attribute names.
context.getMapper().mapNames(record, WebCrawlerConstants.PROPERTY_NAMES);
_crawledRecords.writeRecord(record);
} catch (final Exception ex) {
throw new WebCrawlerException("Error writing to crawledRecords bulk", ex);
}
}
}
/** get MIME Type from record. */
public static String getMimeType(final Record record) {
return record.getMetadata().getStringValue(WebCrawlerConstants.ATTRIBUTE_MIMETYPE);
}
@Override
public String getName() {
return NAME;
}
@Override
public void perform(final TaskContext taskContext) throws Exception {
final Inputs inputs = taskContext.getInputs();
final WebCrawlingContext webCrawlingContext = new WebCrawlingContext(taskContext);
if (inputs.getDataObjectCount(INPUT_SLOT_LINKS_TO_CRAWL) == 0) {
initiateCrawling(webCrawlingContext);
} else {
final RecordInput linksToCrawl = inputs.getAsRecordInput(INPUT_SLOT_LINKS_TO_CRAWL);
crawlLinkRecords(linksToCrawl, webCrawlingContext);
}
}
/** start crawling from task parameters. */
private void initiateCrawling(final WebCrawlingContext webCrawlingContext) throws MaybeRecoverableException {
// put each link into its own bulk to improve scaling.
final RecordOutputHandler outputBulks =
new RecordOutputHandler(webCrawlingContext.getTaskContext().getOutputs(), 1, RECORDS_PER_BULK);
final Record initialLinkRecord = DataFactory.DEFAULT.createRecord();
final String startUrl =
webCrawlingContext.getTaskParameters().getStringValue(WebCrawlerConstants.TASK_PARAM_START_URL);
setUrl(initialLinkRecord, startUrl);
initCrawlDepth(initialLinkRecord, webCrawlingContext);
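// reset the visited-links state for this data source so the fresh crawl starts without leftover entries.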
_visitedLinks.clearSource(webCrawlingContext.getDataSource());
webCrawlingContext.setCurrentInputBulkId(BULK_ID_FOR_INITIAL_TASK);
crawlLinkRecord(initialLinkRecord, outputBulks, webCrawlingContext);
}
/** crawl links from input records. */
private void crawlLinkRecords(final RecordInput linksToCrawl, final WebCrawlingContext context)
throws ObjectStoreException, IOException, MaybeRecoverableException {
final Long sleepTime =
context.getTaskParameters().getLongValue(WebCrawlerConstants.TASK_PARAM_WAIT_BETWEEN_REQUESTS);
final RecordOutputHandler outputBulks =
new RecordOutputHandler(context.getTaskContext().getOutputs(), context.getLinksPerBulk(), RECORDS_PER_BULK);
final String inputBulkId = linksToCrawl.getObjectName();
context.setCurrentInputBulkId(inputBulkId);
Record record = linksToCrawl.getRecord();
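// process link records until the input bulk is exhausted or the task is canceled.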
while (record != null && !context.getTaskContext().isCanceled()) {
initCrawlDepth(record, context); // make sure that crawl depth is set in record
normalizeUrl(record, context);
if (hasNotBeenVisited(record, context)) {
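// honor the configured wait time between requests before fetching the next resource.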
waitBetweenRequests(sleepTime);
crawlLinkRecord(record, outputBulks, context);
context.getVisitedUrls().add(getUrl(record));
}
record = linksToCrawl.getRecord();
}
}
/** ensure that URL in record is normalized. */
private void normalizeUrl(final Record record, final WebCrawlingContext context) throws MaybeRecoverableException {
try {
setUrl(record, getUrl(record)); // ensure that URL is normalized
} catch (final MaybeRecoverableException ex) {
handleCrawlException(record, context, ex);
}
}
/**
* check if the URL in a record has already been visited: either in this task, by checking the task-local list of
* visited URLs, or by a worker processing a different crawl task, by checking the global {@link VisitedLinksService}
* instance.
*
* @return 'true' if the URL has not been visited yet, 'false' otherwise.
*/
private boolean hasNotBeenVisited(final Record record, final WebCrawlingContext context)
throws VisitedLinksException {
final String url = getUrl(record);
debugLogUrl("Check if visited", record);
boolean notVisited = false;
if (!context.getVisitedUrls().contains(url)) {
// not yet visited in this task
notVisited = !checkAndMarkVisitedTimed(record, context);
debugLogUrl("Will visit: " + notVisited, record);
} else {
debugLogUrl("Duplicate URL in task", record);
}
return notVisited;
}
/**
* Check if a URL in a record is marked as visited in the {@link VisitedLinksService} instance and mark it if not.
* Measures time as "duration...checkVisitedLinks".
*
* @return 'true' if the URL was already visited, 'false' otherwise.
*/
private boolean checkAndMarkVisitedTimed(final Record record, final WebCrawlingContext context)
throws VisitedLinksException {
final String url = getUrl(record);
debugLogUrl("Check if visited", record);
final long time = context.getTaskContext().getTimestamp();
try {
return _visitedLinks.checkAndMarkVisited(context.getDataSource(), url, context.getJobRunId(),
context.getCurrentInputBulkId());
} finally {
context.getTaskContext().measureTime("checkVisitedLinks", time);
}
}
/** crawl a link represented by one record: fetch metadata and content, extract links, produce record. */
private void crawlLinkRecord(final Record linkRecord, final RecordOutputHandler outputBulks,
final WebCrawlingContext context) throws MaybeRecoverableException {
try {
invokeFetcherTimed(linkRecord, context);
// Check again after fetching to prevent duplicates when a URL is processed by two workers at the same time.
// Additionally, this marks the URL as visited in the initial crawl task.
if (!checkAndMarkVisitedTimed(linkRecord, context)) {
extractAndFilterLinks(linkRecord, outputBulks, context);
produceAndWriteRecords(linkRecord, outputBulks, context);
}
} catch (final MaybeRecoverableException ex) {
handleCrawlException(linkRecord, context, ex);
} catch (final RuntimeException ex) {
logNonRecoverableError(linkRecord, ex, context);
}
}
/** log or rethrow exception. */
private void handleCrawlException(final Record linkRecord, final WebCrawlingContext context,
final MaybeRecoverableException ex) throws MaybeRecoverableException {
if (ex.isRecoverable() && context.getErrorHandling() == ErrorHandling.RETRY) {
throw ex; // schedule retry for task
}
if (BULK_ID_FOR_INITIAL_TASK.equals(context.getCurrentInputBulkId())) {
if (ex.isRecoverable() && context.getErrorHandling() != ErrorHandling.RETRY) {
// we do not want to retry, so wrap exception such that job run will fail.
throw new WebCrawlerException(ex.getMessage(), ex, false);
}
// exception is not recoverable so we can just rethrow it.
throw ex;
}
if (ex.getCause() instanceof RedirectException) {
logRedirectErrors(linkRecord, context, ex);
} else {
logNonRecoverableError(linkRecord, ex, context);
}
}
private void logRedirectErrors(final Record linkRecord, final WebCrawlingContext context,
final MaybeRecoverableException ex) {
final FilterConfiguration filterConfig = context.getFilterConfiguration();
if (filterConfig == null || !filterConfig.followRedirects()) {
_log.info(ex.getLocalizedMessage()); // ignore redirect errors if follow redirects is not configured
} else {
logNonRecoverableError(linkRecord, ex, context);
}
}
/**
* produce the records to be processed by SMILA from the crawled link record and write them to the crawledRecords output bulk.
*/
private void produceAndWriteRecords(final Record linkRecord, final RecordOutputHandler outputBulks,
final WebCrawlingContext context) throws WebCrawlerException {
final Collection<Record> crawledRecords = produceRecordsTimed(linkRecord, context);
for (final Record crawledRecord : crawledRecords) {
if (isCompoundRecord(crawledRecord, context)) {
setIsCompound(crawledRecord);
}
outputBulks.mapAndAddCrawledRecord(crawledRecord, context);
}
}
/** checks if the crawled record is a compound record. */
private boolean isCompoundRecord(final Record record, final WebCrawlingContext context) {
return _compoundExtractor.canExtract(getUrl(record), getMimeType(record));
}
/** extract and filter links from content of the fetched web resource and write them to the links output bulk. */
private void extractAndFilterLinks(final Record linkRecord, final RecordOutputHandler outputBulks,
final WebCrawlingContext context) throws WebCrawlerException, VisitedLinksException {
if (linkRecord.hasAttachment(WebCrawlerConstants.ATTACHMENT_CONTENT)) {
final Collection<Record> extractedLinks = extractLinksTimed(linkRecord, context);
final Collection<Record> filteredLinks = filterLinksTimed(extractedLinks, linkRecord, context);
for (final Record outgoingLink : filteredLinks) {
// skip links already extracted in this task, then check the global VisitedLinksService before adding them
if (context.getExtractedUrls().add(getUrl(outgoingLink))) {
if (isNotVisitedTimed(outgoingLink, context)) {
outputBulks.addLinkToCrawl(outgoingLink);
}
}
}
}
}
/** invoke fetcher and measure time as "duration...fetchResource". */
private void invokeFetcherTimed(final Record linkRecord, final WebCrawlingContext context)
throws WebCrawlerException {
debugLogUrl("Call fetcher for ", linkRecord);
final long time = context.getTaskContext().getTimestamp();
try {
// this record has not been mapped, so use the original URL attribute, ignore mapping rules!
_fetcher.crawl(getUrl(linkRecord), linkRecord, context);
} finally {
context.getTaskContext().measureTime("fetchResource", time);
}
}
/** invoke link extractor and measure time as "duration...extractLinks". */
private Collection<Record> extractLinksTimed(final Record linkRecord, final WebCrawlingContext context)
throws WebCrawlerException {
debugLogUrl("Extract links from ", linkRecord);
final long time = context.getTaskContext().getTimestamp();
try {
return _linkExtractor.extractLinks(linkRecord, context);
} finally {
context.getTaskContext().measureTime("extractLinks", time);
}
}
/** invoke link filter and measure time as "duration...filterLink". */
private Collection<Record> filterLinksTimed(final Collection<Record> extractedLinks, final Record sourceLink,
final WebCrawlingContext context) throws WebCrawlerException {
if (_log.isDebugEnabled()) {
_log.debug("Filter links " + extractedLinks + " extracted from " + getUrl(sourceLink));
}
final long time = context.getTaskContext().getTimestamp();
try {
// apply filter for max crawl depth (and set new value to extracted records)
final long crawlDepth = getCrawlDepth(sourceLink);
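// a crawl depth of 0 means the maximum depth has been reached: do not follow any extracted links.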
if (crawlDepth == 0) {
return Collections.emptyList();
}
final Iterator<Record> it = extractedLinks.iterator();
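// decrement the remaining depth; a value of -1 is kept at -1 so it never reaches 0 (no depth limit).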
final long nextCrawlDepth = Math.max(-1, crawlDepth - 1);
while (it.hasNext()) {
final Record rec = it.next();
setCrawlDepth(rec, nextCrawlDepth);
}
// apply filter for URL patterns
final Collection<Record> filteredLinks =
_linkFilter.filterExtractedLinks(extractedLinks, getUrl(sourceLink), context);
if (_log.isDebugEnabled()) {
_log.debug("Remaining links: " + filteredLinks);
}
return filteredLinks;
} finally {
context.getTaskContext().measureTime("filterLink", time);
}
}
/**
* Check if an extracted link has not yet been visited by another worker. Measures time as
* "duration...checkVisitedLinks".
*
* @return 'true' if the URL has not been visited yet, 'false' otherwise.
*/
private boolean isNotVisitedTimed(final Record record, final WebCrawlingContext context)
throws VisitedLinksException {
final String url = getUrl(record);
debugLogUrl("Check if extracted link is visited", record);
final long time = context.getTaskContext().getTimestamp();
try {
return !_visitedLinks.isVisited(context.getDataSource(), url, context.getJobRunId());
} finally {
context.getTaskContext().measureTime("checkVisitedLinks", time);
}
}
/** invoke record producer and measure time as "duration...produceRecords". */
private Collection<Record> produceRecordsTimed(final Record crawledRecord, final WebCrawlingContext context)
throws WebCrawlerException {
debugLogUrl("Produce record for ", crawledRecord);
final long time = context.getTaskContext().getTimestamp();
try {
return _recordProducer.produceRecords(crawledRecord, context);
} finally {
context.getTaskContext().measureTime("produceRecords", time);
}
}
/** log URL of record to debug log. */
private void debugLogUrl(final String message, final Record link) {
if (_log.isDebugEnabled()) {
_log.debug(message + " " + getUrl(link));
}
}
/**
* log a non-recoverable error as a warning to the tasklog instead of throwing it, which would abort the task and
* cancel the crawl job run.
*/
private void logNonRecoverableError(final Record linkRecord, final Exception ex, final WebCrawlingContext context) {
context.getTaskLog().warn(
"Error crawling link " + getUrl(linkRecord) + " in source " + context.getDataSource() + ", skipping.", ex);
}
/** @return URL from record. */
private String getUrl(final Record record) {
return record.getMetadata().getStringValue(WebCrawlerConstants.ATTRIBUTE_URL);
}
/**
* normalize URL and set in record.
*/
private void setUrl(final Record record, final String url) throws MaybeRecoverableException {
try {
record.getMetadata().put(WebCrawlerConstants.ATTRIBUTE_URL, UriHelper.normalizeUrl(null, url));
} catch (final URISyntaxException ex) {
throw new MaybeRecoverableException(ex, false);
}
}
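/** ensure that a crawl depth is set in the record; if missing, default to the configured maximum crawl depth. */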
private void initCrawlDepth(final Record linkRecord, final WebCrawlingContext webCrawlingContext) {
final Any linkCrawlDepth = linkRecord.getMetadata().get(WebCrawlerConstants.ATTRIBUTE_CRAWL_DEPTH);
if (linkCrawlDepth == null || !linkCrawlDepth.isLong()) {
final long maxCrawlDepth = webCrawlingContext.getFilterConfiguration().getMaxCrawlDepth();
linkRecord.getMetadata().put(WebCrawlerConstants.ATTRIBUTE_CRAWL_DEPTH, maxCrawlDepth);
}
}
/** set crawl depth in record. */
private void setCrawlDepth(final Record record, final long crawlDepth) {
record.getMetadata().put(WebCrawlerConstants.ATTRIBUTE_CRAWL_DEPTH, crawlDepth);
}
/** @return crawl depth of given record. */
private long getCrawlDepth(final Record record) {
if (record.getMetadata().containsKey(WebCrawlerConstants.ATTRIBUTE_CRAWL_DEPTH)) {
return record.getMetadata().getLongValue(WebCrawlerConstants.ATTRIBUTE_CRAWL_DEPTH);
}
return -1;
}
/** mark record as compound. */
private void setIsCompound(final Record record) {
record.getMetadata().put(ImportingConstants.ATTRIBUTE_COMPOUNDFLAG, true);
}
/** sleep between two consecutive requests if a positive wait time is configured. */
private void waitBetweenRequests(final Long sleepTime) {
if (sleepTime != null && sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (final InterruptedException e) {
// interrupted: stop waiting and continue
}
}
}
/** DS service reference injection method. */
public void setVisitedLinks(final VisitedLinksService visitedLinks) {
_visitedLinks = visitedLinks;
}
/** DS service reference removal method. */
public void unsetVisitedLinks(final VisitedLinksService visitedLinks) {
if (_visitedLinks == visitedLinks) {
_visitedLinks = null;
}
}
/** DS service reference injection method. */
public void setFetcher(final Fetcher fetcher) {
_fetcher = fetcher;
}
/** DS service reference removal method. */
public void unsetFetcher(final Fetcher fetcher) {
if (_fetcher == fetcher) {
_fetcher = null;
}
}
/** DS service reference injection method. */
public void setLinkExtractor(final LinkExtractor linkExtractor) {
_linkExtractor = linkExtractor;
}
/** DS service reference removal method. */
public void unsetLinkExtractor(final LinkExtractor linkExtractor) {
if (_linkExtractor == linkExtractor) {
_linkExtractor = null;
}
}
/** DS service reference injection method. */
public void setLinkFilter(final LinkFilter linkFilter) {
_linkFilter = linkFilter;
}
/** DS service reference removal method. */
public void unsetLinkFilter(final LinkFilter linkFilter) {
if (_linkFilter == linkFilter) {
_linkFilter = null;
}
}
/** DS service reference injection method. */
public void setRecordProducer(final RecordProducer recordProducer) {
_recordProducer = recordProducer;
}
/** DS service reference removal method. */
public void unsetRecordProducer(final RecordProducer recordProducer) {
if (_recordProducer == recordProducer) {
_recordProducer = null;
}
}
/** DS service reference injection method. */
public void setCompoundExtractor(final CompoundExtractor compoundExtractor) {
_compoundExtractor = compoundExtractor;
}
/** DS service reference removal method. */
public void unsetCompoundExtractor(final CompoundExtractor compoundExtractor) {
if (_compoundExtractor == compoundExtractor) {
_compoundExtractor = null;
}
}
}