blob: 9e5962960baef90569affeb7bbe8da9370417f26 [file] [log] [blame]
/*********************************************************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
**********************************************************************************************************************/
package org.eclipse.smila.importing.state.objectstore;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.DataFactory;
import org.eclipse.smila.datamodel.Record;
import org.eclipse.smila.datamodel.ipc.BinaryObjectStreamIterator;
import org.eclipse.smila.datamodel.ipc.IpcAnyReader;
import org.eclipse.smila.datamodel.ipc.IpcAnyWriter;
import org.eclipse.smila.importing.DeltaService;
import org.eclipse.smila.objectstore.NoSuchObjectException;
import org.eclipse.smila.objectstore.NoSuchStoreException;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.objectstore.util.ObjectStoreRetryUtil;
import org.eclipse.smila.utils.digest.DigestHelper;
/**
 * ObjectStore based implementation of a state service used in the jobmanager based importing framework.
 *
 * <p>
 * Every state entry is stored as a separate BON object in the store whose name is given to the constructor. The object
 * key has the form {@code entries/<sourceId>/<entryKey>}; the entry key is computed by the
 * {@link StateStoreConfiguration} from a digest of the object ID and, for compound elements, a digest of the compound
 * ID.
 * </p>
 *
 * @author scum36
 *
 */
public class ObjectStoreStateService {

  /** key for the entry. Its value is the objectstore key under which the entry object is written. */
  protected static final String KEY = "key";

  /** key for the objectId entry in the data object. */
  protected static final String KEY_OBJECTID = "objectId";

  /** key for the compoundId entry in the data object. */
  protected static final String KEY_COMPOUNDID = "compoundId";

  /** key for the sourceId entry in the data object. */
  protected static final String KEY_SOURCEID = "sourceId";

  /** key for the descriptor entry in the data object. */
  protected static final String KEY_DESCRIPTOR = "descriptor";

  /** key for the job run ID entry in the data object. */
  protected static final String KEY_JOBRUNID = "jobRunId";

  /** root directory in ObjectStore for delta entries. */
  protected static final String ROOT_ENTRIES = "entries/";

  /**
   * limit for number of entries to count exact. Once more entries than this have been counted,
   * {@link #countEntries(String, boolean)} may return an estimate if no exact count was requested.
   */
  private static final int LIMIT_COUNTENTRIES_EXACT = 10000;

  /** my logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** BON parser for entry values. */
  private final IpcAnyReader _parser = new IpcAnyReader();

  /** BON writer for entry values. */
  private final IpcAnyWriter _writer = new IpcAnyWriter();

  /** store configuration, used to compute entry and shard keys. */
  private final StateStoreConfiguration _configuration;

  /** reference to objectstore service. */
  private final ObjectStoreService _objectStore;

  /** name of the objectstore store that holds the state entries. */
  private final String _storeName;

  /**
   * creates a new instance of the service.
   *
   * @param storeName
   *          name of the store in the objectstore to keep the entries in.
   * @param configuration
   *          configuration used to compute entry and shard keys.
   * @param objectStore
   *          objectstore service that stores the entries.
   */
  public ObjectStoreStateService(final String storeName, final StateStoreConfiguration configuration,
    final ObjectStoreService objectStore) {
    _storeName = storeName;
    _configuration = configuration;
    _objectStore = objectStore;
  }

  /**
   * retrieves the information for an entry.
   *
   * @param sourceId
   *          the data source ID.
   * @param objectId
   *          the object ID.
   * @param compoundId
   *          ID of the compound the object belongs to, or null.
   * @return the stored entry, or null if no entry exists for these IDs.
   * @throws StateException
   *           if reading or parsing the entry fails.
   */
  public AnyMap getEntry(final String sourceId, final String objectId, final String compoundId)
    throws StateException {
    final String key = createEntryKey(sourceId, objectId, compoundId);
    return readEntry(key);
  }

  /**
   * marks an entry: (over)writes the entry for the given IDs with the given job run ID and descriptor.
   *
   * @param sourceId
   *          the data source ID.
   * @param objectId
   *          the object ID.
   * @param compoundId
   *          ID of the compound the object belongs to, or null.
   * @param jobRunId
   *          ID of the job run that visited the object.
   * @param descriptor
   *          descriptor value to store with the entry.
   * @throws StateException
   *           if converting or writing the entry fails.
   */
  public void mark(final String sourceId, final String objectId, final String compoundId, final String jobRunId,
    final String descriptor) throws StateException {
    final String key = createEntryKey(sourceId, objectId, compoundId);
    final AnyMap entry = createEntryObject(sourceId, objectId, compoundId, jobRunId, descriptor);
    // the key is stored inside the entry so that entries read back from the store can be rewritten directly.
    entry.put(KEY, key);
    writeEntry(entry);
  }

