blob: e1f5f51d93f595a43df501062339b06f6bceb828 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Ivan Churkin (brox IT Solutions GmbH) - initial creator Sebastian Voigt (brox IT Solutions GmbH)
**********************************************************************************************************************/
package org.eclipse.smila.connectivity.deltaindexing.impl;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.smila.connectivity.ConnectivityId;
import org.eclipse.smila.connectivity.deltaindexing.DeltaIndexingException;
/**
 * Delta indexing state of a single data source: the committed index (id to hash), the pending changes of the
 * session that currently holds the lock, and a mapping of ids to their sub compound ids.
 *
 * <p>
 * All state-changing operations require the connection to be locked (see {@link #lock(String)}); pending changes
 * are merged into the index when the connection is unlocked (see {@link #unlock()}).
 * </p>
 *
 * <p>
 * Serialization note: the session working data ({@code _modified}, {@code _updated}, {@code _deleted}) is
 * transient and therefore {@code null} on a deserialized instance until it is (re)created by this class.
 * </p>
 */
public class DataSourceConnection implements Serializable {

  /**
   * serial version id.
   */
  private static final long serialVersionUID = -2263628823851855201L;

  /**
   * Message format used when a locked data source is locked again. Argument: data source id.
   */
  private static final String EXCEPTION_DATA_SOURCE_LOCKED_BY_OTHER_SESSION =
    "Data source %s already locked by another session";

  /**
   * Message format used when an operation requires the lock but the data source is not locked. Argument: data
   * source id.
   */
  private static final String EXCEPTION_DATA_SOURCE_SHOULD_BE_LOCKED =
    "Data source %s should be locked before using!";

  /**
   * The id of the data source this connection belongs to.
   */
  private final String _dataSourceId;

  /**
   * The id of the session holding the lock, {@code null} after {@link #unlock()}.
   */
  private String _sessionId;

  /**
   * The lock guarding this data source. Not final because the force-unlock operations replace it with a fresh
   * lock owned by the forcing thread; volatile so that {@link #checkLock()} running in another thread sees the
   * replacement.
   */
  private volatile ReentrantLock _lock = new ReentrantLock();

  /**
   * Monitor guarding lock acquisition, release and replacement. Not used by anybody else. An empty array instead
   * of a plain {@code Object} - presumably so that this non-transient field remains serializable.
   */
  private final Object _lockMonitor = new Object[0];

  /**
   * The committed index: id to hash.
   */
  private final Map<ConnectivityId, String> _index = new HashMap<ConnectivityId, String>();

  /**
   * The ids put as compounds during the current session. Transient, hence {@code null} after deserialization;
   * always access it through {@link #modifiedIds()}.
   */
  private transient Set<ConnectivityId> _modified;

  /**
   * The pending index entries of the current session (id to hash). {@code null} until the first rollback.
   */
  private transient Map<ConnectivityId, String> _updated;

  /**
   * The ids marked for deletion in the current session. {@code null} until the first rollback.
   */
  private transient Set<ConnectivityId> _deleted;

  /**
   * A mapping of Ids to their subCompound Ids (if any).
   */
  private final Map<ConnectivityId, Set<ConnectivityId>> _subCompounds =
    new HashMap<ConnectivityId, Set<ConnectivityId>>();

  /**
   * Instantiates a new, unlocked data source connection with an empty index.
   *
   * @param dataSourceId
   *          the data source id
   */
  public DataSourceConnection(final String dataSourceId) {
    _dataSourceId = dataSourceId;
  }

  /**
   * Locks the data source for the given session and starts with empty pending changes.
   *
   * @param sessionId
   *          the id of the session
   * @throws DeltaIndexingException
   *           if the data source is already locked, or if the lock was taken away again (force unlock) before the
   *           pending changes could be reset
   */
  public void lock(final String sessionId) throws DeltaIndexingException {
    // check-and-acquire happens under the monitor so that a concurrent caller fails fast instead of blocking
    synchronized (_lockMonitor) {
      if (_lock.isLocked()) {
        throw new DeltaIndexingException(
          String.format(EXCEPTION_DATA_SOURCE_LOCKED_BY_OTHER_SESSION, _dataSourceId));
      }
      modifiedIds().clear();
      _lock.lock();
      _sessionId = sessionId;
    }
    rollback();
  }

