/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.zookeeper; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.zookeeper.AsyncCallback; | |
import org.apache.zookeeper.CreateMode; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.Transaction; | |
import org.apache.zookeeper.Watcher; | |
import org.apache.zookeeper.ZooDefs.Ids; | |
import org.apache.zookeeper.ZooKeeper; | |
import org.apache.zookeeper.data.Stat; | |
import org.eclipse.smila.clusterconfig.ClusterConfigException; | |
/** | |
* wrapper of ZooKeeper client that executes a number of ZooKeeper operation with handling for ConnectionLoss errors. | |
*/ | |
public class ZkConnection { | |
/** use this constant for the data argument to create empty nodes. */ | |
public static final byte[] NO_DATA = new byte[0]; | |
/** Maximum number a zookeeper command is tried in case of ConnectionLoss. */ | |
private static final int MAX_NO_ZK_COMMAND_TRIES = 10; | |
/** if this time is exceeded while executing a zk operation, a log message is written. */ | |
private static final long MIN_OPERATION_TIME_TO_LOG = 500; // ms | |
/** local logger. */ | |
private final Log _log = LogFactory.getLog(getClass()); | |
/** Service needed to create client. */ | |
private final ZooKeeperService _service; | |
/** Callable extension which is able to return a zk operation string. */ | |
private interface ZkCallable { | |
/** restrict exceptions. */ | |
Object call() throws KeeperException, InterruptedException; | |
/** return the underlying zk operation as string. */ | |
String getZkOperation(); | |
} | |
/** | |
* create instance using the given service to get ZooKeeper clients when needed. | |
* | |
* @param service | |
* ZooKeeper service. | |
*/ | |
public ZkConnection(final ZooKeeperService service) { | |
super(); | |
_service = service; | |
} | |
/** | |
* @return sessionId of underlying zookeeper client. | |
*/ | |
public long getSessionId() { | |
try { | |
return _service.getClient().getSessionId(); | |
} catch (final ClusterConfigException e) { | |
throw new RuntimeException(e); | |
} catch (final IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* create a persistent node with given data. fails if the parent node of the given path does not exist. | |
* | |
* @param nodePath | |
* path for node to create | |
* @param data | |
* data of node to create | |
* @throws KeeperException | |
* error | |
*/ | |
public void createNode(final String nodePath, final byte[] data) throws KeeperException { | |
createNode(nodePath, data, CreateMode.PERSISTENT); | |
} | |
/** | |
* create a node with given data. fails if the parent node of the given path does not exist. | |
* | |
* @param nodePath | |
* path for node to create | |
* @param data | |
* data of node to create | |
* @param mode | |
* 'PERSISTENT' if we want a persistent node, 'EPHEMERAL' if we want an ephemeral node | |
* @throws KeeperException | |
* error | |
*/ | |
public void createNode(final String nodePath, final byte[] data, final CreateMode mode) throws KeeperException { | |
retryZkCommand(new ZkCallable() { | |
private boolean _firstTry = true; // needed because of CL handling | |
@Override | |
public String getZkOperation() { | |
return "createNode '" + nodePath + "', mode '" + mode + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
try { | |
getClient().create(nodePath, data, Ids.OPEN_ACL_UNSAFE, mode); | |
} catch (final KeeperException.NodeExistsException e) { | |
if (_firstTry) { | |
throw e; // error in first call -> node exists -> throw exception | |
} | |
// else: Node was created in previous call despite of CL -> ignore | |
} catch (final KeeperException.ConnectionLossException e) { | |
_firstTry = false; | |
throw e; | |
} catch (final KeeperException.SessionExpiredException e) { | |
_firstTry = false; | |
throw e; | |
} | |
return null; | |
} | |
}); | |
} | |
/** | |
* create a persistent node with given data. creates the complete hierarchy if necessary. | |
* | |
* @param nodePath | |
* path for node to create | |
* @param data | |
* data of node to create | |
* @throws KeeperException | |
* error | |
*/ | |
public void createPath(final String nodePath, final byte[] data) throws KeeperException { | |
for (int i = 0; i < 2; i++) { | |
try { | |
createNode(nodePath, data); | |
return; | |
} catch (final KeeperException.NoNodeException ex) { | |
final int indexOfSlash = nodePath.lastIndexOf('/'); | |
if (indexOfSlash > 0) { | |
final String parentPath = nodePath.substring(0, indexOfSlash); | |
try { | |
createPath(parentPath, NO_DATA); | |
} catch (final KeeperException.NodeExistsException ex2) { | |
; // somebody else just created it. Fine! | |
} | |
} | |
} | |
} | |
} | |
/** | |
* @param path | |
* the path whose elements should be created as zookeeper nodes if not existent | |
* @throws KeeperException | |
* error creating zookeeper nodes | |
*/ | |
public void ensurePathExists(final String path) throws KeeperException { | |
if (exists(path) == null) { | |
// path doesn't exist -> create it | |
try { | |
createPath(path, ZkConnection.NO_DATA); | |
} catch (final KeeperException.NodeExistsException e) { | |
; // someone created it in between -> that's ok | |
} | |
} | |
} | |
/** | |
* delete the given node. | |
* | |
* @param nodePath | |
* path for node to delete | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public void deleteNode(final String nodePath) throws KeeperException { | |
// to be sure that the node exists at all. | |
if (exists(nodePath) == null) { | |
throw new KeeperException.NoNodeException(nodePath); | |
} | |
retryZkCommand(new ZkCallable() { | |
private boolean _firstTry = true; // needed because of CL handling | |
@Override | |
public String getZkOperation() { | |
return "deleteNode '" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
try { | |
getClient().delete(nodePath, -1); // -1 = any version | |
} catch (final KeeperException.NoNodeException e) { | |
if (_firstTry) { | |
throw e; // error in first call -> node doesn't exist -> throw exception | |
} | |
// else: Node was deleted in previous call despite of CL -> ignore | |
} catch (final KeeperException.ConnectionLossException e) { | |
_firstTry = false; | |
throw e; | |
} catch (final KeeperException.SessionExpiredException e) { | |
_firstTry = false; | |
throw e; | |
} | |
return null; | |
} | |
}); | |
} | |
/** | |
* delete a complete node subtree, starting from the given path. | |
* | |
* @param nodePath | |
* root node of tree to delete | |
* @throws KeeperException | |
* error deleting tree. State of subtree is quite undefined afterwards. | |
*/ | |
public void deleteTreeWithoutMultiOps(final String nodePath) throws KeeperException { | |
try { | |
final List<String> children = getChildrenSorted(nodePath); | |
final String root = nodePath + '/'; | |
for (final String childNode : children) { | |
final String childPath = root + childNode; | |
deleteTreeWithoutMultiOps(childPath); | |
} | |
deleteNode(nodePath); | |
} catch (final KeeperException.NoNodeException ex) { | |
; // somebody else delete it. Fine! | |
} | |
} | |
/** | |
* delete a complete node subtree starting from the given path. | |
* | |
* @param nodePath | |
* root node of tree to delete | |
* @throws KeeperException | |
* error deleting tree. State of subtree is quite undefined afterwards. | |
*/ | |
public void deleteTree(final String nodePath) throws KeeperException { | |
try { | |
final Transaction tx = _service.getClient().transaction(); | |
deleteTreeWithMultiOps(nodePath, tx); | |
tx.commit(); | |
} catch (final Exception e) { | |
// MultiOp transaction failed, so we try the standard way | |
deleteTreeWithoutMultiOps(nodePath); | |
} | |
} | |
/** delete a complete node subtree using zookeeper MultiOps, starting from the given path. */ | |
private void deleteTreeWithMultiOps(final String nodePath, final Transaction tx) throws KeeperException { | |
final List<String> children = getChildrenSorted(nodePath); | |
final String root = nodePath + '/'; | |
for (final String childNode : children) { | |
final String childPath = root + childNode; | |
deleteTreeWithMultiOps(childPath, tx); | |
} | |
tx.delete(nodePath, -1); | |
} | |
/** | |
* Check if node exists. | |
* | |
* @param nodePath | |
* path for node to check existence | |
* @return Stat of node to exist, 'null' if no such node exists. | |
* @throws KeeperException | |
* error | |
*/ | |
public Stat exists(final String nodePath) throws KeeperException { | |
return (Stat) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "exists '" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
return getClient().exists(nodePath, false); // false -> set no watch | |
} | |
}); | |
} | |
/** | |
* Check if node exists and set a watcher. | |
* | |
* @param nodePath | |
* path for node to check existence | |
* @return Stat of node to exist, 'null' if no such node exists. | |
* @throws KeeperException | |
* error | |
*/ | |
public Stat exists(final String nodePath, final Watcher watcher) throws KeeperException { | |
return (Stat) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "exists '" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
return getClient().exists(nodePath, watcher); | |
} | |
}); | |
} | |
/** | |
* Check if node exists wihin given timeout. | |
* | |
* @param nodePath | |
* path for node to check existence | |
* @param timeout | |
* timeout in milliseconds. | |
* @return Stat of node to exist, 'null' if no such node exists, or the call did not finish within the timeout. | |
* @throws KeeperException | |
* error | |
*/ | |
public Stat exists(final String nodePath, final long timeout) throws KeeperException { | |
return (Stat) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "async exists'" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
if (_log.isDebugEnabled()) { | |
_log.debug("Checking for existence of " + nodePath + " asynchronously."); | |
} | |
final StatCallback cb = new StatCallback(); | |
getClient().exists(nodePath, false, cb, null); | |
synchronized (cb) { | |
cb.wait(timeout); | |
} | |
return cb._stat; | |
} | |
}); | |
} | |
private static class StatCallback implements AsyncCallback.StatCallback { | |
private Stat _stat; | |
@Override | |
public void processResult(final int rc, final String path, final Object ctx, final Stat stat) { | |
_stat = stat; | |
synchronized (this) { | |
notifyAll(); | |
} | |
} | |
} | |
/** | |
* update data on existing node if version does match. | |
* | |
* @param nodePath | |
* path for node to update data | |
* @param nodeData | |
* new data | |
* @param version | |
* the expected matching version | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public Stat setData(final String nodePath, final byte[] nodeData, final int version) throws KeeperException { | |
return (Stat) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "setData '" + nodePath + "', version '" + version + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
return getClient().setData(nodePath, nodeData, version); | |
} | |
}); | |
} | |
/** | |
* update data on existing node. | |
* | |
* @param nodePath | |
* path for node to update data | |
* @param nodeData | |
* new data | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public Stat setData(final String nodePath, final byte[] nodeData) throws KeeperException { | |
return setData(nodePath, nodeData, -1); | |
} | |
/** | |
* get data of node. | |
* | |
* @param nodePath | |
* the node path to get the data | |
* @return node data | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public byte[] getData(final String nodePath) throws KeeperException { | |
return getData(nodePath, new Stat()); | |
} | |
/** | |
* get data and stat of node. | |
* | |
* @param nodePath | |
* the node path to get the data | |
* @param stat | |
* contains the stat of the node after the call | |
* @return node data | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public byte[] getData(final String nodePath, final Stat stat) throws KeeperException { | |
return (byte[]) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "getData '" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
return getClient().getData(nodePath, false, stat); // false -> set no watch | |
} | |
}); | |
} | |
/** | |
* get children from node, sorted by name. | |
* | |
* @param nodePath | |
* path of node to get the children | |
* @return sorted list of children | |
* @throws KeeperException | |
* KeeperException.NoNodeException if node doesn't exist | |
*/ | |
public List<String> getChildrenSorted(final String nodePath) throws KeeperException { | |
@SuppressWarnings("unchecked") | |
final List<String> children = (List<String>) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "getChildrenSorted '" + nodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
return getClient().getChildren(nodePath, false); // false -> set no watch | |
} | |
}); | |
Collections.sort(children); // sort by name (lexicographic ascending) | |
return children; | |
} | |
/** | |
* get all leaf nodes (= nodes without children) starting from given node. | |
* | |
* @param startNodePath | |
* path of start node | |
* @return list of all leaf nodes below starting node | |
* @throws KeeperException | |
* KeeperException.NoNodeException if start node doesn't exist | |
*/ | |
public List<String> getLeafNodes(final String startNodePath) throws KeeperException { | |
@SuppressWarnings("unchecked") | |
final List<String> children = (List<String>) retryZkCommand(new ZkCallable() { | |
@Override | |
public String getZkOperation() { | |
return "getLeafNodes '" + startNodePath + "'"; | |
} | |
@Override | |
public Object call() throws KeeperException, InterruptedException { | |
final List<String> children = new ArrayList<String>(); | |
collectLeafNodes(startNodePath, children); // collect children | |
return children; | |
} | |
/** recursive helper method. */ | |
private void collectLeafNodes(final String node, final List<String> allChildren) throws KeeperException, | |
InterruptedException { | |
final List<String> children = getClient().getChildren(node, false); // false -> set no watch | |
for (final String child : children) { | |
final String childNode = node + "/" + child; | |
final Stat stat = getClient().exists(childNode, false); | |
if (stat.getNumChildren() == 0) { | |
allChildren.add(childNode); | |
} else { | |
collectLeafNodes(childNode, allChildren); | |
} | |
} | |
} | |
}); | |
return children; | |
} | |
/** | |
* Close current zookeeper client (session). | |
*/ | |
public void disconnectZkSession() throws IOException, ClusterConfigException { | |
if (_log.isTraceEnabled()) { | |
_log.trace("Disconnecting ZK session."); | |
} | |
_service.closeClient(); | |
} | |
/** | |
* @return current zookeeper client instance | |
*/ | |
private ZooKeeper getClient() { | |
try { | |
return _service.getClient(); | |
} catch (final Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* Generic method for executing zookeeper client commands. In case of ConnectionLoss or SessionExpired, command is | |
* repeated until success or maximum number of tries is reached. | |
* | |
* @param callable | |
* command to execute | |
* @return result of Callable | |
*/ | |
private Object retryZkCommand(final ZkCallable callable) throws KeeperException { | |
final long start = System.currentTimeMillis(); | |
int tries = 0; | |
while (tries < MAX_NO_ZK_COMMAND_TRIES) { | |
tries++; | |
try { | |
final Object result = callable.call(); | |
if (_log.isInfoEnabled()) { | |
final long t = System.currentTimeMillis() - start; | |
if (t > MIN_OPERATION_TIME_TO_LOG) { | |
_log.info("ZkConnection operation time: " + t + " ms, tries: " + tries + ", operation: " | |
+ callable.getZkOperation()); | |
} | |
} | |
return result; | |
} catch (final InterruptedException ex) { | |
_log.info("Interrupted exception during " + callable.getZkOperation() + ", retrying."); // ignore and try again | |
} catch (final KeeperException.ConnectionLossException e) { | |
_log.warn("ConnectionLoss exception during " + callable.getZkOperation() + ", retrying."); // ignore and try again | |
_service.waitForClientConnected(); | |
} catch (final KeeperException.SessionExpiredException e) { | |
_log.warn("SessionExpired exception during " + callable.getZkOperation() + ", retrying.", e); // ignore and try again | |
_service.waitForClientConnected(); | |
} | |
} | |
throw new RuntimeException("Reached maximum number of tries (" + MAX_NO_ZK_COMMAND_TRIES | |
+ ") for zookeeper client command"); | |
} | |
} |