/*******************************************************************************
 * Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
 * accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
 * distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
 **********************************************************************************************************************/
package org.eclipse.smila.zookeeper.test; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletionService; | |
import java.util.concurrent.ExecutorCompletionService; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
import org.eclipse.smila.zookeeper.ZkConcurrentMap; | |
import org.eclipse.smila.zookeeper.ZkConnection; | |
import org.eclipse.smila.zookeeper.ZooKeeperService; | |
/** | |
* Test for ZkConcurrentMap class. | |
* | |
* @author aweber | |
*/ | |
public class TestZkConcurrentMap extends DeclarativeServiceTestCase { | |
/** root znode. */ | |
private static final String ROOT_ZNODE = "/data"; | |
/** for multi thread tests. */ | |
private static final int NO_OF_PARALLEL_THREADS = 100; | |
/** ZooKeeper service. */ | |
private ZooKeeperService _zkService; | |
/** | |
* get ZooKeeper service. | |
* | |
* @throws Exception | |
* no service found. | |
*/ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_zkService = getService(ZooKeeperService.class); | |
assertNotNull(_zkService); | |
prepareRootNode(); | |
} | |
/** | |
* @throws Exception | |
* error creating root node | |
*/ | |
private void prepareRootNode() throws Exception { | |
ZkConnection zkCon = null; | |
zkCon = new ZkConnection(_zkService); | |
if (zkCon.exists(ROOT_ZNODE) == null) { | |
zkCon.createPath(ROOT_ZNODE, "".getBytes()); | |
} else { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
map.clear(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.put(),get() and remove() with single thread. | |
*/ | |
public void testOneThreadPutGetRemove() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String key2 = "key2"; | |
final String value1 = "value1"; | |
final String value2 = "value2"; | |
assertEquals(null, map.getString(key1)); | |
map.put(key1, value1); | |
assertEquals(value1, map.getString(key1)); | |
map.put(key1, value2); | |
assertEquals(value2, map.getString(key1)); | |
map.put(key1, value1); | |
assertEquals(value1, map.getString(key1)); | |
assertEquals(null, map.getString(key2)); | |
map.put(key2, value2); | |
assertEquals(value1, map.getString(key1)); | |
assertEquals(value2, map.getString(key2)); | |
map.remove(key1); | |
assertEquals(null, map.getString(key1)); | |
assertEquals(value2, map.getString(key2)); | |
map.remove(key2); | |
assertEquals(null, map.getString(key2)); | |
} | |
/** | |
* test ZkConcurrentMap.put(),get() and remove() with muliple threads. | |
*/ | |
public void testMultiThreadPutGetRemove() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
final String value = "value"; | |
assertEquals(null, map.getString(key)); | |
final int noOfOperations = 3; // put, get, remove | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
if (i % noOfOperations == 0) { | |
completionService.submit(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
final boolean success = map.put(key, value); | |
if (success) { | |
return "put"; | |
} else { | |
return "error"; | |
} | |
} | |
}); | |
} else if (i % noOfOperations == 1) { | |
completionService.submit(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
return map.getString(key); | |
} | |
}); | |
} else if (i % noOfOperations == 2) { | |
completionService.submit(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
map.remove(key); | |
return "remove"; | |
} | |
}); | |
} | |
} | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<String> f = null; | |
try { | |
f = completionService.take(); | |
final String result = f.get(10, TimeUnit.SECONDS); | |
assertTrue((result == null) || result.equals("remove") || result.equals("put") || result.equals(value)); | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.putIfAbsent() with single thread. | |
*/ | |
public void testOneThreadPutIfAbsent() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String value1 = "value1"; | |
final String value2 = "value2"; | |
assertEquals(null, map.getString(key1)); | |
String result = map.putIfAbsent(key1, value1); | |
assertEquals(value1, result); // putIfAbsent succesful | |
result = map.putIfAbsent(key1, value2); | |
assertEquals(value1, result); // putIfAbsent not succesful | |
map.remove(key1); | |
assertEquals(null, map.getString(key1)); | |
result = map.putIfAbsent(key1, value2); // putIfAbsent succesful | |
assertEquals(value2, result); | |
} | |
/** | |
* test ZkConcurrentMap.putIfAbsent() with muliple threads. | |
*/ | |
public void testMultiThreadPutIfAbsent() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
assertEquals(null, map.getString(key)); | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
final String value = "value" + i; | |
completionService.submit(new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
return map.putIfAbsent(key, value); | |
} | |
}); | |
} | |
boolean success = false; | |
String previousResult = null; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<String> f = null; | |
try { | |
f = completionService.take(); | |
final String result = f.get(10, TimeUnit.SECONDS); | |
assertNotNull(result); | |
assertTrue(result.startsWith("value")); | |
success = true; | |
if (previousResult != null) { | |
assertEquals(previousResult, result); | |
} | |
previousResult = result; | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
assertTrue("we expect that one thread is successful", success); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.replace() with single thread. | |
*/ | |
public void testOneThreadReplace() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String value1 = "value1"; | |
final String value2 = "value2"; | |
final String value3 = "value3"; | |
assertEquals(null, map.getString(key1)); | |
assertFalse(map.replace(key1, value2, value3)); // not replaced | |
assertEquals(null, map.getString(key1)); | |
map.put(key1, value1); | |
assertFalse(map.replace(key1, value2, value3)); // not replaced | |
assertEquals(value1, map.getString(key1)); | |
map.put(key1, value2); | |
assertTrue(map.replace(key1, value2, value3)); // replaced | |
assertEquals(value3, map.getString(key1)); | |
map.remove(key1); | |
assertEquals(null, map.getString(key1)); | |
assertFalse(map.replace(key1, value2, value3)); // not replaced | |
assertEquals(null, map.getString(key1)); | |
} | |
/** | |
* Tests concurrent invocations of replace(). Only one replace may be successful. | |
*/ | |
public void testMultiThreadReplace() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
final String oldValue = "value1"; | |
final String newValue = "value2"; | |
assertEquals(null, map.getString(key)); | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
if (i == NO_OF_PARALLEL_THREADS / 2) { | |
completionService.submit(new Callable<Boolean>() { | |
@Override | |
public Boolean call() throws Exception { | |
map.put(key, oldValue); // one thread should set the correct "oldValue" to allow a replace | |
return false; | |
} | |
}); | |
} else { | |
completionService.submit(new Callable<Boolean>() { | |
@Override | |
public Boolean call() throws Exception { | |
final boolean ret = map.replace(key, oldValue, newValue); | |
return ret; | |
} | |
}); | |
} | |
} | |
boolean success = false; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Boolean> f = null; | |
try { | |
f = completionService.take(); | |
final Boolean replaceSuccessful = f.get(10, TimeUnit.SECONDS); | |
// we expect that only one of the threads can set its value | |
if (replaceSuccessful) { | |
// this thread was successful | |
assertFalse("we expect exactly one successful result", success); | |
success = true; | |
} | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
if (!success) { | |
// oh. the put might've been executed last. | |
// So a replace NOW must return true, since now the put has already returned... | |
success = map.replace(key, oldValue, newValue); | |
} | |
assertTrue("we expect that one thread is successful", success); | |
assertEquals(newValue, map.getString(key)); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.add() with single thread. | |
*/ | |
public void testOneThreadAdd() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String key2 = "key2"; | |
assertEquals(null, map.getString(key1)); | |
assertEquals(new Integer(0), map.add(key1, 0)); | |
assertEquals(new Integer(0), map.getInt(key1)); | |
assertEquals(new Integer(2), map.add(key1, 2)); | |
assertEquals(new Integer(2), map.getInt(key1)); | |
assertEquals(null, map.getString(key2)); | |
assertEquals(new Integer(1), map.add(key2, 1)); | |
assertEquals(new Integer(1), map.getInt(key2)); | |
assertEquals(new Integer(2), map.add(key2, 1)); | |
assertEquals(new Integer(2), map.getInt(key2)); | |
} | |
/** | |
* test ZkConcurrentMap.add() with multiple threads. | |
*/ | |
public void testMultiThreadAdd() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
assertEquals(null, map.getString(key)); | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
completionService.submit(new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
return map.add(key, 1); | |
} | |
}); | |
} | |
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Integer> f = null; | |
try { | |
f = completionService.take(); | |
final Integer result = f.get(10, TimeUnit.SECONDS); | |
assertTrue("we expect that all threads are successful", result != null); | |
assertTrue("we expect the sum less equal than number of threads", result <= NO_OF_PARALLEL_THREADS); | |
assertFalse("we expect each result only once", resultList.contains(result)); | |
resultList.add(result); | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
assertEquals(map.getInt(key).intValue(), NO_OF_PARALLEL_THREADS); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.incIfLessThan() with single thread. | |
*/ | |
public void testOneThreadIncIfLessThan() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String key2 = "key2"; | |
assertEquals(null, map.getString(key1)); | |
// test failure for max = 0 | |
int max = 0; | |
assertEquals(new Integer(-1), map.incIfLessThan(key1, max)); | |
assertEquals(null, map.getString(key1)); | |
// test success | |
max = 3; | |
assertEquals(new Integer(1), map.incIfLessThan(key1, max)); | |
assertEquals(new Integer(2), map.incIfLessThan(key1, max)); | |
assertEquals(new Integer(3), map.incIfLessThan(key1, max)); | |
assertEquals(new Integer(3), map.getInt(key1)); | |
// test failure exceeded max | |
assertEquals(new Integer(-1), map.incIfLessThan(key1, max)); | |
assertEquals(new Integer(3), map.getInt(key1)); | |
// test failure for old value < 0 | |
map.add(key1, -10); | |
assertEquals(new Integer(-7), map.getInt(key1)); | |
assertEquals(new Integer(-1), map.incIfLessThan(key1, max)); | |
// test success for another key | |
assertEquals(null, map.getString(key2)); | |
assertEquals(new Integer(1), map.incIfLessThan(key2, 3)); | |
assertEquals(new Integer(1), map.getInt(key2)); | |
} | |
/** | |
* test {@link ZkConcurrentMap#decIfGreaterThan(String, long)} with single thread. | |
*/ | |
public void testOneThreadDecIfGreaterThan() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String key2 = "key2"; | |
assertEquals(null, map.getString(key1)); | |
// test failure for min = -1 | |
int min = -1; | |
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min)); | |
assertEquals(null, map.getString(key1)); | |
// test success | |
min = 1; | |
map.put(key1, "4"); | |
assertEquals(new Integer(3), map.decIfGreaterThan(key1, min)); | |
assertEquals(new Integer(2), map.decIfGreaterThan(key1, min)); | |
assertEquals(new Integer(1), map.decIfGreaterThan(key1, min)); | |
assertEquals(new Integer(1), map.getInt(key1)); | |
// test failure decrease below min | |
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min)); | |
assertEquals(new Integer(1), map.getInt(key1)); | |
// test failure for old value < 0 | |
map.add(key1, -10); | |
assertEquals(new Integer(-9), map.getInt(key1)); | |
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min)); | |
// test success for another key | |
assertEquals(null, map.getString(key2)); | |
map.add(key2, 3); | |
assertEquals(new Integer(2), map.decIfGreaterThan(key2, 1)); | |
assertEquals(new Integer(1), map.decIfGreaterThan(key2, 1)); | |
assertEquals(new Integer(-1), map.decIfGreaterThan(key2, 1)); | |
assertEquals(new Integer(1), map.getInt(key2)); | |
} | |
/** | |
* test ZkConcurrentMap.incIfLessThan() with multiple threads. | |
*/ | |
public void testMultiThreadIncIfLessThan() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
final int max = NO_OF_PARALLEL_THREADS - 1; // -> one thread should exceed max | |
assertEquals(null, map.getString(key)); | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
completionService.submit(new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
Thread.sleep(new Random().nextInt(10)); // wait a while before execution | |
return map.incIfLessThan(key, max); | |
} | |
}); | |
} | |
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS); | |
boolean exceededMax = false; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Integer> f = null; | |
try { | |
f = completionService.take(); | |
final Integer result = f.get(10, TimeUnit.SECONDS); | |
assertTrue("we expect no failure caused by too much zookeeper conflicts", result != null); | |
assertTrue("we expect the result less equal than max", result <= max); | |
assertFalse("we expect each result only once", resultList.contains(result)); | |
if (result == -1) { | |
exceededMax = true; | |
} | |
resultList.add(result); | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
assertTrue("we expect one thread to exceed max", exceededMax); | |
assertEquals("we expect max as final value", map.getInt(key).intValue(), max); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test {@link ZkConcurrentMap#decIfGreaterThan(String, long)} with multiple threads. | |
*/ | |
public void testMultiThreadDecIfGreaterThan() { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key = "key"; | |
map.put(key, String.valueOf(NO_OF_PARALLEL_THREADS)); // -> one thread should dec below min | |
final int min = 1; | |
assertEquals(String.valueOf(NO_OF_PARALLEL_THREADS), map.getString(key)); | |
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS); | |
try { | |
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
completionService.submit(new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
Thread.sleep(new Random().nextInt(10)); // wait a while before execution | |
return map.decIfGreaterThan(key, min); | |
} | |
}); | |
} | |
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS); | |
boolean decBelowMin = false; | |
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) { | |
Future<Integer> f = null; | |
try { | |
f = completionService.take(); | |
final Integer result = f.get(10, TimeUnit.SECONDS); | |
assertTrue("we expect no failure caused by too much zookeeper conflicts", result != null); | |
assertFalse("we expect each result only once", resultList.contains(result)); | |
if (result == -1) { | |
decBelowMin = true; | |
} else { | |
assertTrue("we expect the result greater equal than min", result >= min); | |
} | |
resultList.add(result); | |
} catch (final Exception e) { | |
fail("Error getting result: " + e.getMessage()); | |
} | |
} | |
assertTrue("we expect one thread to exceed max", decBelowMin); | |
assertEquals("we expect min as final value", map.getInt(key).intValue(), min); | |
} finally { | |
executor.shutdownNow(); | |
} | |
} | |
/** | |
* test ZkConcurrentMap.keySet() with single thread. | |
*/ | |
public void testOneThreadKeySet() throws Exception { | |
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE); | |
final String key1 = "key1"; | |
final String key2 = "key2"; | |
final String key3 = "key3"; | |
final String value1 = "value1"; | |
assertEquals(null, map.getString(key1)); | |
map.put(key1, value1); | |
assertEquals(map.keySet().size(), 1); | |
assertTrue(map.keySet().contains(key1)); | |
map.put(key2, value1); | |
map.put(key3, value1); | |
assertTrue(map.keySet().contains(key1)); | |
assertTrue(map.keySet().contains(key2)); | |
assertTrue(map.keySet().contains(key3)); | |
map.clear(); | |
assertEquals(map.keySet().size(), 0); | |
} | |
} |