blob: aab506689d8c6755cc598c0e1873d0c4f27687f2 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper.test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
import org.eclipse.smila.clusterconfig.ClusterConfigException;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
/**
 * Basic tests for ZooKeeperService and zookeeper client.
 *
 * Every test looks up the {@link ZooKeeperService} itself instead of relying on another test having initialized
 * {@link #_service}. Methods whose names do not start with "test" are deactivated on purpose (see their comments).
 */
public class TestZooKeeperService extends DeclarativeServiceTestCase {

  /** charset name used to encode/decode node payloads, so the tests do not depend on the platform default. */
  private static final String ENCODING = "UTF-8";

  /** service under test, looked up at the start of each test method. */
  protected ZooKeeperService _service;

  /**
   * test if ZooKeeper service was successfully started and registered.
   *
   * @throws Exception
   *           no service found.
   */
  public void testService() throws Exception {
    _service = getService(ZooKeeperService.class);
    assertNotNull(_service);
    final ZooKeeper zk = _service.getClient();
    assertNotNull(zk);
    _service.closeClient();
  }

  /**
   * tests zookeeper node creation: creates a sequential child below a base node and checks that it is listed as the
   * only child and that its data can be read back.
   *
   * @throws Exception
   *           test fails
   */
  public void testCreateObject() throws Exception {
    _service = getService(ZooKeeperService.class);
    final String baseNode = "/testCreateObject";
    final String content = "testCreateObject";
    final ZooKeeper zk = _service.getClient();
    try {
      zk.create(baseNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (final NodeExistsException ex) {
      // base node is left over from an earlier run: remove its stale children, otherwise the
      // "exactly one child" assertion below could never hold.
      for (final String staleChild : zk.getChildren(baseNode, false)) {
        zk.delete(baseNode + "/" + staleChild, -1);
      }
    }
    final String path =
      zk.create(baseNode + "/1", content.getBytes(ENCODING), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    final List<String> children = zk.getChildren(baseNode, false);
    assertEquals(1, children.size());
    assertEquals(path, baseNode + "/" + children.get(0));
    final Stat stat = zk.exists(path, false);
    assertNotNull("created node should exist", stat);
    final byte[] data = zk.getData(path, false, stat);
    assertEquals(content, new String(data, ENCODING));
  }

  /**
   * check if creating sequential nodes increases the node-name even if the directory is always cleaned up.
   *
   * deactivated: Not connection-loss safe.
   *
   * @throws Exception
   *           test fails
   */
  public void dontTestCreateSequential() throws Exception {
    _service = getService(ZooKeeperService.class);
    final String sequenceDir = "/testCreateSequential";
    final ZooKeeper zk = _service.getClient();
    zk.create(sequenceDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    String previousNode = null;
    for (int i = 0; i < 100; ++i) {
      final String node =
        zk.create(sequenceDir + "/node-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
      if (previousNode != null) {
        // names must grow although the previous node has already been deleted.
        assertTrue(previousNode.compareTo(node) < 0);
      }
      previousNode = node;
      zk.delete(node, -1);
      final List<String> nodes = zk.getChildren(sequenceDir, false);
      assertTrue(nodes.isEmpty());
    }
  }

  /**
   * check if setting data increases the version even if new data is same as old.
   *
   * deactivated: Not connection-loss safe.
   *
   * @throws Exception
   *           test fails
   */
  public void dontTestChangeVersion() throws Exception {
    _service = getService(ZooKeeperService.class);
    final String nodeName = "/testChangeVersion";
    final ZooKeeper zk = _service.getClient();
    zk.create(nodeName, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    final Stat s1 = zk.exists(nodeName, false);
    zk.setData(nodeName, new byte[0], s1.getVersion());
    final Stat s2 = zk.exists(nodeName, false);
    assertTrue(s2.getVersion() > s1.getVersion());
    try {
      // writing with the outdated version must be rejected.
      zk.setData(nodeName, new byte[0], s1.getVersion());
      fail("should not work");
    } catch (final KeeperException.BadVersionException ex) {
      // OK.
    } catch (final Exception ex) {
      fail("expected KeeperException.BadVersionException, got " + ex.toString());
    }
    zk.setData(nodeName, new byte[0], s2.getVersion());
    final Stat s3 = zk.exists(nodeName, false);
    assertTrue(s3.getVersion() > s2.getVersion());
  }

  /**
   * compare time of {@link ZooKeeper#create(String, byte[], List, CreateMode)} and
   * {@link ZooKeeper#exists(String, boolean)} for checking node existence. Prints both durations in milliseconds to
   * stdout. Deactivated via its method name.
   *
   * @throws Exception
   *           test fails
   */
  public void notestExistsVsCreatePerformance() throws Exception {
    _service = getService(ZooKeeperService.class);
    final String nodeName = "/testExistsVsCreatePerformance";
    final ZooKeeper zk = _service.getClient();
    zk.create(nodeName, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    assertNotNull(zk.exists(nodeName, false));
    final int noOfRuns = 100;
    final long startCreate = System.nanoTime();
    for (int i = 0; i < noOfRuns; i++) {
      try {
        zk.create(nodeName, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        fail("should not work");
      } catch (final NodeExistsException ex) {
        // ok: the node exists, this is the "existence check by create" variant.
      }
    }
    final long endCreate = System.nanoTime();
    final long startExists = System.nanoTime();
    for (int i = 0; i < noOfRuns; i++) {
      if (zk.exists(nodeName, false) == null) {
        fail("node should be there");
      }
    }
    final long endExists = System.nanoTime();
    System.out.println("create: " + (endCreate - startCreate) / 1000 / 1000);
    System.out.println("exists: " + (endExists - startExists) / 1000 / 1000);
  }

  /**
   * Tests if a watcher is delivered with a created event and, after the watch has been set again, with a data changed
   * event.
   *
   * @throws TimeoutException
   *           test failed
   * @throws ExecutionException
   *           test failed
   * @throws InterruptedException
   *           test failed
   * @throws ClusterConfigException
   *           test failed
   * @throws IOException
   *           test failed
   * @throws KeeperException
   *           test failed
   *
   */
  public void testWatcher() throws InterruptedException, ExecutionException, TimeoutException, IOException,
    ClusterConfigException, KeeperException {

    /**
     * registered as watcher and, as callable, polls until the watcher has seen an event for the watched path.
     */
    class WatchTask implements Callable<EventType>, Watcher {
      /** type of the last event seen for the watched path, null if none was seen since the last call(). */
      private volatile EventType _caughtEvent;

      /** path of the node to watch. */
      private final String _path;

      /** Constructor. */
      public WatchTask(final String path) {
        _path = path;
      }

      @Override
      public void process(final WatchedEvent event) {
        // compare from the known non-null side: events without a path (e.g. connection state changes)
        // return null from getPath() and must simply be skipped instead of causing a NullPointerException.
        if (_path.equals(event.getPath())) {
          _caughtEvent = event.getType();
        }
      }

      @Override
      public EventType call() {
        try {
          try {
            while (_caughtEvent == null) {
              Thread.sleep(100);
            }
          } catch (final InterruptedException e) {
            // stop polling (e.g. executor shutdown), but keep the interrupt flag for the pool thread.
            Thread.currentThread().interrupt();
          }
          return _caughtEvent;
        } finally {
          // reset for next use.
          _caughtEvent = null;
        }
      }
    }

    _service = getService(ZooKeeperService.class);
    assertNotNull(_service);
    final ZkConnection zk = new ZkConnection(_service);
    final String nodePath = "/xyz-test-abc/node";
    final WatchTask watchTask = new WatchTask(nodePath);
    // created directly in front of the try block so that a failing setup above cannot leak the pool thread.
    final ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      try {
        zk.deleteNode(nodePath);
      } catch (final KeeperException e) {
        // ignore: node did not exist.
      }
      Future<EventType> future = executor.submit(watchTask);
      _service.registerWatcher(watchTask);
      // add watch
      _service.getClient().exists(nodePath, true);
      zk.createPath(nodePath, new byte[0]);
      EventType caughtEvent = future.get(60, TimeUnit.SECONDS);
      assertNotNull(caughtEvent);
      assertEquals(EventType.NodeCreated, caughtEvent);
      // set watch again
      _service.getClient().exists(nodePath, true);
      // restart our watcher
      future = executor.submit(watchTask);
      zk.setData(nodePath, new byte[0]);
      caughtEvent = future.get(60, TimeUnit.SECONDS);
      assertNotNull(caughtEvent);
      assertEquals(EventType.NodeDataChanged, caughtEvent);
    } finally {
      try {
        _service.unregisterWatcher(watchTask);
      } finally {
        // always stop the polling thread and try to clean up, even if unregistering failed.
        executor.shutdownNow();
        try {
          zk.deleteNode(nodePath);
        } catch (final KeeperException e) {
          // ignore: best-effort cleanup.
        }
      }
    }
  }
}