blob: 667697d39753f8cceb6a8c46a7c693b1e447b254 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.clusterconfig.ClusterConfigException;
/**
 * Wrapper of the ZooKeeper client that executes a number of ZooKeeper operations with handling for ConnectionLoss and
 * SessionExpired errors: such failed operations are retried, after waiting for the client to be connected again, up to
 * a maximum number of tries.
 */
public class ZkConnection {

  /** use this constant for the data argument to create empty nodes. */
  public static final byte[] NO_DATA = new byte[0];

  /** Maximum number a zookeeper command is tried in case of ConnectionLoss. */
  private static final int MAX_NO_ZK_COMMAND_TRIES = 10;

  /** if this time is exceeded while executing a zk operation, a log message is written. */
  private static final long MIN_OPERATION_TIME_TO_LOG = 500; // ms

  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());

  /** Service needed to create client. */
  private final ZooKeeperService _service;

  /**
   * A single ZooKeeper operation that can be executed (repeatedly) by {@link ZkConnection#retryZkCommand(ZkCallable)}.
   *
   * @param <T>
   *          result type of the operation
   */
  private interface ZkCallable<T> {
    /** execute the operation. Only ZooKeeper related exceptions are allowed. */
    T call() throws KeeperException, InterruptedException;

    /** @return description of the underlying zk operation, used in log and error messages. */
    String getZkOperation();
  }

  /**
   * create instance using the given service to get ZooKeeper clients when needed.
   *
   * @param service
   *          ZooKeeper service.
   */
  public ZkConnection(final ZooKeeperService service) {
    _service = service;
  }

  /**
   * @return sessionId of underlying zookeeper client.
   * @throws RuntimeException
   *           wrapping a ClusterConfigException or IOException raised while getting the client.
   */
  public long getSessionId() {
    try {
      return _service.getClient().getSessionId();
    } catch (final ClusterConfigException e) {
      throw new RuntimeException(e);
    } catch (final IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * create a persistent node with given data. fails if the parent node of the given path does not exist.
   *
   * @param nodePath
   *          path for node to create
   * @param data
   *          data of node to create
   * @throws KeeperException
   *           error
   */
  public void createNode(final String nodePath, final byte[] data) throws KeeperException {
    createNode(nodePath, data, CreateMode.PERSISTENT);
  }

  /**
   * create a node with given data. fails if the parent node of the given path does not exist.
   *
   * If an attempt fails with ConnectionLoss, SessionExpired or an interrupt, it is unknown whether the server already
   * created the node. So a NodeExistsException in a later retry is ignored. A NodeExistsException is only reported
   * when it occurs in the very first attempt.
   *
   * @param nodePath
   *          path for node to create
   * @param data
   *          data of node to create
   * @param mode
   *          'PERSISTENT' if we want a persistent node, 'EPHEMERAL' if we want an ephemeral node
   * @throws KeeperException
   *           error, e.g. KeeperException.NodeExistsException if the node already exists
   */
  public void createNode(final String nodePath, final byte[] data, final CreateMode mode) throws KeeperException {
    retryZkCommand(new ZkCallable<Void>() {
      /** false as soon as an attempt failed without telling whether the node was created on the server. */
      private boolean _firstTry = true;

      @Override
      public String getZkOperation() {
        return "createNode '" + nodePath + "', mode '" + mode + "'";
      }

      @Override
      public Void call() throws KeeperException, InterruptedException {
        try {
          getClient().create(nodePath, data, Ids.OPEN_ACL_UNSAFE, mode);
        } catch (final KeeperException.NodeExistsException e) {
          if (_firstTry) {
            throw e; // error in first call -> node exists -> throw exception
          }
          // else: Node was created in previous call despite of CL -> ignore
        } catch (final KeeperException.ConnectionLossException e) {
          _firstTry = false;
          throw e;
        } catch (final KeeperException.SessionExpiredException e) {
          _firstTry = false;
          throw e;
        } catch (final InterruptedException e) {
          // the interrupt may arrive after the request was already executed by the server, so a NodeExists
          // in the retry does not prove that the node existed before this call.
          _firstTry = false;
          throw e;
        }
        return null;
      }
    });
  }

  /**
   * create a persistent node with given data. creates the complete hierarchy (with empty parent nodes) if necessary.
   *
   * @param nodePath
   *          path for node to create
   * @param data
   *          data of node to create
   * @throws KeeperException
   *           error. For example, KeeperException.NodeExistsException if the node already exists. Or
   *           KeeperException.NoNodeException if the node still cannot be created after its parent hierarchy was
   *           created, e.g. because a parent was deleted concurrently.
   */
  public void createPath(final String nodePath, final byte[] data) throws KeeperException {
    // at most two attempts: the second one after the missing parent hierarchy has been created.
    final int maxAttempts = 2;
    for (int attempt = 1;; attempt++) {
      try {
        createNode(nodePath, data);
        return;
      } catch (final KeeperException.NoNodeException ex) {
        final int indexOfSlash = nodePath.lastIndexOf('/');
        if (attempt >= maxAttempts || indexOfSlash <= 0) {
          // giving up: the node could not be created, so don't return as if it had been.
          throw ex;
        }
        final String parentPath = nodePath.substring(0, indexOfSlash);
        try {
          createPath(parentPath, NO_DATA);
        } catch (final KeeperException.NodeExistsException ex2) {
          ; // somebody else just created it. Fine!
        }
      }
    }
  }

  /**
   * create the given path (including all missing parent nodes, with empty data) if it does not exist yet.
   *
   * @param path
   *          the path whose elements should be created as zookeeper nodes if not existent
   * @throws KeeperException
   *           error creating zookeeper nodes
   */
  public void ensurePathExists(final String path) throws KeeperException {
    if (exists(path) == null) {
      // path doesn't exist -> create it
      try {
        createPath(path, ZkConnection.NO_DATA);
      } catch (final KeeperException.NodeExistsException e) {
        ; // someone created it in between -> that's ok
      }
    }
  }

  /**
   * delete the given node (any version).
   *
   * If an attempt fails with ConnectionLoss, SessionExpired or an interrupt, a NoNodeException in a later retry is
   * ignored, because the node may have been deleted by the failed attempt.
   *
   * @param nodePath
   *          path for node to delete
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public void deleteNode(final String nodePath) throws KeeperException {
    // to be sure that the node exists at all: the retries below ignore NoNode after a failed attempt.
    if (exists(nodePath) == null) {
      throw new KeeperException.NoNodeException(nodePath);
    }
    retryZkCommand(new ZkCallable<Void>() {
      /** false as soon as an attempt failed without telling whether the node was deleted on the server. */
      private boolean _firstTry = true;

      @Override
      public String getZkOperation() {
        return "deleteNode '" + nodePath + "'";
      }

      @Override
      public Void call() throws KeeperException, InterruptedException {
        try {
          getClient().delete(nodePath, -1); // -1 = any version
        } catch (final KeeperException.NoNodeException e) {
          if (_firstTry) {
            throw e; // error in first call -> node doesn't exist -> throw exception
          }
          // else: Node was deleted in previous call despite of CL -> ignore
        } catch (final KeeperException.ConnectionLossException e) {
          _firstTry = false;
          throw e;
        } catch (final KeeperException.SessionExpiredException e) {
          _firstTry = false;
          throw e;
        } catch (final InterruptedException e) {
          // the interrupt may arrive after the request was already executed by the server, so a NoNode
          // in the retry does not prove that the node did not exist.
          _firstTry = false;
          throw e;
        }
        return null;
      }
    });
  }

  /**
   * delete a complete node subtree, starting from the given path, node by node (children first). Nodes that are
   * deleted concurrently by somebody else are ignored.
   *
   * @param nodePath
   *          root node of tree to delete
   * @throws KeeperException
   *           error deleting tree. State of subtree is quite undefined afterwards.
   */
  public void deleteTreeWithoutMultiOps(final String nodePath) throws KeeperException {
    try {
      final List<String> children = getChildrenSorted(nodePath);
      final String root = nodePath + '/';
      for (final String childNode : children) {
        final String childPath = root + childNode;
        deleteTreeWithoutMultiOps(childPath);
      }
      deleteNode(nodePath);
    } catch (final KeeperException.NoNodeException ex) {
      ; // somebody else delete it. Fine!
    }
  }

  /**
   * delete a complete node subtree starting from the given path.
   *
   * First tries to delete the whole subtree in a single ZooKeeper multi-op transaction. If preparing or committing
   * that transaction fails for any reason, the subtree is deleted node by node using
   * {@link #deleteTreeWithoutMultiOps(String)}.
   *
   * @param nodePath
   *          root node of tree to delete
   * @throws KeeperException
   *           error deleting tree. State of subtree is quite undefined afterwards.
   */
  public void deleteTree(final String nodePath) throws KeeperException {
    try {
      final Transaction tx = _service.getClient().transaction();
      deleteTreeWithMultiOps(nodePath, tx);
      tx.commit();
    } catch (final Exception e) {
      // MultiOp transaction failed (or could not be prepared), so we try the standard way
      deleteTreeWithoutMultiOps(nodePath);
    }
  }

  /** add delete operations for a complete node subtree (children first), starting from the given path, to tx. */
  private void deleteTreeWithMultiOps(final String nodePath, final Transaction tx) throws KeeperException {
    final List<String> children = getChildrenSorted(nodePath);
    final String root = nodePath + '/';
    for (final String childNode : children) {
      final String childPath = root + childNode;
      deleteTreeWithMultiOps(childPath, tx);
    }
    tx.delete(nodePath, -1);
  }

  /**
   * Check if node exists.
   *
   * @param nodePath
   *          path for node to check existence
   * @return Stat of node to exist, 'null' if no such node exists.
   * @throws KeeperException
   *           error
   */
  public Stat exists(final String nodePath) throws KeeperException {
    return retryZkCommand(new ZkCallable<Stat>() {
      @Override
      public String getZkOperation() {
        return "exists '" + nodePath + "'";
      }

      @Override
      public Stat call() throws KeeperException, InterruptedException {
        return getClient().exists(nodePath, false); // false -> set no watch
      }
    });
  }

  /**
   * Check if node exists and set a watcher.
   *
   * @param nodePath
   *          path for node to check existence
   * @param watcher
   *          watcher to set on the node
   * @return Stat of node to exist, 'null' if no such node exists.
   * @throws KeeperException
   *           error
   */
  public Stat exists(final String nodePath, final Watcher watcher) throws KeeperException {
    return retryZkCommand(new ZkCallable<Stat>() {
      @Override
      public String getZkOperation() {
        return "exists '" + nodePath + "'";
      }

      @Override
      public Stat call() throws KeeperException, InterruptedException {
        return getClient().exists(nodePath, watcher);
      }
    });
  }

  /**
   * Check if node exists within given timeout. The check is issued asynchronously and the calling thread waits at most
   * the given time for the result.
   *
   * @param nodePath
   *          path for node to check existence
   * @param timeout
   *          timeout in milliseconds. 0 means: wait until the result has been delivered (as with
   *          {@link Object#wait(long)}).
   * @return Stat of node to exist, 'null' if no such node exists, or the call did not finish within the timeout. The
   *         result code of the asynchronous call is not evaluated. An error result (e.g. ConnectionLoss) presumably
   *         delivers no Stat and therefore also yields 'null'.
   * @throws KeeperException
   *           error
   * @throws IllegalArgumentException
   *           if timeout is negative.
   */
  public Stat exists(final String nodePath, final long timeout) throws KeeperException {
    return retryZkCommand(new ZkCallable<Stat>() {
      @Override
      public String getZkOperation() {
        return "async exists'" + nodePath + "'";
      }

      @Override
      public Stat call() throws KeeperException, InterruptedException {
        if (_log.isDebugEnabled()) {
          _log.debug("Checking for existence of " + nodePath + " asynchronously.");
        }
        final StatCallback cb = new StatCallback();
        getClient().exists(nodePath, false, cb, null);
        return cb.await(timeout);
      }
    });
  }

  /**
   * Receives the result of an asynchronous exists call and lets the calling thread wait for it. The _done flag makes
   * sure that a result delivered before the caller starts waiting is not missed, and that spurious wakeups are ignored.
   */
  private static class StatCallback implements AsyncCallback.StatCallback {
    /** Stat delivered by the callback. */
    private Stat _stat;

    /** true as soon as the callback has been invoked. */
    private boolean _done;

    @Override
    public synchronized void processResult(final int rc, final String path, final Object ctx, final Stat stat) {
      _stat = stat;
      _done = true;
      notifyAll();
    }

    /**
     * wait until the result has been delivered or the timeout has elapsed.
     *
     * @param timeout
     *          maximum time to wait in milliseconds, 0 means wait without time limit.
     * @return the delivered Stat, 'null' if none was delivered (yet).
     * @throws InterruptedException
     *           if the waiting thread is interrupted.
     * @throws IllegalArgumentException
     *           if timeout is negative.
     */
    public synchronized Stat await(final long timeout) throws InterruptedException {
      if (timeout < 0) {
        throw new IllegalArgumentException("timeout value is negative");
      }
      if (timeout == 0) {
        while (!_done) {
          wait();
        }
        return _stat;
      }
      final long start = System.currentTimeMillis();
      long remaining = timeout;
      while (!_done && remaining > 0) {
        wait(remaining);
        remaining = timeout - (System.currentTimeMillis() - start);
      }
      return _stat;
    }
  }

  /**
   * update data on existing node if version does match.
   *
   * @param nodePath
   *          path for node to update data
   * @param nodeData
   *          new data
   * @param version
   *          the expected matching version, -1 matches any version
   * @return Stat of the node after the update.
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public Stat setData(final String nodePath, final byte[] nodeData, final int version) throws KeeperException {
    return retryZkCommand(new ZkCallable<Stat>() {
      @Override
      public String getZkOperation() {
        return "setData '" + nodePath + "', version '" + version + "'";
      }

      @Override
      public Stat call() throws KeeperException, InterruptedException {
        return getClient().setData(nodePath, nodeData, version);
      }
    });
  }

  /**
   * update data on existing node, regardless of its version.
   *
   * @param nodePath
   *          path for node to update data
   * @param nodeData
   *          new data
   * @return Stat of the node after the update.
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public Stat setData(final String nodePath, final byte[] nodeData) throws KeeperException {
    return setData(nodePath, nodeData, -1);
  }

  /**
   * get data of node.
   *
   * @param nodePath
   *          the node path to get the data
   * @return node data
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public byte[] getData(final String nodePath) throws KeeperException {
    return getData(nodePath, new Stat());
  }

  /**
   * get data and stat of node.
   *
   * @param nodePath
   *          the node path to get the data
   * @param stat
   *          contains the stat of the node after the call
   * @return node data
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public byte[] getData(final String nodePath, final Stat stat) throws KeeperException {
    return retryZkCommand(new ZkCallable<byte[]>() {
      @Override
      public String getZkOperation() {
        return "getData '" + nodePath + "'";
      }

      @Override
      public byte[] call() throws KeeperException, InterruptedException {
        return getClient().getData(nodePath, false, stat); // false -> set no watch
      }
    });
  }

  /**
   * get children from node, sorted by name.
   *
   * @param nodePath
   *          path of node to get the children
   * @return sorted list of children (names relative to nodePath)
   * @throws KeeperException
   *           KeeperException.NoNodeException if node doesn't exist
   */
  public List<String> getChildrenSorted(final String nodePath) throws KeeperException {
    final List<String> children = retryZkCommand(new ZkCallable<List<String>>() {
      @Override
      public String getZkOperation() {
        return "getChildrenSorted '" + nodePath + "'";
      }

      @Override
      public List<String> call() throws KeeperException, InterruptedException {
        return getClient().getChildren(nodePath, false); // false -> set no watch
      }
    });
    Collections.sort(children); // sort by name (lexicographic ascending)
    return children;
  }

  /**
   * get all leaf nodes (= nodes without children) starting from given node. Nodes that are deleted concurrently while
   * traversing the tree are skipped.
   *
   * @param startNodePath
   *          path of start node
   * @return list of the full paths of all leaf nodes below starting node
   * @throws KeeperException
   *           KeeperException.NoNodeException if start node doesn't exist
   */
  public List<String> getLeafNodes(final String startNodePath) throws KeeperException {
    return retryZkCommand(new ZkCallable<List<String>>() {
      @Override
      public String getZkOperation() {
        return "getLeafNodes '" + startNodePath + "'";
      }

      @Override
      public List<String> call() throws KeeperException, InterruptedException {
        // a fresh list per try, so a retry does not contain entries collected by a failed try
        final List<String> leafNodes = new ArrayList<String>();
        collectLeafNodes(startNodePath, leafNodes);
        return leafNodes;
      }

      /** recursive helper method: adds the full paths of all leaf nodes below node to allChildren. */
      private void collectLeafNodes(final String node, final List<String> allChildren) throws KeeperException,
        InterruptedException {
        final List<String> children = getClient().getChildren(node, false); // false -> set no watch
        for (final String child : children) {
          final String childNode = node + "/" + child;
          final Stat stat = getClient().exists(childNode, false);
          if (stat == null) {
            continue; // child was deleted after listing the children -> nothing to collect
          }
          if (stat.getNumChildren() == 0) {
            allChildren.add(childNode);
          } else {
            try {
              collectLeafNodes(childNode, allChildren);
            } catch (final KeeperException.NoNodeException e) {
              ; // child was deleted after the exists check -> nothing to collect
            }
          }
        }
      }
    });
  }

  /**
   * Close current zookeeper client (session).
   *
   * @throws IOException
   *           error closing the client
   * @throws ClusterConfigException
   *           error closing the client
   */
  public void disconnectZkSession() throws IOException, ClusterConfigException {
    if (_log.isTraceEnabled()) {
      _log.trace("Disconnecting ZK session.");
    }
    _service.closeClient();
  }

  /**
   * @return current zookeeper client instance
   * @throws RuntimeException
   *           wrapping any exception raised while getting the client.
   */
  private ZooKeeper getClient() {
    try {
      return _service.getClient();
    } catch (final Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Generic method for executing zookeeper client commands. In case of ConnectionLoss or SessionExpired, command is
   * repeated (after waiting for the client to be connected) until success or maximum number of tries is reached.
   *
   * An InterruptedException is also retried and not propagated. The interrupt status is deliberately not restored:
   * with the flag set, every following client call would be interrupted again. Such a call may already have been
   * executed on the server, so non-idempotent operations (e.g. versioned setData) would be run twice.
   *
   * @param callable
   *          command to execute
   * @param <T>
   *          result type of the command
   * @return result of Callable
   * @throws KeeperException
   *           any other KeeperException thrown by the command
   * @throws RuntimeException
   *           if the maximum number of tries is reached; the cause is the error of the last try.
   */
  private <T> T retryZkCommand(final ZkCallable<T> callable) throws KeeperException {
    final long start = System.currentTimeMillis();
    Exception lastError = null;
    for (int tries = 1; tries <= MAX_NO_ZK_COMMAND_TRIES; tries++) {
      try {
        final T result = callable.call();
        if (_log.isInfoEnabled()) {
          final long t = System.currentTimeMillis() - start;
          if (t > MIN_OPERATION_TIME_TO_LOG) {
            _log.info("ZkConnection operation time: " + t + " ms, tries: " + tries + ", operation: "
              + callable.getZkOperation());
          }
        }
        return result;
      } catch (final InterruptedException ex) {
        lastError = ex;
        _log.info("Interrupted exception during " + callable.getZkOperation() + ", retrying."); // ignore and try again
      } catch (final KeeperException.ConnectionLossException e) {
        lastError = e;
        _log.warn("ConnectionLoss exception during " + callable.getZkOperation() + ", retrying."); // ignore and try again
        _service.waitForClientConnected();
      } catch (final KeeperException.SessionExpiredException e) {
        lastError = e;
        _log.warn("SessionExpired exception during " + callable.getZkOperation() + ", retrying.", e); // ignore and try again
        _service.waitForClientConnected();
      }
    }
    throw new RuntimeException("Reached maximum number of tries (" + MAX_NO_ZK_COMMAND_TRIES
      + ") for zookeeper client command: " + callable.getZkOperation(), lastError);
  }
}