| /********************************************************************************************************************* |
| * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the |
| * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this |
| * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.importing.state.objectstore; |
| |
| import java.util.Collection; |
| import java.util.Properties; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.importing.DeltaException; |
| import org.eclipse.smila.importing.DeltaService; |
| import org.eclipse.smila.importing.State; |
| import org.eclipse.smila.objectstore.ObjectStoreService; |
| import org.eclipse.smila.utils.config.ConfigUtils; |
| import org.osgi.service.component.ComponentContext; |
| |
| /** |
| * ObjectStore based implementation of the {@link DeltaService} for the jobmanager based importing framework. |
| * |
| * @author scum36 |
| * |
| */ |
| public class ObjectStoreDeltaService implements DeltaService { |
| /** bundle ID for configuration area access. */ |
| public static final String BUNDLE_ID = "org.eclipse.smila.importing.state.objectstore"; |
| |
| /** objectstore store name. */ |
| public static final String STORENAME = "deltaservice"; |
| |
| /** my logger. */ |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| /** reference to objectstore state service. */ |
| private ObjectStoreStateService _stateService; |
| |
| /** reference to objectstore service. */ |
| private ObjectStoreService _objectStore; |
| |
| /** service activation. */ |
| protected void activate(final ComponentContext context) { |
| Properties props; |
| try { |
| props = ConfigUtils.getConfigProperties(BUNDLE_ID, "deltastore.properties"); |
| } catch (final Exception ex) { |
| _log.info("No configuration " + BUNDLE_ID + "/" + " found, using default settings."); |
| props = new Properties(); |
| } |
| final StateStoreConfiguration configuration = new StateStoreConfiguration(props); |
| _stateService = new ObjectStoreStateService(STORENAME, configuration, _objectStore); |
| } |
| |
| /** service deactivation. */ |
| protected void deactivate(final ComponentContext context) { |
| _stateService = null; |
| } |
| |
| @Override |
| public State checkState(final String sourceId, final String recordId, final String jobRunId, final String hashCode) |
| throws DeltaException { |
| return checkState(sourceId, recordId, null, jobRunId, hashCode); |
| } |
| |
| @Override |
| public State checkState(final String sourceId, final String recordId, final String compoundRecordId, |
| final String jobRunId, final String hashCode) throws DeltaException { |
| try { |
| final AnyMap entry = _stateService.getEntry(sourceId, recordId, compoundRecordId); |
| if (entry == null) { |
| return State.NEW; |
| } |
| final String entryHash = entry.getStringValue(ObjectStoreStateService.KEY_DESCRIPTOR); |
| if (hashCode.equals(entryHash)) { |
| final String entryJobRunId = entry.getStringValue(ObjectStoreStateService.KEY_JOBRUNID); |
| if (!jobRunId.equals(entryJobRunId)) { |
| entry.put(ObjectStoreStateService.KEY_JOBRUNID, jobRunId); |
| _stateService.writeEntry(entry); |
| } |
| return State.UPTODATE; |
| } |
| return State.CHANGED; |
| } catch (final StateException ex) { |
| throw new DeltaException("Error checking state for recordId '" + recordId + "'", ex); |
| } |
| } |
| |
| @Override |
| public void markCompoundElementsVisited(final String sourceId, final String compoundRecordId, |
| final String jobRunId) throws DeltaException { |
| try { |
| _stateService.markCompoundElementsVisited(sourceId, compoundRecordId, jobRunId); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error marking entries for compound '" + compoundRecordId + "' as visited.", ex); |
| } |
| } |
| |
| @Override |
| public void markAsUpdated(final String sourceId, final String recordId, final String jobRunId, |
| final String hashCode) throws DeltaException { |
| markAsUpdated(sourceId, recordId, null, jobRunId, hashCode); |
| } |
| |
| @Override |
| public void markAsUpdated(final String sourceId, final String recordId, final String compoundRecordId, |
| final String jobRunId, final String hashCode) throws DeltaException { |
| try { |
| _stateService.mark(sourceId, recordId, compoundRecordId, jobRunId, hashCode); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error marking recordId '" + recordId + "' as updated", ex); |
| } |
| } |
| |
| @Override |
| public void clearSource(final String sourceId) throws DeltaException { |
| try { |
| _stateService.clearSource(sourceId); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error removing all entries for source " + sourceId, ex); |
| } |
| } |
| |
| @Override |
| public void clearAll() throws DeltaException { |
| try { |
| _stateService.clearAll(); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error clearing all entries of all sources", ex); |
| } |
| } |
| |
| @Override |
| public Collection<String> getSourceIds() throws DeltaException { |
| try { |
| return _stateService.getSourceIds(); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error determining existing sources.", ex); |
| } |
| } |
| |
| @Override |
| public long countEntries(final String sourceId, final boolean countExact) throws DeltaException { |
| try { |
| return _stateService.countEntries(sourceId, countExact); |
| } catch (final StateException ex) { |
| throw new DeltaException("Error counting entries for source " + sourceId, ex); |
| } |
| } |
| |
| @Override |
| public Collection<String> getShardPrefixes(final String sourceId) throws DeltaException { |
| try { |
| return _stateService.getShardPrefixes(sourceId); |
| } catch (final StateException ex) { |
| throw new DeltaException(ex.getMessage(), ex); |
| } |
| } |
| |
| @Override |
| public Collection<EntryId> getUnvisitedEntries(final String sourceAndShardPrefix, final String jobRunId) |
| throws DeltaException { |
| try { |
| return _stateService.getUnvisitedObjectIds(sourceAndShardPrefix, jobRunId); |
| } catch (final StateException ex) { |
| throw new DeltaException(ex.getMessage(), ex); |
| } |
| } |
| |
| @Override |
| public void deleteEntry(final String sourceId, final EntryId entryId) throws DeltaException { |
| try { |
| _stateService.deleteEntry(sourceId, entryId); |
| } catch (final StateException ex) { |
| throw new DeltaException(ex.getMessage(), ex); |
| } |
| } |
| |
| /** used by DS to set service reference. */ |
| public void setObjectStore(final ObjectStoreService objectStore) { |
| _objectStore = objectStore; |
| } |
| |
| /** used by DS to remove service reference. */ |
| public void unsetObjectStore(final ObjectStoreService objectStore) { |
| if (_objectStore == objectStore) { |
| _objectStore = null; |
| } |
| } |
| |
| } |