  /**
   * clears a source: removes all entries stored for it. Does nothing if the store does not exist.
   *
   * @param sourceId
   *          the data source ID.
   * @throws StateException
   *           if removing the entries fails.
   */
  public void clearSource(final String sourceId) throws StateException {
    if (_log.isInfoEnabled()) {
      _log.info("Clearing source: " + sourceId);
    }
    try {
      _objectStore.removeObjects(_storeName, createEntryKeyBase(sourceId));
    } catch (final NoSuchStoreException ex) {
      ; // ok. there is nothing to clear
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error removing all entries for source " + sourceId, ex);
    }
  }

  /**
   * clears all stores: removes the entries of all sources. Does nothing if the store does not exist.
   *
   * @throws StateException
   *           if clearing the store fails.
   */
  public void clearAll() throws StateException {
    if (_log.isInfoEnabled()) {
      _log.info("Clearing all sources");
    }
    try {
      _objectStore.clearStore(_storeName);
    } catch (final NoSuchStoreException ex) {
      ; // ok. there is nothing to clear
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error clearing all entries of all sources", ex);
    }
  }

  /**
   * @return IDs of all sources that currently have entries in the store. If the store does not exist yet, an empty
   *         collection is returned.
   * @throws StateException
   *           if reading the source prefixes from the objectstore fails.
   */
  public Collection<String> getSourceIds() throws StateException {
    try {
      return getPrefixes(ROOT_ENTRIES);
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error determining existing sources.", ex);
    }
  }

  /**
   * count the entries of the given source. Throws StateNotFoundException if store or source do not exist yet.
   *
   * @param sourceId
   *          the data source ID.
   * @param countExact
   *          if false, counting stops once more than {@value #LIMIT_COUNTENTRIES_EXACT} entries have been counted and
   *          the result is extrapolated from the shards counted so far to all shards.
   * @return the (possibly estimated) number of entries.
   * @throws StateNotFoundException
   *           if the store or the source does not exist.
   * @throws StateException
   *           if counting fails.
   */
  public long countEntries(final String sourceId, final boolean countExact) throws StateException {
    if (!hasSource(sourceId)) {
      throw new StateNotFoundException("Source '" + sourceId + "' doesn't exist in store '" + _storeName + "'");
    }
    try {
      long entryCount = 0;
      int shardCount = 0;
      final String sourceBase = createEntryKeyBase(sourceId);
      final Collection<String> shardIds = _objectStore.getPrefixes(_storeName, sourceBase);
      if (shardIds != null) {
        for (final String shardId : shardIds) {
          final String shardPrefix = shardId.endsWith("/") ? shardId : shardId + "/";
          entryCount += _objectStore.countStoreObjects(_storeName, shardPrefix);
          shardCount++;
          if (!countExact && entryCount > LIMIT_COUNTENTRIES_EXACT) {
            // estimate: assume the remaining shards hold as many entries on average as the ones counted so far.
            return entryCount * shardIds.size() / shardCount;
          }
        }
      }
      return entryCount;
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error counting entries of source '" + sourceId + "'", ex);
    }
  }

  /**
   * get object-id prefixes of all shards to use as input for {@link #getUnvisitedObjectIds(String, String)}.
   *
   * @param sourceId
   *          the data source ID.
   * @return shard prefixes relative to the entries root directory, without trailing slash. Empty if the store does not
   *         exist.
   * @throws StateException
   *           if reading the prefixes fails.
   */
  public Collection<String> getShardPrefixes(final String sourceId) throws StateException {
    final String sourceBase = createEntryKeyBase(sourceId);
    try {
      return getPrefixes(sourceBase);
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error getting shard prefixes for source '" + sourceId + "'", ex);
    }
  }

