/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
 **********************************************************************************************************************/
package org.eclipse.smila.zookeeper; | |
import java.util.Random; | |
import java.util.Set; | |
import java.util.TreeSet; | |
import java.util.concurrent.Callable; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.data.Stat; | |
/**
 * Provides a map view of the zookeeper structure below an existing root node path.
 *
 * Additionally it offers methods to "merge" a value with an existing one (e.g. add() to sum ints).
 *
 * The methods are similar (but not identical) to those of ConcurrentHashMap. ZkConcurrentMap also uses no locks.
 *
 * @author aweber
 */
public class ZkConcurrentMap { | |
/** znode data encoding for encoding/decoding string values. */ | |
private static final String ENCODING = "utf-8"; | |
/** how often should we repeat the operation if we are conflicting with changes of others. */ | |
private static final int NO_OF_TRIES_WHEN_CONFLICT = 100; | |
/** if this time is exceeded for an operation, a log message is written. */ | |
private static final long MIN_OPERATION_TIME_TO_LOG = 500; // ms | |
/** how long will we wait maximal to repeat the operation if we are conflicting with changes of others (in millis). */ | |
private static final int MAX_WAIT_TIME_BETWEEN_TRIES = 10; | |
/** we use a random wait time to repeat the operation if we are conflicting with changes of others. */ | |
private final Random _waitTimeBetweenTriesGenerator = new Random(System.nanoTime()); | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** Connection to ZooKeeper server, includes Zookeeper client. */ | |
private final ZkConnection _zk; | |
/** Root zookeeper node from where to start map. */ | |
private final String _root; | |
/** Callable extension which is able to return a zk operation string. */ | |
private interface ZkMapCallable extends Callable<Object> { | |
/** return the underlying zk operation as string. */ | |
String getZkOperation(); | |
/** return value if there are too much conflicts so that zk operation fails. */ | |
Object getExceededRetriesResult(); | |
} | |
/** marker to recognize retries. */ | |
private static final class RetryOperation { | |
} | |
/** | |
* @param service | |
* the zookeeper service | |
* @param rootNodePath | |
* the existing(!) zookeeper root node path from where to start the map view. | |
*/ | |
public ZkConcurrentMap(final ZooKeeperService service, final String rootNodePath) { | |
this(new ZkConnection(service), rootNodePath); | |
} | |
/** | |
* @param zk | |
* shared ZooKeeper connection | |
* @param rootNodePath | |
* the existing(!) zookeeper root node path from where to start the map view. | |
* @throws IllegalArgumentException | |
* if root node doesn't exist in zookeeper | |
*/ | |
public ZkConcurrentMap(final ZkConnection zk, final String rootNodePath) { | |
if (rootNodePath == null) { | |
throw new NullPointerException("parameter rootNodePath is <null>"); | |
} | |
_zk = zk; | |
_root = rootNodePath; | |
Stat stat = null; | |
try { | |
stat = _zk.exists(rootNodePath); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); | |
} | |
if (stat == null) { | |
throw new IllegalArgumentException("root zookeeper node '" + rootNodePath + "' doesn't exist!"); | |
} | |
} | |
/** | |
* set value for given key. If key exists, value is overwritten. | |
* | |
* @param key | |
* not allowed to be 'null' | |
* @param value | |
* not allowed to be 'null' | |
* @return 'true', if successful, otherwise 'false' | |
*/ | |
public boolean put(final String key, final String value) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("put(), key = " + key + ", value = " + value); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
final byte[] data = value.getBytes(ENCODING); | |
return (Boolean) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "put(): key = " + key + ", value = " + value; | |
} | |
@Override | |
public Object call() throws Exception { | |
try { | |
// create new znode(=key) with given value as data | |
if (_zk.exists(nodePath) == null) { | |
_zk.createNode(nodePath, data); | |
return true; | |
} | |
} catch (final KeeperException.NodeExistsException e) { | |
; // znode already exists -> overwrite value | |
} | |
// reset value. | |
try { | |
_zk.setData(nodePath, data); | |
return true; | |
} catch (final KeeperException.NoNodeException e2) { | |
return new RetryOperation(); // someone removed the node in between, try again | |
} catch (final Exception e2) { | |
throw new RuntimeException(e2); | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return Boolean.FALSE; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* set value for given key if key doesn't exist yet. | |
* | |
* @param key | |
* not allowed to be 'null' | |
* @param value | |
* not allowed to be 'null' | |
* @return the value of the node after the put operation. Should be used to check for expected result. | |
*/ | |
public String putIfAbsent(final String key, final String value) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("putIfAbsent(), key = " + key + ", value = " + value); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
if (value == null) { | |
throw new NullPointerException("parameter value is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
final byte[] data = value.getBytes(ENCODING); | |
return (String) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "put(): key = " + key + ", value = " + value; | |
} | |
@Override | |
public Object call() throws Exception { | |
try { | |
if (_zk.exists(nodePath) == null) { | |
// create new znode(=key) with given value as data | |
_zk.createNode(nodePath, data); | |
} | |
} catch (final KeeperException.NodeExistsException e) { | |
; // ok, node exists, report value later. | |
} | |
// reporting value of node if it exists. | |
try { | |
return new String(_zk.getData(nodePath), ENCODING); | |
} catch (final KeeperException.NoNodeException e2) { | |
; // someone removed the node in between -> try again | |
return new RetryOperation(); | |
} catch (final Exception e2) { | |
throw new RuntimeException(e2); | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return "<UNKNOWN_VALUE>"; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* Set new value for given key if key exists and has given old value. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @param oldValue | |
* the old value, not allowed to be 'null' | |
* @param newValue | |
* the new value, not allowed to be 'null' | |
* @return 'true', if successful, otherwise 'false' | |
*/ | |
public boolean replace(final String key, final String oldValue, final String newValue) { | |
return replaceAndGetVersion(key, oldValue, newValue) != null; | |
} | |
/** | |
* Set new value for given key if key exists and has given old value. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @param oldValue | |
* the old value, not allowed to be 'null' | |
* @param newValue | |
* the new value, not allowed to be 'null' | |
* @return the revision number for the key node, 'null' if not successful. | |
*/ | |
public Integer replaceAndGetVersion(final String key, final String oldValue, final String newValue) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("replace(), key = " + key + ", oldValue = " + oldValue + ", newValue = " + newValue); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
return (Integer) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "replace(), key = " + key + ", oldValue = " + oldValue + ", newValue = " + newValue; | |
} | |
@Override | |
public Object call() throws Exception { | |
// check znode existence and get current version | |
final Stat stat = _zk.exists(nodePath); | |
if (stat == null) { | |
return null; | |
} | |
final int version = stat.getVersion(); | |
// check matching of expected old value | |
final String realOldValue = new String(_zk.getData(nodePath), ENCODING); | |
if (!oldValue.equals(realOldValue)) { | |
return null; | |
} | |
// set new value | |
try { | |
return _zk.setData(nodePath, newValue.getBytes(ENCODING), version).getVersion(); | |
} catch (final KeeperException.NoNodeException e) { | |
// someone removed the key in between. | |
return null; | |
} catch (final KeeperException.BadVersionException e) { | |
return new RetryOperation(); // someone changed the value in between -> try again with new value | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return null; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* @param key | |
* the key, not allowed to be 'null' | |
* @return value of the given key as String, or 'null' if key doesn't exist. | |
*/ | |
public String getString(final String key) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getString(), key = " + key); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
// return value(=data) of given key(=znode) | |
return new String(_zk.getData(nodePath), ENCODING); | |
} catch (final KeeperException.NoNodeException e) { | |
return null; // znode doesn't exist | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* @param key | |
* the key, not allowed to be 'null' | |
* @return value of the given key as Integer, or 'null' if mapping for key doesn't exist. | |
*/ | |
public Integer getInt(final String key) { | |
final String s = getString(key); | |
if (s == null) { | |
return null; | |
} | |
return Integer.valueOf(s); | |
} | |
/** | |
* @param key | |
* the key, not allowed to be 'null' | |
* @return value of the given key as Long, or 'null' if mapping for key doesn't exist. | |
*/ | |
public Long getLong(final String key) { | |
final String s = getString(key); | |
if (s == null) { | |
return null; | |
} | |
return Long.valueOf(s); | |
} | |
/** | |
* revision of the key node. Useful to detect updates. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @return key node version. | |
*/ | |
public Integer getVersion(final String key) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("getRevision(), key = " + key); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
// return value(=data) of given key(=znode) | |
final Stat stat = _zk.exists(nodePath); | |
if (stat == null) { | |
return null; | |
} | |
return stat.getVersion(); | |
} catch (final KeeperException.NoNodeException e) { | |
return null; // znode doesn't exist | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* Remove given key / value mapping. If key doesn't exist, ignore call. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
*/ | |
public void remove(final String key) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("remove(), key = " + key); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
_zk.deleteNode(nodePath); | |
} catch (final KeeperException.NoNodeException e) { | |
; // that's ok | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* add value to old value of given key. If key doesn't exist, create it. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @param value | |
* the value to add | |
* @return 'newValue', if successful. If operation failed because of too many zookeeper conflicts, return 'null'. | |
*/ | |
public Integer add(final String key, final int value) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("add(), key = " + key + ", value = " + value); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
return (Integer) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "add(), key = " + key + ", value = " + value; | |
} | |
@Override | |
public Object call() throws Exception { | |
// check znode existence and get current version | |
Stat stat = _zk.exists(nodePath); | |
if (stat == null) { | |
// znode doesn't exist -> create it | |
try { | |
_zk.createNode(nodePath, String.valueOf(value).getBytes(ENCODING)); | |
return value; | |
} catch (final KeeperException.NodeExistsException e) { | |
stat = _zk.exists(nodePath); // someone created this znode in between, that's ok | |
} | |
} | |
final int version = stat.getVersion(); | |
// add given value to old data | |
final String oldString = new String(_zk.getData(nodePath), ENCODING); | |
final int oldInt = Integer.valueOf(oldString); | |
final String newValue = String.valueOf(value + oldInt); | |
try { | |
_zk.setData(nodePath, newValue.getBytes(ENCODING), version); | |
return Integer.valueOf(newValue); | |
} catch (final KeeperException.NoNodeException e) { | |
return new RetryOperation(); // someone removed the key in between, try again... | |
} catch (final KeeperException.BadVersionException e) { | |
return new RetryOperation(); // someone changed the value in between, try again... | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return null; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* @return a set view of the keys contained in this map | |
*/ | |
public Set<String> keySet() { | |
if (_log.isTraceEnabled()) { | |
_log.trace("keySet()"); | |
} | |
try { | |
return new TreeSet<String>(_zk.getChildrenSorted(_root)); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* @return 'true' if this map contains a mapping for the specified key. | |
*/ | |
public boolean containsKey(final String key) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("containsKey()"); | |
} | |
try { | |
return _zk.getChildrenSorted(_root).contains(key); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* Removes all mappings from this map. | |
*/ | |
public void clear() { | |
if (_log.isTraceEnabled()) { | |
_log.trace("clear()"); | |
} | |
try { | |
final Set<String> keys = keySet(); | |
for (final String key : keys) { | |
try { | |
final String keyPath = _root + "/" + key; | |
_zk.deleteNode(keyPath); | |
} catch (final KeeperException.NoNodeException e) { | |
; // someone removed the node in between, that's ok | |
} | |
} | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* inc value of given key, but only if value is not negative and value < max. If value >= max or value < 0 ignore call | |
* and return -1. If key doesn't exist and max > 0, create entry with value '1'. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @param max | |
* the maximum value | |
* @return incremented value if old value was not negative and old value < max, otherwise '-1'. If operation failed | |
* because of too many zookeeper conflicts, return 'null'. | |
*/ | |
public Integer incIfLessThan(final String key, final long max) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("incIfLessThan(), key = " + key + ", max = " + max); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
if (max <= 0) { | |
return -1; // we can't inc a positive value if max <= 0 | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
return (Integer) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "incIfLessThan(), key = " + key + ", max = " + max; | |
} | |
@Override | |
public Object call() throws Exception { | |
// check znode existence and get current version | |
Stat stat = _zk.exists(nodePath); | |
if (stat == null) { | |
// znode doesn't exist -> create it with start value '1' | |
try { | |
_zk.createNode(nodePath, "1".getBytes(ENCODING)); | |
return 1; | |
} catch (final KeeperException.NodeExistsException e) { | |
stat = _zk.exists(nodePath); // someone created this znode in between, that's ok | |
} | |
} | |
final int version = stat.getVersion(); | |
// inc old value | |
final String oldString = new String(_zk.getData(nodePath), ENCODING); | |
final long oldInt = Long.valueOf(oldString); | |
if (oldInt >= 0 && oldInt + 1 <= max) { | |
final String newValue = String.valueOf(oldInt + 1); | |
try { | |
_zk.setData(nodePath, newValue.getBytes(ENCODING), version); | |
return Integer.valueOf(newValue); | |
} catch (final KeeperException.NoNodeException e) { | |
return new RetryOperation(); // someone removed the key in between, try again... | |
} catch (final KeeperException.BadVersionException e) { | |
return new RetryOperation(); // someone changed the value in between, try again... | |
} | |
} else { | |
return -1; // exceeded max or old value was negative | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return null; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* dec value of given key, but only if value is not negative and value > min. If value <= min or value < 0 ignore call | |
* and return -1. If key doesn't exist, create entry with value '0'. | |
* | |
* @param key | |
* the key, not allowed to be 'null' | |
* @param min | |
* the minimum value | |
* @return decremented value if old value was not negative and old value > min, otherwise '-1'. If operation failed | |
* because of too many zookeeper conflicts, return 'null'. | |
*/ | |
public Integer decIfGreaterThan(final String key, final long min) { | |
if (_log.isTraceEnabled()) { | |
_log.trace("decIfGreaterThan(), key = " + key + ", min = " + min); | |
} | |
if (key == null) { | |
throw new NullPointerException("parameter key is <null>"); | |
} | |
if (min < 0) { | |
return -1; // we can't dec a positive value if min < 0 | |
} | |
final String nodePath = _root + '/' + key; | |
try { | |
return (Integer) retryZkMapOperation(new ZkMapCallable() { | |
@Override | |
public String getZkOperation() { | |
return "decIfGreaterThan(), key = " + key + ", min = " + min; | |
} | |
@Override | |
public Object call() throws Exception { | |
// check znode existence and get current version | |
Stat stat = _zk.exists(nodePath); | |
if (stat == null) { | |
// znode doesn't exist -> create it with start value '0' | |
try { | |
_zk.createNode(nodePath, "0".getBytes(ENCODING)); | |
return 0; | |
} catch (final KeeperException.NodeExistsException e) { | |
stat = _zk.exists(nodePath); // someone created this znode in between, that's ok | |
} | |
} | |
final int version = stat.getVersion(); | |
// dec old value | |
final String oldString = new String(_zk.getData(nodePath), ENCODING); | |
final int oldVal = Integer.valueOf(oldString); | |
if (oldVal > 0 && oldVal > min) { | |
final String newValue = String.valueOf(oldVal - 1); | |
try { | |
_zk.setData(nodePath, newValue.getBytes(ENCODING), version); | |
return Integer.valueOf(oldVal - 1); | |
} catch (final KeeperException.NoNodeException e) { | |
return new RetryOperation(); // someone removed the key in between, try again... | |
} catch (final KeeperException.BadVersionException e) { | |
return new RetryOperation(); // someone changed the value in between, try again... | |
} | |
} else { | |
return -1; // would decrease below min or old value already was negative | |
} | |
} | |
@Override | |
public Object getExceededRetriesResult() { | |
return null; | |
} | |
}); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); // Unexpected error. | |
} | |
} | |
/** | |
* Generic method for executing zookeeper client commands. Repeated when conflicting. | |
* | |
* @param callable | |
* command to execute | |
* @return result of Callable | |
*/ | |
private Object retryZkMapOperation(final ZkMapCallable callable) throws Exception { | |
final long start = System.currentTimeMillis(); | |
int tries = 0; | |
while (tries < NO_OF_TRIES_WHEN_CONFLICT) { | |
tries++; | |
final Object result = callable.call(); | |
if (result instanceof RetryOperation) { | |
Thread.sleep(_waitTimeBetweenTriesGenerator.nextInt(MAX_WAIT_TIME_BETWEEN_TRIES)); | |
} else { | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("ZkConnection operation time: " + t + " ms, tries: " + tries + ", operation: " | |
+ callable.getZkOperation()); | |
} | |
} | |
return result; | |
} | |
} | |
if (_log.isWarnEnabled()) { | |
_log.warn("ZkConcurrentMap: Too much conflicts while doing operation: " + callable.getZkOperation()); | |
} | |
return callable.getExceededRetriesResult(); | |
} | |
} |