blob: f29e19969e81c0aa086f57d6e76bc8f0a073b147 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.importing.state.objectstore;
import java.util.Collection;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.importing.DeltaException;
import org.eclipse.smila.importing.VisitedLinksException;
import org.eclipse.smila.importing.VisitedLinksService;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.utils.config.ConfigUtils;
import org.osgi.service.component.ComponentContext;
/**
 * ObjectStore based implementation of the {@link VisitedLinksService} for the jobmanager based importing framework.
 *
 * <p>
 * All operations are delegated to an {@link ObjectStoreStateService} that is created in
 * {@link #activate(ComponentContext)} for the store named {@link #STORENAME} and dropped again in
 * {@link #deactivate(ComponentContext)}. Each entry records the job run ID and the input bulk ID that visited a URL
 * last.
 * </p>
 *
 * @author stuc07
 *
 */
public class ObjectStoreVisitedLinksService implements VisitedLinksService {
  /** bundle ID for configuration area access. */
  public static final String BUNDLE_ID = "org.eclipse.smila.importing.state.objectstore";

  /** objectstore store name. */
  public static final String STORENAME = "visitedlinks";

  /** name of the (optional) properties file read from the configuration area of {@link #BUNDLE_ID}. */
  private static final String CONFIG_FILE = "visitedlinksstore.properties";

  /** my logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** reference to objectstore state service. */
  private ObjectStoreStateService _stateService;

  /** reference to objectstore service. */
  private ObjectStoreService _objectStore;

  /**
   * service activation: reads the store configuration and creates the state service on top of the currently bound
   * objectstore service. If the configuration cannot be read for any reason, empty properties are used instead.
   *
   * @param context
   *          DS component context (not used).
   */
  protected void activate(final ComponentContext context) {
    Properties props;
    try {
      props = ConfigUtils.getConfigProperties(BUNDLE_ID, CONFIG_FILE);
    } catch (final Exception ex) {
      // A missing configuration is not an error: continue with empty properties.
      _log.info("No configuration " + BUNDLE_ID + "/" + CONFIG_FILE + " found, using default settings.");
      props = new Properties();
    }
    final StateStoreConfiguration configuration = new StateStoreConfiguration(props);
    _stateService = new ObjectStoreStateService(STORENAME, configuration, _objectStore);
  }

  /**
   * service deactivation: releases the state service created in {@link #activate(ComponentContext)}.
   *
   * @param context
   *          DS component context (not used).
   */
  protected void deactivate(final ComponentContext context) {
    _stateService = null;
  }

  /**
   * Checks whether the URL was already visited in the given job run and marks it as visited if it was not.
   *
   * <p>
   * For the same jobRunId and inputBulkId always consider the URLs as not visited.
   * </p>
   *
   * @param sourceId
   *          ID of the data source the URL belongs to.
   * @param url
   *          the URL to check.
   * @param jobRunId
   *          ID of the current job run.
   * @param inputBulkId
   *          ID of the input bulk the caller is processing.
   * @return {@code true} if the URL was already visited in this job run while processing a different bulk,
   *         {@code false} if the caller should visit it.
   * @throws VisitedLinksException
   *           if reading or writing the state entry fails.
   */
  @Override
  public boolean checkAndMarkVisited(final String sourceId, final String url, final String jobRunId,
    final String inputBulkId) throws VisitedLinksException {
    try {
      final AnyMap entry = _stateService.getEntry(sourceId, url, null);
      if (entry == null) {
        // no entry found for URL, so the caller can visit it.
        markAsVisited(sourceId, url, jobRunId, inputBulkId);
        return false;
      }
      final String entryJobRunId = entry.getStringValue(ObjectStoreStateService.KEY_JOBRUNID);
      if (!jobRunId.equals(entryJobRunId)) {
        // URL was visited in another than the current job run, so we have to update the entry.
        entry.put(ObjectStoreStateService.KEY_JOBRUNID, jobRunId);
        entry.put(ObjectStoreStateService.KEY_DESCRIPTOR, inputBulkId);
        _stateService.writeEntry(entry);
        return false;
      }
      // The descriptor field of the entry holds the ID of the bulk that visited the URL.
      final String entryInputBulkId = entry.getStringValue(ObjectStoreStateService.KEY_DESCRIPTOR);
      // Same bulk: URL was visited in this job run by a task processing the same bulk, so the current caller
      // probably processes a retry task and should visit this URL.
      // Different bulk: URL was visited in this job run by a worker processing a different bulk, so it should not
      // be visited by the current caller again.
      return !inputBulkId.equals(entryInputBulkId);
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error checking state for url '" + url + "'", ex);
    }
  }

