blob: 61046a17c33a4096ff8589a4979fe6c99524a887 [file] [log] [blame]
/***********************************************************************************************************************
* Copyright (c) 2008,2011 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial API and implementation
**********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.web;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.importing.crawler.web.WebCrawlerConstants.ErrorHandling;
import org.eclipse.smila.importing.util.PropertyNameMapper;
import org.eclipse.smila.taskworker.TaskContext;
import org.eclipse.smila.taskworker.Worker;
import org.eclipse.smila.taskworker.input.RecordInput;
import org.eclipse.smila.taskworker.output.RecordOutput;
/**
 * Fetches binary content from URL and stores the content as record attachment.
 *
 * <p>
 * Records are read from the {@value #INPUT_SLOT} input slot. For each record that does not yet carry the (mapped)
 * content attachment, the content is fetched via the injected {@link Fetcher}. Records are then written to the
 * {@value #OUTPUT_SLOT} output slot, unless a fetch error occurred and the error handling demands dropping them.
 * </p>
 */
public class WebFetcherWorker implements Worker {
  /** worker's name. */
  private static final String WORKER_NAME = "webFetcher";

  /** input slot name. */
  private static final String INPUT_SLOT = "linksToFetch";

  /** output slot name. */
  private static final String OUTPUT_SLOT = "fetchedLinks";

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** reference to Fetcher service. */
  private Fetcher _fetcher;

  /** @return the name of this worker. */
  @Override
  public String getName() {
    return WORKER_NAME;
  }

  /**
   * Reads records from the input slot until the input is exhausted or the task is canceled, fetches missing content
   * and writes the resulting records to the output slot.
   *
   * @param taskContext
   *          context of the task to process, provides inputs, outputs and task parameters.
   * @throws Exception
   *           on errors reading or writing records, or if a recoverable fetch error occurs while the error handling
   *           is {@link ErrorHandling#RETRY}.
   */
  @Override
  public void perform(final TaskContext taskContext) throws Exception {
    final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT);
    final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT);
    final Long sleepTime =
      taskContext.getTaskParameters().getLongValue(WebCrawlerConstants.TASK_PARAM_WAIT_BETWEEN_REQUESTS);
    final WebCrawlingContext context = new WebCrawlingContext(taskContext);
    context.setCurrentInputBulkId(recordInput.getObjectName());
    Record record;
    do {
      record = recordInput.getRecord();
      if (record != null) {
        processRecord(record, context, recordOutput, sleepTime);
      }
      // cancellation is only checked after a record has been handled, so the first record is always read.
    } while (record != null && !taskContext.isCanceled());
  }

  /**
   * Handles a single input record: fetches the content if the content attachment is missing and writes the record
   * to the output unless it has to be dropped.
   */
  private void processRecord(final Record record, final WebCrawlingContext context,
    final RecordOutput recordOutput, final Long sleepTime) throws Exception {
    final PropertyNameMapper mapper = context.getMapper();
    boolean writeRecord = true;
    if (!record.hasAttachment(mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT).get(0))) {
      // we have no content attachment yet, so we fetch the content here...
      waitBetweenRequests(sleepTime);
      writeRecord = fetchContent(record, context);
    }
    if (!writeRecord) {
      return;
    }
    // if any mapping had been forgotten...
    mapper.mapNames(record, WebCrawlerConstants.PROPERTY_NAMES);
    recordOutput.writeRecord(record);
    if (_log.isDebugEnabled()) {
      _log.debug("added record " + record.getId());
    }
  }

  /**
   * fetch content and measure time as "duration...fetchContent".
   *
   * @param record
   *          the (already mapped) record to fetch the content for.
   * @param context
   *          crawling context providing mapper, error handling, task log and task context.
   * @return true if the record should be written to output, else it should be dropped.
   * @throws WebCrawlerException
   *           if fetching failed with a recoverable error and the error handling is {@link ErrorHandling#RETRY}.
   */
  private boolean fetchContent(final Record record, final WebCrawlingContext context) throws WebCrawlerException {
    final long time = context.getTaskContext().getTimestamp();
    try {
      if (_log.isDebugEnabled()) {
        _log.debug("fetching content for record " + record.getId());
      }
      // this record has already been mapped, so we have to look up the URL using mapping info.
      final String urlAttribute = context.getMapper().get(WebCrawlerConstants.ATTRIBUTE_URL).get(0);
      _fetcher.fetch(record.getMetadata().getStringValue(urlAttribute), record, context);
      return true;
    } catch (final WebCrawlerException e) {
      // the configured error handling only matters (and is only looked up) for recoverable errors.
      final ErrorHandling errorHandling = e.isRecoverable() ? context.getErrorHandling() : null;
      final boolean dropRecord = errorHandling == ErrorHandling.DROP;
      final StringBuilder message = new StringBuilder("Failed to fetch link for record ").append(record.getId());
      if (dropRecord) {
        message.append(", record will be dropped.");
      }
      context.getTaskLog().warn(message.toString(), e);
      if (errorHandling == ErrorHandling.RETRY) {
        throw e; // repeat the whole task for recoverable errors, otherwise continue processing
      }
      // non-recoverable errors and IGNORE: keep the record (without content); DROP: discard it.
      return !dropRecord;
    } finally {
      context.getTaskContext().measureTime("fetchContent", time);
    }
  }

  /**
   * check if we should sleep between two consecutive requests and sleep, if so.
   *
   * @param sleepTime
   *          time to wait in milliseconds, null or non-positive values disable waiting.
   */
  private void waitBetweenRequests(final Long sleepTime) {
    if (sleepTime == null || sleepTime <= 0) {
      return;
    }
    try {
      Thread.sleep(sleepTime);
    } catch (final InterruptedException e) {
      // stop waiting, but keep the interrupt status so that the caller of this worker can still observe the
      // interruption instead of having it silently swallowed here.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * DS service reference injection method.
   *
   * @param fetcher
   *          the Fetcher service to use for fetching content.
   */
  public void setFetcher(final Fetcher fetcher) {
    _fetcher = fetcher;
  }

  /**
   * DS service reference removal method. Only clears the reference if it is the given instance.
   *
   * @param fetcher
   *          the Fetcher service that is being removed.
   */
  public void unsetFetcher(final Fetcher fetcher) {
    if (_fetcher == fetcher) {
      _fetcher = null;
    }
  }
}