  /**
   * get all object IDs from a shard that have not been visited in the given job run.
   *
   * @param sourceAndShardPrefix
   *          a shard prefix as returned by {@link #getShardPrefixes(String)}, i.e. relative to the entries root.
   * @param jobRunId
   *          ID of the current job run.
   * @return IDs of all entries in the shard whose stored job run ID differs from the given one. Empty if the store does
   *         not exist.
   * @throws StateException
   *           if reading or parsing the entries fails.
   */
  public Collection<DeltaService.EntryId> getUnvisitedObjectIds(final String sourceAndShardPrefix,
    final String jobRunId) throws StateException {
    final List<DeltaService.EntryId> objectIds = new ArrayList<DeltaService.EntryId>();
    try {
      final Collection<StoreObject> objects =
        _objectStore.getStoreObjectInfos(_storeName, ROOT_ENTRIES + sourceAndShardPrefix);
      for (final StoreObject storeObject : objects) {
        final InputStream entryStream = _objectStore.readObject(_storeName, storeObject.getId());
        try {
          final BinaryObjectStreamIterator entries = new BinaryObjectStreamIterator(entryStream);
          while (entries.hasNext()) {
            final AnyMap entry = entries.next().getMetadata();
            if (!jobRunId.equals(entry.getStringValue(KEY_JOBRUNID))) {
              objectIds.add(new DeltaService.EntryId(entry.getStringValue(KEY_OBJECTID), entry
                .getStringValue(KEY_COMPOUNDID)));
            }
          }
        } finally {
          IOUtils.closeQuietly(entryStream);
        }
      }
    } catch (final NoSuchStoreException ex) {
      ; // OK: no store, no objects
    } catch (final Exception ex) {
      throw new StateException("Error getting unvisited objects for source/shard '" + sourceAndShardPrefix + "'",
        ex);
    }
    return objectIds;
  }

  /**
   * update the job run id of all entries that belong to the given compound.
   *
   * @param sourceId
   *          the data source ID.
   * @param compoundRecordId
   *          ID of the compound whose elements are marked.
   * @param jobRunId
   *          ID of the current job run.
   * @throws StateException
   *           if reading or rewriting the entries fails.
   */
  public void markCompoundElementsVisited(final String sourceId, final String compoundRecordId,
    final String jobRunId) throws StateException {
    final String sourceAndShardPrefix = createShardKey(sourceId, compoundRecordId);
    try {
      final Collection<StoreObject> objects = _objectStore.getStoreObjectInfos(_storeName, sourceAndShardPrefix);
      for (final StoreObject storeObject : objects) {
        final List<AnyMap> changedEntries = new ArrayList<AnyMap>();
        final InputStream entryStream = _objectStore.readObject(_storeName, storeObject.getId());
        try {
          final BinaryObjectStreamIterator entries = new BinaryObjectStreamIterator(entryStream);
          while (entries.hasNext()) {
            final AnyMap entry = entries.next().getMetadata();
            if (compoundRecordId.equals(entry.getStringValue(KEY_COMPOUNDID))
              && !jobRunId.equals(entry.getStringValue(KEY_JOBRUNID))) {
              entry.put(KEY_JOBRUNID, jobRunId);
              changedEntries.add(entry);
            }
          }
        } finally {
          IOUtils.closeQuietly(entryStream);
        }
        // write only after the stream is closed: the update usually rewrites the object that was just read.
        for (final AnyMap entry : changedEntries) {
          writeEntry(entry);
        }
      }
    } catch (final NoSuchStoreException ex) {
      ; // OK: no store, no objects
    } catch (final Exception ex) {
      throw new StateException("Error marking elements of compound '" + compoundRecordId
        + "' as visited in source/shard '" + sourceAndShardPrefix + "'", ex);
    }
  }

  /**
   * remove an entry. Does nothing if the store does not exist.
   *
   * @param sourceId
   *          the data source ID.
   * @param entryId
   *          record ID and compound ID of the entry to remove.
   * @throws StateException
   *           if removing the entry fails.
   */
  public void deleteEntry(final String sourceId, final DeltaService.EntryId entryId) throws StateException {
    try {
      _objectStore.removeObject(_storeName,
        createEntryKey(sourceId, entryId.getRecordId(), entryId.getCompoundId()));
    } catch (final NoSuchStoreException ex) {
      ; // OK: no store, nothing to delete.
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error deleting entry '" + entryId.getRecordId() + "'", ex);
    }
  }

  /** @return whether store contains entries for given source. */
  private boolean hasSource(final String sourceId) throws StateException {
    return getSourceIds().contains(sourceId);
  }

  /**
   * read the current entry from the object store.
   *
   * @return the parsed entry, or null if the object does not exist (or holds at most one byte of data).
   */
  private AnyMap readEntry(final String key) throws StateException {
    final byte[] data;
    try {
      data = readEntryIfExists(key);
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error reading object " + key + " from objectstore", ex);
    }
    if (data.length > 1) {
      try {
        return (AnyMap) _parser.readBinaryObject(data);
      } catch (final IOException ex) {
        throw new StateException("Error parsing object " + key, ex);
      }
    }
    return null;
  }

