blob: 0bf95a654936f227f3ba25c47df56bd051583a14 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.importing.state.objectstore;
import java.util.Collection;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.importing.DeltaException;
import org.eclipse.smila.importing.DeltaService;
import org.eclipse.smila.importing.State;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.utils.config.ConfigUtils;
import org.osgi.service.component.ComponentContext;
/**
* ObjectStore based implementation of the {@link DeltaService} for the jobmanager based importing framework.
*
* @author scum36
*
*/
public class ObjectStoreDeltaService implements DeltaService {
/** bundle ID for configuration area access. */
public static final String BUNDLE_ID = "org.eclipse.smila.importing.state.objectstore";
/** objectstore store name. */
public static final String STORENAME = "deltaservice";
/** my logger. */
private final Log _log = LogFactory.getLog(getClass());
/** reference to objectstore state service. */
private ObjectStoreStateService _stateService;
/** reference to objectstore service. */
private ObjectStoreService _objectStore;
/** service activation. */
protected void activate(final ComponentContext context) {
Properties props;
try {
props = ConfigUtils.getConfigProperties(BUNDLE_ID, "deltastore.properties");
} catch (final Exception ex) {
_log.info("No configuration " + BUNDLE_ID + "/" + " found, using default settings.");
props = new Properties();
}
final StateStoreConfiguration configuration = new StateStoreConfiguration(props);
_stateService = new ObjectStoreStateService(STORENAME, configuration, _objectStore);
}
/** service deactivation. */
protected void deactivate(final ComponentContext context) {
_stateService = null;
}
@Override
public State checkState(final String sourceId, final String recordId, final String jobRunId, final String hashCode)
throws DeltaException {
return checkState(sourceId, recordId, null, jobRunId, hashCode);
}
@Override
public State checkState(final String sourceId, final String recordId, final String compoundRecordId,
final String jobRunId, final String hashCode) throws DeltaException {
try {
final AnyMap entry = _stateService.getEntry(sourceId, recordId, compoundRecordId);
if (entry == null) {
return State.NEW;
}
final String entryHash = entry.getStringValue(ObjectStoreStateService.KEY_DESCRIPTOR);
if (hashCode.equals(entryHash)) {
final String entryJobRunId = entry.getStringValue(ObjectStoreStateService.KEY_JOBRUNID);
if (!jobRunId.equals(entryJobRunId)) {
entry.put(ObjectStoreStateService.KEY_JOBRUNID, jobRunId);
_stateService.writeEntry(entry);
}
return State.UPTODATE;
}
return State.CHANGED;
} catch (final StateException ex) {
throw new DeltaException("Error checking state for recordId '" + recordId + "'", ex);
}
}
@Override
public void markCompoundElementsVisited(final String sourceId, final String compoundRecordId,
final String jobRunId) throws DeltaException {
try {
_stateService.markCompoundElementsVisited(sourceId, compoundRecordId, jobRunId);
} catch (final StateException ex) {
throw new DeltaException("Error marking entries for compound '" + compoundRecordId + "' as visited.", ex);
}
}
@Override
public void markAsUpdated(final String sourceId, final String recordId, final String jobRunId,
final String hashCode) throws DeltaException {
markAsUpdated(sourceId, recordId, null, jobRunId, hashCode);
}
@Override
public void markAsUpdated(final String sourceId, final String recordId, final String compoundRecordId,
final String jobRunId, final String hashCode) throws DeltaException {
try {
_stateService.mark(sourceId, recordId, compoundRecordId, jobRunId, hashCode);
} catch (final StateException ex) {
throw new DeltaException("Error marking recordId '" + recordId + "' as updated", ex);
}
}
@Override
public void clearSource(final String sourceId) throws DeltaException {
try {
_stateService.clearSource(sourceId);
} catch (final StateException ex) {
throw new DeltaException("Error removing all entries for source " + sourceId, ex);
}
}
@Override
public void clearAll() throws DeltaException {
try {
_stateService.clearAll();
} catch (final StateException ex) {
throw new DeltaException("Error clearing all entries of all sources", ex);
}
}
@Override
public Collection<String> getSourceIds() throws DeltaException {
try {
return _stateService.getSourceIds();
} catch (final StateException ex) {
throw new DeltaException("Error determining existing sources.", ex);
}
}
@Override
public long countEntries(final String sourceId, final boolean countExact) throws DeltaException {
try {
return _stateService.countEntries(sourceId, countExact);
} catch (final StateException ex) {
throw new DeltaException("Error counting entries for source " + sourceId, ex);
}
}
@Override
public Collection<String> getShardPrefixes(final String sourceId) throws DeltaException {
try {
return _stateService.getShardPrefixes(sourceId);
} catch (final StateException ex) {
throw new DeltaException(ex.getMessage(), ex);
}
}
@Override
public Collection<EntryId> getUnvisitedEntries(final String sourceAndShardPrefix, final String jobRunId)
throws DeltaException {
try {
return _stateService.getUnvisitedObjectIds(sourceAndShardPrefix, jobRunId);
} catch (final StateException ex) {
throw new DeltaException(ex.getMessage(), ex);
}
}
@Override
public void deleteEntry(final String sourceId, final EntryId entryId) throws DeltaException {
try {
_stateService.deleteEntry(sourceId, entryId);
} catch (final StateException ex) {
throw new DeltaException(ex.getMessage(), ex);
}
}
/** used by DS to set service reference. */
public void setObjectStore(final ObjectStoreService objectStore) {
_objectStore = objectStore;
}
/** used by DS to remove service reference. */
public void unsetObjectStore(final ObjectStoreService objectStore) {
if (_objectStore == objectStore) {
_objectStore = null;
}
}
}