| /********************************************************************************************************************* |
| * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the |
| * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this |
| * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html |
| **********************************************************************************************************************/ |
| package org.eclipse.smila.importing.state.objectstore; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.eclipse.smila.datamodel.AnyMap; |
| import org.eclipse.smila.datamodel.DataFactory; |
| import org.eclipse.smila.datamodel.Record; |
| import org.eclipse.smila.datamodel.ipc.BinaryObjectStreamIterator; |
| import org.eclipse.smila.datamodel.ipc.IpcAnyReader; |
| import org.eclipse.smila.datamodel.ipc.IpcAnyWriter; |
| import org.eclipse.smila.importing.DeltaService; |
| import org.eclipse.smila.objectstore.NoSuchObjectException; |
| import org.eclipse.smila.objectstore.NoSuchStoreException; |
| import org.eclipse.smila.objectstore.ObjectStoreException; |
| import org.eclipse.smila.objectstore.ObjectStoreService; |
| import org.eclipse.smila.objectstore.StoreObject; |
| import org.eclipse.smila.objectstore.util.ObjectStoreRetryUtil; |
| import org.eclipse.smila.utils.digest.DigestHelper; |
| |
| /** |
| * ObjectStore based implementation of a state service used in the jobmanager based importing framework. |
| * |
| * @author scum36 |
| * |
| */ |
| public class ObjectStoreStateService { |
| |
| /** key for the entry. */ |
| protected static final String KEY = "key"; |
| |
| /** key for the objectId entry in the data object. */ |
| protected static final String KEY_OBJECTID = "objectId"; |
| |
| /** key for the compoundId entry in the data object. */ |
| protected static final String KEY_COMPOUNDID = "compoundId"; |
| |
| /** key for the sourceId entry in the data object. */ |
| protected static final String KEY_SOURCEID = "sourceId"; |
| |
| /** key for the descriptor entry in the data object. */ |
| protected static final String KEY_DESCRIPTOR = "descriptor"; |
| |
| /** key for the job run ID entry in the data object. */ |
| protected static final String KEY_JOBRUNID = "jobRunId"; |
| |
| /** root directory in ObjectStore for delta entries. */ |
| protected static final String ROOT_ENTRIES = "entries/"; |
| |
| /** limit for number of entries to count exact. */ |
| private static final int LIMIT_COUNTENTRIES_EXACT = 10000; |
| |
| /** my logger. */ |
| private final Log _log = LogFactory.getLog(getClass()); |
| |
| /** BON parser for entry values. */ |
| private final IpcAnyReader _parser = new IpcAnyReader(); |
| |
| /** BON writer for entry values. */ |
| private final IpcAnyWriter _writer = new IpcAnyWriter(); |
| |
| /** store configuration. */ |
| private final StateStoreConfiguration _configuration; |
| |
| /** reference to objectstore service. */ |
| private final ObjectStoreService _objectStore; |
| |
| /** store configuration. */ |
| private final String _storeName; |
| |
| /** creates a new instance of the service. */ |
| public ObjectStoreStateService(final String storeName, final StateStoreConfiguration configuration, |
| final ObjectStoreService objectStore) { |
| _storeName = storeName; |
| _configuration = configuration; |
| _objectStore = objectStore; |
| } |
| |
| /** retrieves the information for an entry. */ |
| public AnyMap getEntry(final String sourceId, final String objectId, final String compoundId) |
| throws StateException { |
| final String key = createEntryKey(sourceId, objectId, compoundId); |
| return readEntry(key); |
| } |
| |
| /** marks an entry. */ |
| public void mark(final String sourceId, final String objectId, final String compoundId, final String jobRunId, |
| final String descriptor) throws StateException { |
| final String key = createEntryKey(sourceId, objectId, compoundId); |
| final AnyMap entry = createEntryObject(sourceId, objectId, compoundId, jobRunId, descriptor); |
| entry.put(KEY, key); |
| writeEntry(entry); |
| } |
| |
| /** clears a source. */ |
| public void clearSource(final String sourceId) throws StateException { |
| if (_log.isInfoEnabled()) { |
| _log.info("Clearing source: " + sourceId); |
| } |
| try { |
| _objectStore.removeObjects(_storeName, createEntryKeyBase(sourceId)); |
| } catch (final NoSuchStoreException ex) { |
| ; // ok. there is nothing to clear |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error removing all entries for source " + sourceId, ex); |
| } |
| } |
| |
| /** clears all stores. */ |
| public void clearAll() throws StateException { |
| if (_log.isInfoEnabled()) { |
| _log.info("Clearing all sources"); |
| } |
| try { |
| _objectStore.clearStore(_storeName); |
| } catch (final NoSuchStoreException ex) { |
| ; // ok. there is nothing to clear |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error clearing all entries of all sources", ex); |
| } |
| } |
| |
| /** @return all source ids. Throws StateNotFoundException if store doesn't exist yet. */ |
| public Collection<String> getSourceIds() throws StateException { |
| try { |
| final Collection<String> sourceIds = getPrefixes(ROOT_ENTRIES); |
| return sourceIds; |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error determining existing sources.", ex); |
| } |
| } |
| |
| /** count the entries of the given source. Throws StateNotFoundException if store or source do not exist yet. */ |
| public long countEntries(final String sourceId, final boolean countExact) throws StateException { |
| if (!hasSource(sourceId)) { |
| throw new StateNotFoundException("Source '" + sourceId + "' doesn't exist in store '" + _storeName + "'"); |
| } |
| try { |
| long entryCount = 0; |
| int shardCount = 0; |
| final String sourceBase = createEntryKeyBase(sourceId); |
| final Collection<String> shardIds = _objectStore.getPrefixes(_storeName, sourceBase); |
| if (shardIds != null) { |
| for (final String shardId : shardIds) { |
| String shardPrefix = shardId; |
| if (!shardId.endsWith("/")) { |
| shardPrefix += "/"; |
| } |
| entryCount += _objectStore.countStoreObjects(_storeName, shardPrefix); |
| shardCount++; |
| if (!countExact && entryCount > LIMIT_COUNTENTRIES_EXACT) { // estimate. |
| return entryCount * shardIds.size() / shardCount; |
| } |
| } |
| } |
| return entryCount; |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error determining existing sources.", ex); |
| } |
| } |
| |
| /** get object-id prefixes of all shards to use as input for {@link #getUnvisitedObjectIds(String, String)}. */ |
| public Collection<String> getShardPrefixes(final String sourceId) throws StateException { |
| final String sourceBase = createEntryKeyBase(sourceId); |
| try { |
| return getPrefixes(sourceBase); |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error getting shard prefixes for source '" + sourceId + "'", ex); |
| } |
| }; |
| |
| /** get all object IDs from a shard that have not been visited in the given job run. */ |
| public Collection<DeltaService.EntryId> getUnvisitedObjectIds(final String sourceAndShardPrefix, |
| final String jobRunId) throws StateException { |
| final List<DeltaService.EntryId> objectIds = new ArrayList<DeltaService.EntryId>(); |
| try { |
| final Collection<StoreObject> objects = |
| _objectStore.getStoreObjectInfos(_storeName, ROOT_ENTRIES + sourceAndShardPrefix); |
| for (final StoreObject storeObject : objects) { |
| final InputStream entryStream = _objectStore.readObject(_storeName, storeObject.getId()); |
| try { |
| final BinaryObjectStreamIterator entries = new BinaryObjectStreamIterator(entryStream); |
| while (entries.hasNext()) { |
| final Record entry = entries.next(); |
| final String visitedInJobRun = entry.getMetadata().getStringValue(KEY_JOBRUNID); |
| if (!jobRunId.equals(visitedInJobRun)) { |
| final DeltaService.EntryId entryId = |
| new DeltaService.EntryId(entry.getMetadata().getStringValue(KEY_OBJECTID), entry.getMetadata() |
| .getStringValue(KEY_COMPOUNDID)); |
| objectIds.add(entryId); |
| } |
| } |
| } finally { |
| IOUtils.closeQuietly(entryStream); |
| } |
| } |
| } catch (final NoSuchStoreException ex) { |
| ; // OK: no store, no objects |
| } catch (final Exception ex) { |
| throw new StateException("Error getting unvisited objects for source/shard '" + sourceAndShardPrefix + "'", |
| ex); |
| } |
| return objectIds; |
| }; |
| |
| /** update the job run id of all entries that belong to the given compound. */ |
| public void markCompoundElementsVisited(final String sourceId, final String compoundRecordId, |
| final String jobRunId) throws StateException { |
| final String sourceAndShardPrefix = createShardKey(sourceId, compoundRecordId); |
| try { |
| final Collection<StoreObject> objects = _objectStore.getStoreObjectInfos(_storeName, sourceAndShardPrefix); |
| for (final StoreObject storeObject : objects) { |
| final InputStream entryStream = _objectStore.readObject(_storeName, storeObject.getId()); |
| try { |
| final BinaryObjectStreamIterator entries = new BinaryObjectStreamIterator(entryStream); |
| while (entries.hasNext()) { |
| final AnyMap entry = entries.next().getMetadata(); |
| if (compoundRecordId.equals(entry.getStringValue(KEY_COMPOUNDID)) |
| && !jobRunId.equals(entry.getStringValue(KEY_JOBRUNID))) { |
| entry.put(KEY_JOBRUNID, jobRunId); |
| writeEntry(entry); |
| } |
| } |
| } finally { |
| IOUtils.closeQuietly(entryStream); |
| } |
| } |
| } catch (final NoSuchStoreException ex) { |
| ; // OK: no store, no objects |
| } catch (final Exception ex) { |
| throw new StateException("Error getting unvisited objects for source/shard '" + sourceAndShardPrefix + "'", |
| ex); |
| } |
| |
| } |
| |
| /** remove an entry. */ |
| public void deleteEntry(final String sourceId, final DeltaService.EntryId entryId) throws StateException { |
| try { |
| _objectStore.removeObject(_storeName, |
| createEntryKey(sourceId, entryId.getRecordId(), entryId.getCompoundId())); |
| } catch (final NoSuchStoreException ex) { |
| ; // OK: no store, nothing to delete. |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error deleting entry '" + entryId.getRecordId() + "'", ex); |
| } |
| } |
| |
| /** @return whether store contains entries for given source. */ |
| private boolean hasSource(final String sourceId) throws StateException { |
| return getSourceIds().contains(sourceId); |
| } |
| |
| /** read the current entry from the object store. */ |
| private AnyMap readEntry(final String key) throws StateException { |
| final byte[] data; |
| try { |
| data = readEntryIfExists(key); |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error reading object " + key + " from objectstore", ex); |
| } |
| if (data.length > 1) { |
| try { |
| return (AnyMap) _parser.readBinaryObject(data); |
| } catch (final IOException ex) { |
| throw new StateException("Error parsing object " + key, ex); |
| } |
| } |
| return null; |
| } |
| |
| /** try to read data from object store. If {@link NoSuchObjectException} is thrown, return null. */ |
| private byte[] readEntryIfExists(final String key) throws ObjectStoreException { |
| try { |
| return ObjectStoreRetryUtil.retryGetObject(_objectStore, _storeName, key); |
| } catch (final NoSuchStoreException | NoSuchObjectException ex) { |
| return new byte[0]; |
| } |
| } |
| |
| /** gets all prefixes for the given base prefix from the store. */ |
| private Collection<String> getPrefixes(final String base) throws ObjectStoreException { |
| final Collection<String> sourceIds = new ArrayList<String>(); |
| try { |
| final Collection<String> sourceEntries = _objectStore.getPrefixes(_storeName, base); |
| for (final String sourceEntry : sourceEntries) { |
| String sourceId = sourceEntry.substring(ROOT_ENTRIES.length()); |
| if (sourceId.endsWith("/")) { |
| sourceId = sourceId.substring(0, sourceId.length() - 1); |
| } |
| sourceIds.add(sourceId); |
| } |
| } catch (final NoSuchStoreException ex) { |
| ; // ignore |
| } |
| return sourceIds; |
| } |
| |
| /** update the entry in the objectstore. */ |
| protected void writeEntry(final AnyMap value) throws StateException { |
| final String key = value.getStringValue(KEY); |
| final byte[] data; |
| try { |
| data = _writer.writeBinaryObject(value); |
| } catch (final IOException ex) { |
| throw new StateException("Error converting object " + value + " for " + key + " to BON", ex); |
| } |
| writeEntryEnsureStore(key, data); |
| } |
| |
| /** write entry, create store if it doesn't exist already. */ |
| private void writeEntryEnsureStore(final String key, final byte[] data) throws StateException { |
| try { |
| try { |
| ObjectStoreRetryUtil.retryPutObject(_objectStore, _storeName, key, data); |
| } catch (final NoSuchStoreException ex) { |
| _log.info("Creating store '" + _storeName + "'"); |
| ObjectStoreRetryUtil.retryEnsureStore(_objectStore, _storeName); |
| ObjectStoreRetryUtil.retryPutObject(_objectStore, _storeName, key, data); |
| } |
| } catch (final ObjectStoreException ex) { |
| throw new StateException("Error writing object for " + key + " to objectstore", ex); |
| } |
| } |
| |
| /** get sourceAndShard prefix for the given object ID. */ |
| private String createShardKey(final String sourceId, final String objectId) { |
| final String idDigest = DigestHelper.calculateDigest(objectId); |
| final String entryKey = _configuration.getShardKey(idDigest); |
| return createEntryKeyBase(sourceId) + entryKey; |
| |
| } |
| |
| /** create the object key for the delta entry in the objectstore. */ |
| private String createEntryKey(final String sourceId, final String objectId, final String compoundId) { |
| final String idDigest = DigestHelper.calculateDigest(objectId); |
| final String entryKey; |
| if (compoundId == null) { |
| entryKey = _configuration.getEntryKey(idDigest); |
| } else { |
| final String compoundDigest = DigestHelper.calculateDigest(compoundId); |
| entryKey = _configuration.getEntryKey(idDigest, compoundDigest); |
| } |
| return createEntryKeyBase(sourceId) + entryKey; |
| } |
| |
| /** create base path for delta entries of the given source. */ |
| private String createEntryKeyBase(final String sourceId) { |
| return ROOT_ENTRIES + sourceId + '/'; |
| } |
| |
| /** create the data object to store in object store. */ |
| private AnyMap createEntryObject(final String sourceId, final String objectId, final String compoundId, |
| final String jobRunId, final String descriptor) { |
| final AnyMap entry = DataFactory.DEFAULT.createAnyMap(); |
| entry.put(KEY_SOURCEID, sourceId); |
| entry.put(KEY_OBJECTID, objectId); |
| if (compoundId != null) { |
| entry.put(KEY_COMPOUNDID, compoundId); |
| } |
| entry.put(KEY_DESCRIPTOR, descriptor); |
| entry.put(KEY_JOBRUNID, jobRunId); |
| return entry; |
| } |
| |
| } |