  /**
   * Checks whether the URL has an entry that was written in the given job run. Does not modify the entry.
   *
   * @param sourceId
   *          ID of the data source the URL belongs to.
   * @param url
   *          the URL to check.
   * @param jobRunId
   *          ID of the current job run.
   * @return {@code true} if an entry exists for the URL and its job run ID equals {@code jobRunId}.
   * @throws VisitedLinksException
   *           if reading the state entry fails.
   */
  @Override
  public boolean isVisited(final String sourceId, final String url, final String jobRunId)
    throws VisitedLinksException {
    try {
      final AnyMap entry = _stateService.getEntry(sourceId, url, null);
      return entry != null && jobRunId.equals(entry.getStringValue(ObjectStoreStateService.KEY_JOBRUNID));
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error checking state for url '" + url + "'", ex);
    }
  }

  /**
   * Marks the URL as visited by the given job run and input bulk, without checking for an existing entry.
   *
   * @param sourceId
   *          ID of the data source the URL belongs to.
   * @param url
   *          the URL to mark.
   * @param jobRunId
   *          ID of the current job run.
   * @param inputBulkId
   *          ID of the input bulk the caller is processing.
   * @throws VisitedLinksException
   *           if writing the state entry fails.
   */
  @Override
  public void markAsVisited(final String sourceId, final String url, final String jobRunId, final String inputBulkId)
    throws VisitedLinksException {
    try {
      _stateService.mark(sourceId, url, null, jobRunId, inputBulkId);
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error marking url '" + url + "' as crawled", ex);
    }
  }

  /**
   * Removes all entries of the given source.
   *
   * @param sourceId
   *          ID of the data source to clear.
   * @throws VisitedLinksException
   *           if the state service fails to clear the source.
   */
  @Override
  public void clearSource(final String sourceId) throws VisitedLinksException {
    try {
      _stateService.clearSource(sourceId);
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error removing all entries for source " + sourceId, ex);
    }
  }

  /**
   * Removes all entries of all sources.
   *
   * @throws VisitedLinksException
   *           if the state service fails to clear its entries.
   */
  @Override
  public void clearAll() throws VisitedLinksException {
    try {
      _stateService.clearAll();
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error clearing all entries of all sources", ex);
    }
  }

  /**
   * Returns the source IDs reported by the state service.
   *
   * @return the source IDs as returned by the state service.
   * @throws VisitedLinksException
   *           if the state service fails to determine the sources.
   */
  @Override
  public Collection<String> getSourceIds() throws VisitedLinksException {
    try {
      return _stateService.getSourceIds();
    } catch (final StateException ex) {
      throw new VisitedLinksException("Error determining existing sources.", ex);
    }
  }

  /**
   * Counts the entries of the given source.
   *
   * <p>
   * NOTE(review): unlike the other methods this one throws {@link DeltaException}; presumably that is dictated by
   * the interface signature - confirm before changing it.
   * </p>
   *
   * @param sourceId
   *          ID of the data source to count entries for.
   * @param countExact
   *          passed through to the state service unchanged.
   * @return the count reported by the state service.
   * @throws DeltaException
   *           if the state service fails to count the entries.
   */
  @Override
  public long countEntries(final String sourceId, final boolean countExact) throws DeltaException {
    try {
      return _stateService.countEntries(sourceId, countExact);
    } catch (final StateException ex) {
      throw new DeltaException("Error counting entries for source " + sourceId, ex);
    }
  }

  /** used by DS to set service reference. */
  public void setObjectStore(final ObjectStoreService objectStore) {
    _objectStore = objectStore;
  }

  /** used by DS to remove service reference. Only clears the reference if it is the one currently set. */
  public void unsetObjectStore(final ObjectStoreService objectStore) {
    if (_objectStore == objectStore) {
      _objectStore = null;
    }
  }
}