  /**
   * Commits the pending changes into the index, resets them and releases the lock.
   *
   * <p>
   * The lock is a {@link ReentrantLock}, so releasing it from a thread that does not hold it fails with an
   * {@link IllegalMonitorStateException}.
   * </p>
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void unlock() throws DeltaIndexingException {
    checkLock();
    commit();
    rollback();
    synchronized (_lockMonitor) {
      _lock.unlock();
      _sessionId = null;
    }
  }

  /**
   * Registers the id in the pending changes with the hash currently stored in the index for it (which is
   * {@code null} if the index has no entry for the id).
   *
   * @param id
   *          the id
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void put(final ConnectivityId id) throws DeltaIndexingException {
    checkLock();
    final String currentHash = _index.get(id);
    _updated.put(id, currentHash);
  }

  /**
   * Registers the id with the given hash in the pending changes. Compound ids are additionally remembered as
   * modified.
   *
   * @param id
   *          the id
   * @param hash
   *          the hash
   * @param isCompound
   *          boolean flag if the record identified by id is a compound record (true) or not (false)
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void put(final ConnectivityId id, final String hash, final boolean isCompound)
    throws DeltaIndexingException {
    checkLock();
    _updated.put(id, hash);
    if (isCompound) {
      modifiedIds().add(id);
    }
  }

  /**
   * Gets the hash stored in the committed index for the id. Pending changes are not consulted.
   *
   * @param id
   *          the id
   *
   * @return the hash, or {@code null} if the index has no entry for the id
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public String getHash(final ConnectivityId id) throws DeltaIndexingException {
    checkLock();
    return _index.get(id);
  }

  /**
   * Creates an {@code IdIterator} over the index keys, the keys of the pending updates and the modified compound
   * ids. Presumably it yields the indexed ids that were not visited in this session - see {@code IdIterator} for
   * the exact selection.
   *
   * @return the iterator over ids
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public Iterator<ConnectivityId> obsoleteIdIterator() throws DeltaIndexingException {
    checkLock();
    return new IdIterator(_index.keySet(), _updated.keySet(), modifiedIds());
  }

  /**
   * Marks the id for deletion; the index entry is removed when the pending changes are committed.
   *
   * @param id
   *          the id
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void delete(final ConnectivityId id) throws DeltaIndexingException {
    checkLock();
    _deleted.add(id);
  }

  /**
   * Discards the pending updates and deletions of the current session. The committed index is not touched.
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void rollback() throws DeltaIndexingException {
    checkLock();
    _updated = new HashMap<ConnectivityId, String>();
    // checked a second time: the lock may have been broken by a force unlock in the meantime
    checkLock();
    _deleted = new HashSet<ConnectivityId>();
  }

  /**
   * Discards the pending changes and removes everything from the index, the modified compounds and the sub
   * compound mapping.
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  public void clear() throws DeltaIndexingException {
    rollback();
    _index.clear();
    modifiedIds().clear();
    // empty the mapped sets as well, so that sets handed out by getSubCompounds() do not keep stale ids
    for (final Set<ConnectivityId> subCompoundIds : _subCompounds.values()) {
      if (subCompoundIds != null) {
        subCompoundIds.clear();
      }
    }
    _subCompounds.clear();
  }

  /**
   * Merges the pending changes into the index: first all updates, then all deletions (including the sub compound
   * mapping of each deleted id). The lock is re-checked before each entry so that a force unlock aborts the
   * commit.
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked (anymore)
   */
  private void commit() throws DeltaIndexingException {
    for (final Map.Entry<ConnectivityId, String> update : _updated.entrySet()) {
      checkLock();
      _index.put(update.getKey(), update.getValue());
    }
    for (final ConnectivityId id : _deleted) {
      checkLock();
      _index.remove(id);
      removeSubCompound(id);
    }
  }

