/***********************************************************************************************************************
 * Copyright (c) 2008,2011 empolis GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Weber (Attensity Europe GmbH) - initial API and implementation
 **********************************************************************************************************************/
package org.eclipse.smila.importing.crawler.web; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.eclipse.smila.datamodel.Record; | |
import org.eclipse.smila.importing.crawler.web.WebCrawlerConstants.ErrorHandling; | |
import org.eclipse.smila.importing.util.PropertyNameMapper; | |
import org.eclipse.smila.taskworker.TaskContext; | |
import org.eclipse.smila.taskworker.Worker; | |
import org.eclipse.smila.taskworker.input.RecordInput; | |
import org.eclipse.smila.taskworker.output.RecordOutput; | |
/** Fetches binary content from URL and stores the content as record attachment. */ | |
public class WebFetcherWorker implements Worker { | |
/** worker's name. */ | |
private static final String WORKER_NAME = "webFetcher"; | |
/** input slot name. */ | |
private static final String INPUT_SLOT = "linksToFetch"; | |
/** output slot name. */ | |
private static final String OUTPUT_SLOT = "fetchedLinks"; | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** reference to Fetcher service. */ | |
private Fetcher _fetcher; | |
@Override | |
public String getName() { | |
return WORKER_NAME; | |
} | |
@Override | |
public void perform(final TaskContext taskContext) throws Exception { | |
final RecordInput recordInput = taskContext.getInputs().getAsRecordInput(INPUT_SLOT); | |
final RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(OUTPUT_SLOT); | |
final Long sleepTime = | |
taskContext.getTaskParameters().getLongValue(WebCrawlerConstants.TASK_PARAM_WAIT_BETWEEN_REQUESTS); | |
final WebCrawlingContext context = new WebCrawlingContext(taskContext); | |
context.setCurrentInputBulkId(recordInput.getObjectName()); | |
final PropertyNameMapper mapper = context.getMapper(); | |
Record record; | |
do { | |
record = recordInput.getRecord(); | |
boolean writeRecord = true; | |
if (record != null) { | |
if (!record.hasAttachment(mapper.get(WebCrawlerConstants.ATTACHMENT_CONTENT).get(0))) { | |
// we have no content attachment yet, so we fetch the content here... | |
waitBetweenRequests(sleepTime); | |
writeRecord = fetchContent(record, context); | |
} | |
if (writeRecord) { | |
// if any mapping had been forgotten... | |
mapper.mapNames(record, WebCrawlerConstants.PROPERTY_NAMES); | |
recordOutput.writeRecord(record); | |
if (_log.isDebugEnabled()) { | |
_log.debug("added record " + record.getId()); | |
} | |
} | |
} | |
} while (record != null && !taskContext.isCanceled()); | |
} | |
/** | |
* fetch content and measure time as "duration...fetchContent". | |
* | |
* @return true if the record should be written to output, else it should be dropped. | |
*/ | |
private boolean fetchContent(final Record record, final WebCrawlingContext context) throws WebCrawlerException { | |
final long time = context.getTaskContext().getTimestamp(); | |
try { | |
if (_log.isDebugEnabled()) { | |
_log.debug("fetching content for record " + record.getId()); | |
} | |
// this record has already been mapped, so we have to look up the URL using mapping info. | |
_fetcher.fetch( | |
record.getMetadata().getStringValue(context.getMapper().get(WebCrawlerConstants.ATTRIBUTE_URL).get(0)), | |
record, context); | |
return true; | |
} catch (final WebCrawlerException e) { | |
final StringBuilder message = new StringBuilder("Failed to fetch link for record ").append(record.getId()); | |
final boolean dropRecord = e.isRecoverable() && context.getErrorHandling() == ErrorHandling.DROP; | |
final boolean ignoreFetchError = e.isRecoverable() && context.getErrorHandling() == ErrorHandling.IGNORE; | |
if (dropRecord && !ignoreFetchError) { | |
message.append(", record will be dropped."); | |
} | |
context.getTaskLog().warn(message.toString(), e); | |
if (e.isRecoverable() && context.getErrorHandling() == ErrorHandling.RETRY) { | |
throw e; // repeat the whole task for recoverable errors, otherwise continue processing | |
} | |
return !dropRecord || ignoreFetchError; | |
} finally { | |
context.getTaskContext().measureTime("fetchContent", time); | |
} | |
} | |
/** check if we should sleep between two consecutive requests and sleep, if so. */ | |
private void waitBetweenRequests(final Long sleepTime) { | |
if (sleepTime != null && sleepTime > 0) { | |
try { | |
Thread.sleep(sleepTime); | |
} catch (final InterruptedException e) { | |
; // ignore | |
} | |
} | |
} | |
/** DS service reference injection method. */ | |
public void setFetcher(final Fetcher fetcher) { | |
_fetcher = fetcher; | |
} | |
/** DS service reference removal method. */ | |
public void unsetFetcher(final Fetcher fetcher) { | |
if (_fetcher == fetcher) { | |
_fetcher = null; | |
} | |
} | |
} |