  /**
   * try to read data from object store. If {@link NoSuchStoreException} or {@link NoSuchObjectException} is thrown,
   * return an empty array.
   */
  private byte[] readEntryIfExists(final String key) throws ObjectStoreException {
    try {
      return ObjectStoreRetryUtil.retryGetObject(_objectStore, _storeName, key);
    } catch (final NoSuchStoreException | NoSuchObjectException ex) {
      return new byte[0];
    }
  }

  /**
   * gets all prefixes for the given base prefix from the store.
   *
   * @param base
   *          full key prefix, starting with {@link #ROOT_ENTRIES}.
   * @return the found prefixes relative to {@link #ROOT_ENTRIES}, with a trailing slash removed. Empty if the store
   *         does not exist.
   */
  private Collection<String> getPrefixes(final String base) throws ObjectStoreException {
    final Collection<String> relativePrefixes = new ArrayList<String>();
    try {
      final Collection<String> fullPrefixes = _objectStore.getPrefixes(_storeName, base);
      if (fullPrefixes != null) { // be defensive like countEntries(): the objectstore result may be null
        for (final String fullPrefix : fullPrefixes) {
          String prefix = fullPrefix.substring(ROOT_ENTRIES.length());
          if (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
          }
          relativePrefixes.add(prefix);
        }
      }
    } catch (final NoSuchStoreException ex) {
      ; // ignore: no store, no prefixes
    }
    return relativePrefixes;
  }

  /**
   * update the entry in the objectstore. The object key is taken from the {@link #KEY} value of the entry.
   *
   * @param value
   *          the entry to write.
   * @throws StateException
   *           if converting the entry to BON or writing it fails.
   */
  protected void writeEntry(final AnyMap value) throws StateException {
    final String key = value.getStringValue(KEY);
    final byte[] data;
    try {
      data = _writer.writeBinaryObject(value);
    } catch (final IOException ex) {
      throw new StateException("Error converting object " + value + " for " + key + " to BON", ex);
    }
    writeEntryEnsureStore(key, data);
  }

  /** write entry, create store if it doesn't exist already, then retry the write once. */
  private void writeEntryEnsureStore(final String key, final byte[] data) throws StateException {
    try {
      try {
        ObjectStoreRetryUtil.retryPutObject(_objectStore, _storeName, key, data);
      } catch (final NoSuchStoreException ex) {
        _log.info("Creating store '" + _storeName + "'");
        ObjectStoreRetryUtil.retryEnsureStore(_objectStore, _storeName);
        ObjectStoreRetryUtil.retryPutObject(_objectStore, _storeName, key, data);
      }
    } catch (final ObjectStoreException ex) {
      throw new StateException("Error writing object for " + key + " to objectstore", ex);
    }
  }

  /** get the full sourceAndShard key prefix for the given object ID, computed from the digest of the ID. */
  private String createShardKey(final String sourceId, final String objectId) {
    final String idDigest = DigestHelper.calculateDigest(objectId);
    final String entryKey = _configuration.getShardKey(idDigest);
    return createEntryKeyBase(sourceId) + entryKey;
  }

  /** create the object key for the delta entry in the objectstore. */
  private String createEntryKey(final String sourceId, final String objectId, final String compoundId) {
    final String idDigest = DigestHelper.calculateDigest(objectId);
    final String entryKey;
    if (compoundId == null) {
      entryKey = _configuration.getEntryKey(idDigest);
    } else {
      final String compoundDigest = DigestHelper.calculateDigest(compoundId);
      entryKey = _configuration.getEntryKey(idDigest, compoundDigest);
    }
    return createEntryKeyBase(sourceId) + entryKey;
  }

  /** create base path for delta entries of the given source: {@code entries/<sourceId>/}. */
  private String createEntryKeyBase(final String sourceId) {
    return ROOT_ENTRIES + sourceId + '/';
  }

  /** create the data object to store in object store. The compound ID is only added if it is not null. */
  private AnyMap createEntryObject(final String sourceId, final String objectId, final String compoundId,
    final String jobRunId, final String descriptor) {
    final AnyMap entry = DataFactory.DEFAULT.createAnyMap();
    entry.put(KEY_SOURCEID, sourceId);
    entry.put(KEY_OBJECTID, objectId);
    if (compoundId != null) {
      entry.put(KEY_COMPOUNDID, compoundId);
    }
    entry.put(KEY_DESCRIPTOR, descriptor);
    entry.put(KEY_JOBRUNID, jobRunId);
    return entry;
  }
}