  /**
   * Ensures that the data source is locked. Only checks that the lock is held by some thread, not by which one.
   *
   * @throws DeltaIndexingException
   *           if the data source is not locked
   */
  private void checkLock() throws DeltaIndexingException {
    if (!_lock.isLocked()) {
      throw new DeltaIndexingException(String.format(EXCEPTION_DATA_SOURCE_SHOULD_BE_LOCKED, _dataSourceId));
    }
  }

  /**
   * Returns the set of modified compound ids, creating it on first use. The field is transient and thus
   * {@code null} on a deserialized instance.
   *
   * @return the set of modified compound ids, never {@code null}
   */
  private Set<ConnectivityId> modifiedIds() {
    if (_modified == null) {
      _modified = new HashSet<ConnectivityId>();
    }
    return _modified;
  }

  /**
   * Replaces the current lock by a fresh one that is already held by the calling thread, regardless of who holds
   * the old lock. The swap is done under the lock monitor so that it cannot interleave with the check-and-acquire
   * in {@link #lock(String)}.
   */
  private void takeOverLock() {
    final ReentrantLock ownedLock = new ReentrantLock();
    ownedLock.lock();
    synchronized (_lockMonitor) {
      _lock = ownedLock;
    }
  }

  /**
   * Takes the lock away from whoever holds it, discards the pending changes and leaves the data source unlocked.
   *
   * @throws DeltaIndexingException
   *           if the replaced lock is broken again concurrently
   */
  public void forceUnlockAndRollback() throws DeltaIndexingException {
    takeOverLock();
    rollback();
    unlock();
  }

  /**
   * Takes the lock away from whoever holds it, clears the complete state (see {@link #clear()}) and leaves the
   * data source unlocked.
   *
   * @throws DeltaIndexingException
   *           if the replaced lock is broken again concurrently
   */
  public void forceUnlockAndClear() throws DeltaIndexingException {
    takeOverLock();
    clear();
    unlock();
  }

  /**
   * Add a sub compound mapping. Does not require the lock.
   *
   * @param parentId
   *          the parent Id
   * @param subCompoundId
   *          the sub compound id
   */
  public void addSubCompound(final ConnectivityId parentId, final ConnectivityId subCompoundId) {
    Set<ConnectivityId> subCompoundIds = _subCompounds.get(parentId);
    if (subCompoundIds == null) {
      subCompoundIds = new HashSet<ConnectivityId>();
      _subCompounds.put(parentId, subCompoundIds);
    }
    subCompoundIds.add(subCompoundId);
  }

  /**
   * Returns the Set of sub compounds or null if none exists. The returned set is the internal one, not a copy.
   *
   * @param id
   *          the id to get the sub compounds for
   * @return the Set of sub compounds or null if none exists
   */
  public Set<ConnectivityId> getSubCompounds(final ConnectivityId id) {
    return _subCompounds.get(id);
  }

  /**
   * Removes the mapping for an id, cleaning up the mapped set, too.
   *
   * @param id
   *          the id to remove the mapping for
   */
  public void removeSubCompound(final ConnectivityId id) {
    final Set<ConnectivityId> removed = _subCompounds.remove(id);
    if (removed != null) {
      removed.clear();
    }
  }

  /**
   * Gets the data source id.
   *
   * @return the data source id
   */
  public String getDataSourceId() {
    return _dataSourceId;
  }

  /**
   * Gets the session id.
   *
   * @return the session id
   */
  public String getSessionId() {
    return _sessionId;
  }

  /**
   * Returns the number of entries in the index, taking the pending changes into account: the distinct ids of the
   * index, the pending updates and the modified compounds, minus the ids marked for deletion.
   *
   * @return the number of entries in the index
   */
  public long getEntryCount() {
    final Set<ConnectivityId> ids = new HashSet<ConnectivityId>();
    if (_index != null) {
      ids.addAll(_index.keySet());
    }
    // the session working data is null before the first lock and after deserialization
    if (_updated != null) {
      ids.addAll(_updated.keySet());
    }
    if (_modified != null) {
      ids.addAll(_modified);
    }
    if (_deleted != null) {
      ids.removeAll(_deleted);
    }
    return ids.size();
  }
}