blob: 9a4edf1d50e40473e8e1600e4703421d3d51e09d [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/**
* Provides a map view of the ZooKeeper structure below an existing root node path.
*
* Additionally, it has methods to "merge" values with existing ones (e.g. add() to sum up ints).
*
* Methods are similar (but not identical) to those of ConcurrentHashMap. ZkConcurrentMap also uses no locks.
*
* @author aweber
*/
public class ZkConcurrentMap {
  /** znode data encoding for encoding/decoding string values. */
  private static final String ENCODING = "utf-8";

  /** how often should we repeat the operation if we are conflicting with changes of others. */
  private static final int NO_OF_TRIES_WHEN_CONFLICT = 100;

  /** if this time is exceeded for an operation, a log message is written. */
  private static final long MIN_OPERATION_TIME_TO_LOG = 500; // ms

  /** upper bound (exclusive, in millis) of the random wait time between two tries of a conflicting operation. */
  private static final int MAX_WAIT_TIME_BETWEEN_TRIES = 10;

  /** shared marker instance returned by operations that must be repeated because of a conflict. */
  private static final RetryOperation RETRY = new RetryOperation();

  /** we use a random wait time to repeat the operation if we are conflicting with changes of others. */
  private final Random _waitTimeBetweenTriesGenerator = new Random(System.nanoTime());

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** Connection to ZooKeeper server, includes Zookeeper client. */
  private final ZkConnection _zk;

  /** Root zookeeper node from where to start map. */
  private final String _root;

  /** Callable extension which is able to return a zk operation string. */
  private interface ZkMapCallable extends Callable<Object> {
    /** return the underlying zk operation as string (used for log messages only). */
    String getZkOperation();

    /** return value if there are too many conflicts so that the zk operation fails. */
    Object getExceededRetriesResult();
  }

  /** marker to recognize retries: a {@link ZkMapCallable} returns an instance to request another try. */
  private static final class RetryOperation {
  }

  /**
   * @param service
   *          the zookeeper service
   * @param rootNodePath
   *          the existing(!) zookeeper root node path from where to start the map view.
   */
  public ZkConcurrentMap(final ZooKeeperService service, final String rootNodePath) {
    this(new ZkConnection(service), rootNodePath);
  }

  /**
   * @param zk
   *          shared ZooKeeper connection, not allowed to be 'null'
   * @param rootNodePath
   *          the existing(!) zookeeper root node path from where to start the map view.
   * @throws NullPointerException
   *           if one of the parameters is 'null'
   * @throws IllegalArgumentException
   *           if root node doesn't exist in zookeeper
   * @throws RuntimeException
   *           if the existence check of the root node fails
   */
  public ZkConcurrentMap(final ZkConnection zk, final String rootNodePath) {
    checkNotNull(rootNodePath, "rootNodePath");
    checkNotNull(zk, "zk");
    _zk = zk;
    _root = rootNodePath;
    final Stat stat;
    try {
      stat = _zk.exists(rootNodePath);
    } catch (final Exception e) {
      throw new RuntimeException(e);
    }
    if (stat == null) {
      throw new IllegalArgumentException("root zookeeper node '" + rootNodePath + "' doesn't exist!");
    }
  }

  /**
   * set value for given key. If key exists, value is overwritten.
   *
   * @param key
   *          not allowed to be 'null'
   * @param value
   *          not allowed to be 'null'
   * @return 'true', if successful, otherwise (too many conflicts) 'false'
   * @throws NullPointerException
   *           if key or value is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public boolean put(final String key, final String value) {
    if (_log.isTraceEnabled()) {
      _log.trace("put(), key = " + key + ", value = " + value);
    }
    checkNotNull(key, "key");
    checkNotNull(value, "value");
    final String nodePath = pathOf(key);
    try {
      final byte[] data = value.getBytes(ENCODING);
      return (Boolean) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "put(): key = " + key + ", value = " + value;
        }

        @Override
        public Object call() throws Exception {
          // create new znode(=key) with given value as data
          if (_zk.exists(nodePath) == null && tryCreate(nodePath, data)) {
            return Boolean.TRUE;
          }
          // znode already exists (or was created concurrently) -> overwrite value
          try {
            _zk.setData(nodePath, data);
            return Boolean.TRUE;
          } catch (final KeeperException.NoNodeException e) {
            return RETRY; // someone removed the node in between, try again
          }
        }

        @Override
        public Object getExceededRetriesResult() {
          return Boolean.FALSE;
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * set value for given key if key doesn't exist yet.
   *
   * @param key
   *          not allowed to be 'null'
   * @param value
   *          not allowed to be 'null'
   * @return the value of the node after the put operation. Should be used to check for expected result. If the
   *         operation failed because of too many conflicts, the string "&lt;UNKNOWN_VALUE&gt;" is returned.
   * @throws NullPointerException
   *           if key or value is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public String putIfAbsent(final String key, final String value) {
    if (_log.isTraceEnabled()) {
      _log.trace("putIfAbsent(), key = " + key + ", value = " + value);
    }
    checkNotNull(key, "key");
    checkNotNull(value, "value");
    final String nodePath = pathOf(key);
    try {
      final byte[] data = value.getBytes(ENCODING);
      return (String) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "putIfAbsent(): key = " + key + ", value = " + value;
        }

        @Override
        public Object call() throws Exception {
          if (_zk.exists(nodePath) == null) {
            // create new znode(=key) with given value as data. If someone else was faster, that's ok:
            // the value found in the node is reported below.
            tryCreate(nodePath, data);
          }
          // reporting value of node if it exists.
          final String currentValue = readValue(nodePath);
          if (currentValue == null) {
            return RETRY; // someone removed the node in between -> try again
          }
          return currentValue;
        }

        @Override
        public Object getExceededRetriesResult() {
          return "<UNKNOWN_VALUE>";
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * Set new value for given key if key exists and has given old value.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @param oldValue
   *          the old value, not allowed to be 'null'
   * @param newValue
   *          the new value, not allowed to be 'null'
   * @return 'true', if successful, otherwise 'false'
   */
  public boolean replace(final String key, final String oldValue, final String newValue) {
    return replaceAndGetVersion(key, oldValue, newValue) != null;
  }

  /**
   * Set new value for given key if key exists and has given old value.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @param oldValue
   *          the old value, not allowed to be 'null'
   * @param newValue
   *          the new value, not allowed to be 'null'
   * @return the revision number for the key node, 'null' if not successful (key doesn't exist, current value differs
   *         from oldValue, or too many conflicts).
   * @throws NullPointerException
   *           if one of the parameters is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public Integer replaceAndGetVersion(final String key, final String oldValue, final String newValue) {
    if (_log.isTraceEnabled()) {
      _log.trace("replace(), key = " + key + ", oldValue = " + oldValue + ", newValue = " + newValue);
    }
    checkNotNull(key, "key");
    checkNotNull(oldValue, "oldValue");
    checkNotNull(newValue, "newValue");
    final String nodePath = pathOf(key);
    try {
      final byte[] newData = newValue.getBytes(ENCODING);
      return (Integer) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "replace(), key = " + key + ", oldValue = " + oldValue + ", newValue = " + newValue;
        }

        @Override
        public Object call() throws Exception {
          // check znode existence and get current version
          final Stat stat = _zk.exists(nodePath);
          if (stat == null) {
            return null;
          }
          // check matching of expected old value; a node removed in between counts as "no match"
          final String realOldValue = readValue(nodePath);
          if (realOldValue == null || !oldValue.equals(realOldValue)) {
            return null;
          }
          // set new value, guarded by the version read before the comparison
          try {
            return _zk.setData(nodePath, newData, stat.getVersion()).getVersion();
          } catch (final KeeperException.NoNodeException e) {
            return null; // someone removed the key in between.
          } catch (final KeeperException.BadVersionException e) {
            return RETRY; // someone changed the value in between -> try again with new value
          }
        }

        @Override
        public Object getExceededRetriesResult() {
          return null;
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * @param key
   *          the key, not allowed to be 'null'
   * @return value of the given key as String, or 'null' if key doesn't exist.
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public String getString(final String key) {
    if (_log.isTraceEnabled()) {
      _log.trace("getString(), key = " + key);
    }
    checkNotNull(key, "key");
    try {
      // return value(=data) of given key(=znode), 'null' if the znode doesn't exist
      return readValue(pathOf(key));
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * @param key
   *          the key, not allowed to be 'null'
   * @return value of the given key as Integer, or 'null' if mapping for key doesn't exist.
   * @throws NumberFormatException
   *           if the stored value is not a valid int
   */
  public Integer getInt(final String key) {
    final String s = getString(key);
    if (s == null) {
      return null;
    }
    return Integer.valueOf(s);
  }

  /**
   * @param key
   *          the key, not allowed to be 'null'
   * @return value of the given key as Long, or 'null' if mapping for key doesn't exist.
   * @throws NumberFormatException
   *           if the stored value is not a valid long
   */
  public Long getLong(final String key) {
    final String s = getString(key);
    if (s == null) {
      return null;
    }
    return Long.valueOf(s);
  }

  /**
   * revision of the key node. Useful to detect updates.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @return key node version, or 'null' if key doesn't exist.
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public Integer getVersion(final String key) {
    if (_log.isTraceEnabled()) {
      _log.trace("getVersion(), key = " + key);
    }
    checkNotNull(key, "key");
    final String nodePath = pathOf(key);
    try {
      // return version of given key(=znode)
      final Stat stat = _zk.exists(nodePath);
      if (stat == null) {
        return null;
      }
      return stat.getVersion();
    } catch (final KeeperException.NoNodeException e) {
      return null; // znode doesn't exist
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * Remove given key / value mapping. If key doesn't exist, ignore call.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public void remove(final String key) {
    if (_log.isTraceEnabled()) {
      _log.trace("remove(), key = " + key);
    }
    checkNotNull(key, "key");
    try {
      _zk.deleteNode(pathOf(key));
    } catch (final KeeperException.NoNodeException e) {
      ; // that's ok
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * add value to old value of given key. If key doesn't exist, create it.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @param value
   *          the value to add
   * @return 'newValue', if successful. If operation failed because of too many zookeeper conflicts, return 'null'.
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected errors, e.g. zookeeper errors or a stored value that is not a valid int
   */
  public Integer add(final String key, final int value) {
    if (_log.isTraceEnabled()) {
      _log.trace("add(), key = " + key + ", value = " + value);
    }
    checkNotNull(key, "key");
    final String nodePath = pathOf(key);
    try {
      return (Integer) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "add(), key = " + key + ", value = " + value;
        }

        @Override
        public Object call() throws Exception {
          // check znode existence and get current version
          Stat stat = _zk.exists(nodePath);
          if (stat == null) {
            // znode doesn't exist -> create it
            if (tryCreate(nodePath, String.valueOf(value).getBytes(ENCODING))) {
              return Integer.valueOf(value);
            }
            stat = _zk.exists(nodePath); // someone created this znode in between, that's ok
            if (stat == null) {
              return RETRY; // ... and it was removed again already, try again...
            }
          }
          // add given value to old data
          final String oldString = readValue(nodePath);
          if (oldString == null) {
            return RETRY; // someone removed the key in between, try again...
          }
          final int newValue = value + Integer.parseInt(oldString);
          if (!tryUpdate(nodePath, String.valueOf(newValue), stat.getVersion())) {
            return RETRY; // someone removed the key or changed the value in between, try again...
          }
          return Integer.valueOf(newValue);
        }

        @Override
        public Object getExceededRetriesResult() {
          return null;
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * @return a sorted copy of the keys currently contained in this map
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public Set<String> keySet() {
    if (_log.isTraceEnabled()) {
      _log.trace("keySet()");
    }
    try {
      return new TreeSet<String>(_zk.getChildrenSorted(_root));
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * @param key
   *          the key to look up among the children of the root node
   * @return 'true' if this map contains a mapping for the specified key.
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public boolean containsKey(final String key) {
    if (_log.isTraceEnabled()) {
      _log.trace("containsKey(), key = " + key);
    }
    try {
      return _zk.getChildrenSorted(_root).contains(key);
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * Removes all mappings from this map.
   *
   * @throws RuntimeException
   *           on unexpected zookeeper errors
   */
  public void clear() {
    if (_log.isTraceEnabled()) {
      _log.trace("clear()");
    }
    try {
      for (final String key : keySet()) {
        try {
          _zk.deleteNode(_root + "/" + key);
        } catch (final KeeperException.NoNodeException e) {
          ; // someone removed the node in between, that's ok
        }
      }
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * inc value of given key, but only if value is not negative and value < max. If value >= max or value < 0 ignore call
   * and return -1. If key doesn't exist and max > 0, create entry with value '1'.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @param max
   *          the maximum value
   * @return incremented value if old value was not negative and old value < max, otherwise '-1'. If operation failed
   *         because of too many zookeeper conflicts, return 'null'.
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected errors, e.g. zookeeper errors or a stored value that is not a valid number
   */
  public Integer incIfLessThan(final String key, final long max) {
    if (_log.isTraceEnabled()) {
      _log.trace("incIfLessThan(), key = " + key + ", max = " + max);
    }
    checkNotNull(key, "key");
    if (max <= 0) {
      return -1; // we can't inc a positive value if max <= 0
    }
    final String nodePath = pathOf(key);
    try {
      return (Integer) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "incIfLessThan(), key = " + key + ", max = " + max;
        }

        @Override
        public Object call() throws Exception {
          // check znode existence and get current version
          Stat stat = _zk.exists(nodePath);
          if (stat == null) {
            // znode doesn't exist -> create it with start value '1'
            if (tryCreate(nodePath, "1".getBytes(ENCODING))) {
              return Integer.valueOf(1);
            }
            stat = _zk.exists(nodePath); // someone created this znode in between, that's ok
            if (stat == null) {
              return RETRY; // ... and it was removed again already, try again...
            }
          }
          // inc old value
          final String oldString = readValue(nodePath);
          if (oldString == null) {
            return RETRY; // someone removed the key in between, try again...
          }
          final long oldValue = Long.parseLong(oldString);
          // "oldValue < max" instead of "oldValue + 1 <= max": same result, but cannot overflow
          if (oldValue < 0 || oldValue >= max) {
            return Integer.valueOf(-1); // exceeded max or old value was negative
          }
          final String newValue = String.valueOf(oldValue + 1);
          if (!tryUpdate(nodePath, newValue, stat.getVersion())) {
            return RETRY; // someone removed the key or changed the value in between, try again...
          }
          return Integer.valueOf(newValue);
        }

        @Override
        public Object getExceededRetriesResult() {
          return null;
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * dec value of given key, but only if value is positive and value > min. If value <= min or value <= 0 ignore call
   * and return -1. If min < 0, the call is ignored, too. If key doesn't exist, create entry with value '0'.
   *
   * @param key
   *          the key, not allowed to be 'null'
   * @param min
   *          the minimum value
   * @return decremented value if old value was positive and old value > min, otherwise '-1'. '0' if the key didn't
   *         exist and was created. If operation failed because of too many zookeeper conflicts, return 'null'.
   * @throws NullPointerException
   *           if key is 'null'
   * @throws RuntimeException
   *           on unexpected errors, e.g. zookeeper errors or a stored value that is not a valid int
   */
  public Integer decIfGreaterThan(final String key, final long min) {
    if (_log.isTraceEnabled()) {
      _log.trace("decIfGreaterThan(), key = " + key + ", min = " + min);
    }
    checkNotNull(key, "key");
    if (min < 0) {
      return -1; // we can't dec a positive value if min < 0
    }
    final String nodePath = pathOf(key);
    try {
      return (Integer) retryZkMapOperation(new ZkMapCallable() {
        @Override
        public String getZkOperation() {
          return "decIfGreaterThan(), key = " + key + ", min = " + min;
        }

        @Override
        public Object call() throws Exception {
          // check znode existence and get current version
          Stat stat = _zk.exists(nodePath);
          if (stat == null) {
            // znode doesn't exist -> create it with start value '0'
            if (tryCreate(nodePath, "0".getBytes(ENCODING))) {
              return Integer.valueOf(0);
            }
            stat = _zk.exists(nodePath); // someone created this znode in between, that's ok
            if (stat == null) {
              return RETRY; // ... and it was removed again already, try again...
            }
          }
          // dec old value
          final String oldString = readValue(nodePath);
          if (oldString == null) {
            return RETRY; // someone removed the key in between, try again...
          }
          final int oldVal = Integer.parseInt(oldString);
          if (oldVal <= 0 || oldVal <= min) {
            return Integer.valueOf(-1); // would decrease below min or old value was not positive
          }
          final int newValue = oldVal - 1;
          if (!tryUpdate(nodePath, String.valueOf(newValue), stat.getVersion())) {
            return RETRY; // someone removed the key or changed the value in between, try again...
          }
          return Integer.valueOf(newValue);
        }

        @Override
        public Object getExceededRetriesResult() {
          return null;
        }
      });
    } catch (final Exception e) {
      throw new RuntimeException(e); // Unexpected error.
    }
  }

  /**
   * Generic method for executing zookeeper client commands. Repeated (after a short random wait) as long as the
   * command returns a {@link RetryOperation} marker, at most {@link #NO_OF_TRIES_WHEN_CONFLICT} times.
   *
   * @param callable
   *          command to execute
   * @return result of Callable, or its {@link ZkMapCallable#getExceededRetriesResult()} if all tries conflicted.
   * @throws Exception
   *           whatever the command throws, or InterruptedException if interrupted while waiting for the next try (the
   *           interrupt flag of the thread is restored in this case).
   */
  private Object retryZkMapOperation(final ZkMapCallable callable) throws Exception {
    final long start = System.currentTimeMillis();
    for (int tries = 1; tries <= NO_OF_TRIES_WHEN_CONFLICT; tries++) {
      final Object result = callable.call();
      if (!(result instanceof RetryOperation)) {
        if (_log.isInfoEnabled()) {
          final long t = System.currentTimeMillis() - start;
          if (t > MIN_OPERATION_TIME_TO_LOG) {
            _log.info("ZkConnection operation time: " + t + " ms, tries: " + tries + ", operation: "
              + callable.getZkOperation());
          }
        }
        return result;
      }
      // no need to wait after the last try, we give up anyway
      if (tries < NO_OF_TRIES_WHEN_CONFLICT) {
        try {
          Thread.sleep(_waitTimeBetweenTriesGenerator.nextInt(MAX_WAIT_TIME_BETWEEN_TRIES));
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt visible, callers wrap the exception
          throw e;
        }
      }
    }
    if (_log.isWarnEnabled()) {
      _log.warn("ZkConcurrentMap: Too much conflicts while doing operation: " + callable.getZkOperation());
    }
    return callable.getExceededRetriesResult();
  }

  /** throw a NullPointerException with the message used throughout this class if the parameter is 'null'. */
  private static void checkNotNull(final Object parameter, final String parameterName) {
    if (parameter == null) {
      throw new NullPointerException("parameter " + parameterName + " is <null>");
    }
  }

  /** @return path of the znode that represents the given key. */
  private String pathOf(final String key) {
    return _root + '/' + key;
  }

  /** create the znode with the given data. @return 'false' if the znode exists already, else 'true'. */
  private boolean tryCreate(final String nodePath, final byte[] data) throws Exception {
    try {
      _zk.createNode(nodePath, data);
      return true;
    } catch (final KeeperException.NodeExistsException e) {
      return false;
    }
  }

  /** @return data of the znode decoded as string, 'null' if the znode doesn't exist. */
  private String readValue(final String nodePath) throws Exception {
    try {
      return new String(_zk.getData(nodePath), ENCODING);
    } catch (final KeeperException.NoNodeException e) {
      return null;
    }
  }

  /**
   * write the value if the znode still has the given version. @return 'false' if the znode was removed or changed in
   * between, else 'true'.
   */
  private boolean tryUpdate(final String nodePath, final String value, final int version) throws Exception {
    try {
      _zk.setData(nodePath, value.getBytes(ENCODING), version);
      return true;
    } catch (final KeeperException.NoNodeException e) {
      return false;
    } catch (final KeeperException.BadVersionException e) {
      return false;
    }
  }
}