/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.zookeeper.test; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletionService; | |
import java.util.concurrent.ExecutorCompletionService; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
import org.eclipse.smila.zookeeper.ZkConcurrentMap; | |
import org.eclipse.smila.zookeeper.ZkConnection; | |
import org.eclipse.smila.zookeeper.ZkLock; | |
import org.eclipse.smila.zookeeper.ZooKeeperService;; | |
/** | |
* Test for ZkLock class. | |
* | |
* @author aweber | |
*/ | |
public class TestZkLock extends DeclarativeServiceTestCase { | |
/** root znode. */ | |
private static final String ROOT_ZNODE = "/locktest"; | |
/** for multi thread tests. */ | |
private static final int NO_OF_PARALLEL_THREADS = 10; | |
/** for multi thread tests. */ | |
private static final int THREAD_SLEEP_TIME = 1000; | |
/** ZooKeeper service. */ | |
private ZooKeeperService _zkService; | |
/** | |
* get ZooKeeper service. | |
* | |
* @throws Exception | |
* no service found. | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_zkService = getService(ZooKeeperService.class); | |
assertNotNull(_zkService); | |
prepareRootNode(); | |
} | |
/** | |
* Tests lock / unlock. | |
* | |
* @throws Exception | |
* test fails | |
*/ | |
public void testLockUnlock() throws Exception { | |
final ZkConnection zkCon = new ZkConnection(_zkService); | |
// repeat to be sure that unlock works correct | |
for (int j = 0; j <= 2; j++) { | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
completionService.submit(new Callable<Boolean>() { | |
@Override | |
public Boolean call() throws Exception { | |
final ZkLock zkLock = new ZkLock(zkCon, ROOT_ZNODE); | |
boolean hasLock = zkLock.tryLock(); | |
Thread.sleep(THREAD_SLEEP_TIME); | |
if (hasLock) { | |
zkLock.unlock(); | |
hasLock = zkLock.tryLock(); // get lock again -> should be successful | |
zkLock.unlock(); | |
} | |
return hasLock; | |
} | |
}); | |
} | |
boolean foundSuccessful = false; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Boolean> f = null; | |
try { | |
f = completionService.take(); | |
final Boolean success = f.get(10, TimeUnit.SECONDS); | |
// we expect that only one of the threads can get the lock | |
if (success) { | |
// this thread was successful | |
assertFalse("we expect exactly one successful thread", foundSuccessful); | |
foundSuccessful = true; | |
} | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
if (!foundSuccessful) { | |
fail("we expect that one thread is successful"); | |
} | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
} | |
/** | |
* Tests lock without unlock, instead using end of session as "unlock". This works because lock uses ephemeral nodes. | |
* | |
* @throws Exception | |
* test fails | |
*/ | |
public void testLockWithEndofSessionInsteadOfUnlock() throws Exception { | |
final ZkConnection zkCon = new ZkConnection(_zkService); | |
// repeat to be sure that unlock works correct | |
for (int j = 0; j <= 2; j++) { | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
completionService.submit(new Callable<Boolean>() { | |
@Override | |
public Boolean call() throws Exception { | |
try { | |
final ZkLock zkLock = new ZkLock(zkCon, ROOT_ZNODE); | |
final boolean hasLock = zkLock.tryLock(); | |
Thread.sleep(THREAD_SLEEP_TIME); | |
return hasLock; | |
} finally { | |
zkCon.disconnectZkSession(); | |
} | |
} | |
}); | |
} | |
boolean foundSuccessful = false; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Boolean> f = null; | |
try { | |
f = completionService.take(); | |
final Boolean success = f.get(10, TimeUnit.SECONDS); | |
// we expect that only one of the threads can get the lock | |
if (success) { | |
// this thread was successful | |
assertFalse("we expect exactly one successful thread", foundSuccessful); | |
foundSuccessful = true; | |
} | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
if (!foundSuccessful) { | |
fail("we expect that one thread is successful"); | |
} | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
} | |
/** | |
* @throws Exception | |
* error creating root node | |
*/ | |
private void prepareRootNode() throws Exception { | |
ZkConnection zkCon = null; | |
zkCon = new ZkConnection(_zkService); | |
if (zkCon.exists(ROOT_ZNODE) == null) { | |
zkCon.createPath(ROOT_ZNODE, "".getBytes()); | |
} else { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
map.clear(); | |
} | |
} | |
} |