blob: 8230cbbbc08815313ecab1a60412aa5fcea539f8 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper.test;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZkLock;
import org.eclipse.smila.zookeeper.ZooKeeperService;;
/**
* Test for ZkLock class.
*
* @author aweber
*/
public class TestZkLock extends DeclarativeServiceTestCase {
/** root znode. */
private static final String ROOT_ZNODE = "/locktest";
/** for multi thread tests. */
private static final int NO_OF_PARALLEL_THREADS = 10;
/** for multi thread tests. */
private static final int THREAD_SLEEP_TIME = 1000;
/** ZooKeeper service. */
private ZooKeeperService _zkService;
/**
* get ZooKeeper service.
*
* @throws Exception
* no service found.
*/
@Override
protected void setUp() throws Exception {
super.setUp();
_zkService = getService(ZooKeeperService.class);
assertNotNull(_zkService);
prepareRootNode();
}
/**
* Tests lock / unlock.
*
* @throws Exception
* test fails
*/
public void testLockUnlock() throws Exception {
final ZkConnection zkCon = new ZkConnection(_zkService);
// repeat to be sure that unlock works correct
for (int j = 0; j <= 2; j++) {
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
completionService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
final ZkLock zkLock = new ZkLock(zkCon, ROOT_ZNODE);
boolean hasLock = zkLock.tryLock();
Thread.sleep(THREAD_SLEEP_TIME);
if (hasLock) {
zkLock.unlock();
hasLock = zkLock.tryLock(); // get lock again -> should be successful
zkLock.unlock();
}
return hasLock;
}
});
}
boolean foundSuccessful = false;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Boolean> f = null;
try {
f = completionService.take();
final Boolean success = f.get(10, TimeUnit.SECONDS);
// we expect that only one of the threads can get the lock
if (success) {
// this thread was successful
assertFalse("we expect exactly one successful thread", foundSuccessful);
foundSuccessful = true;
}
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
if (!foundSuccessful) {
fail("we expect that one thread is successful");
}
} finally {
executor.shutdownNow();
}
}
}
/**
* Tests lock without unlock, instead using end of session as "unlock". This works because lock uses ephemeral nodes.
*
* @throws Exception
* test fails
*/
public void testLockWithEndofSessionInsteadOfUnlock() throws Exception {
final ZkConnection zkCon = new ZkConnection(_zkService);
// repeat to be sure that unlock works correct
for (int j = 0; j <= 2; j++) {
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
completionService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
try {
final ZkLock zkLock = new ZkLock(zkCon, ROOT_ZNODE);
final boolean hasLock = zkLock.tryLock();
Thread.sleep(THREAD_SLEEP_TIME);
return hasLock;
} finally {
zkCon.disconnectZkSession();
}
}
});
}
boolean foundSuccessful = false;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Boolean> f = null;
try {
f = completionService.take();
final Boolean success = f.get(10, TimeUnit.SECONDS);
// we expect that only one of the threads can get the lock
if (success) {
// this thread was successful
assertFalse("we expect exactly one successful thread", foundSuccessful);
foundSuccessful = true;
}
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
if (!foundSuccessful) {
fail("we expect that one thread is successful");
}
} finally {
executor.shutdownNow();
}
}
}
/**
* @throws Exception
* error creating root node
*/
private void prepareRootNode() throws Exception {
ZkConnection zkCon = null;
zkCon = new ZkConnection(_zkService);
if (zkCon.exists(ROOT_ZNODE) == null) {
zkCon.createPath(ROOT_ZNODE, "".getBytes());
} else {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
map.clear();
}
}
}