blob: d5e66aba9c4e5ee156c891131391197da749c743 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Weber (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.zookeeper.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
import org.eclipse.smila.zookeeper.ZkConcurrentMap;
import org.eclipse.smila.zookeeper.ZkConnection;
import org.eclipse.smila.zookeeper.ZooKeeperService;
/**
* Test for ZkConcurrentMap class.
*
* @author aweber
*/
public class TestZkConcurrentMap extends DeclarativeServiceTestCase {
/** root znode. */
private static final String ROOT_ZNODE = "/data";
/** for multi thread tests. */
private static final int NO_OF_PARALLEL_THREADS = 100;
/** ZooKeeper service. */
private ZooKeeperService _zkService;
/**
* get ZooKeeper service.
*
* @throws Exception
* no service found.
*/
@Override
protected void setUp() throws Exception {
super.setUp();
_zkService = getService(ZooKeeperService.class);
assertNotNull(_zkService);
prepareRootNode();
}
/**
* @throws Exception
* error creating root node
*/
private void prepareRootNode() throws Exception {
ZkConnection zkCon = null;
zkCon = new ZkConnection(_zkService);
if (zkCon.exists(ROOT_ZNODE) == null) {
zkCon.createPath(ROOT_ZNODE, "".getBytes());
} else {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
map.clear();
}
}
/**
* test ZkConcurrentMap.put(),get() and remove() with single thread.
*/
public void testOneThreadPutGetRemove() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String key2 = "key2";
final String value1 = "value1";
final String value2 = "value2";
assertEquals(null, map.getString(key1));
map.put(key1, value1);
assertEquals(value1, map.getString(key1));
map.put(key1, value2);
assertEquals(value2, map.getString(key1));
map.put(key1, value1);
assertEquals(value1, map.getString(key1));
assertEquals(null, map.getString(key2));
map.put(key2, value2);
assertEquals(value1, map.getString(key1));
assertEquals(value2, map.getString(key2));
map.remove(key1);
assertEquals(null, map.getString(key1));
assertEquals(value2, map.getString(key2));
map.remove(key2);
assertEquals(null, map.getString(key2));
}
/**
* test ZkConcurrentMap.put(),get() and remove() with muliple threads.
*/
public void testMultiThreadPutGetRemove() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
final String value = "value";
assertEquals(null, map.getString(key));
final int noOfOperations = 3; // put, get, remove
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
if (i % noOfOperations == 0) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
final boolean success = map.put(key, value);
if (success) {
return "put";
} else {
return "error";
}
}
});
} else if (i % noOfOperations == 1) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return map.getString(key);
}
});
} else if (i % noOfOperations == 2) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
map.remove(key);
return "remove";
}
});
}
}
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<String> f = null;
try {
f = completionService.take();
final String result = f.get(10, TimeUnit.SECONDS);
assertTrue((result == null) || result.equals("remove") || result.equals("put") || result.equals(value));
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
} finally {
executor.shutdownNow();
}
}
/**
* test ZkConcurrentMap.putIfAbsent() with single thread.
*/
public void testOneThreadPutIfAbsent() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String value1 = "value1";
final String value2 = "value2";
assertEquals(null, map.getString(key1));
String result = map.putIfAbsent(key1, value1);
assertEquals(value1, result); // putIfAbsent succesful
result = map.putIfAbsent(key1, value2);
assertEquals(value1, result); // putIfAbsent not succesful
map.remove(key1);
assertEquals(null, map.getString(key1));
result = map.putIfAbsent(key1, value2); // putIfAbsent succesful
assertEquals(value2, result);
}
/**
* test ZkConcurrentMap.putIfAbsent() with muliple threads.
*/
public void testMultiThreadPutIfAbsent() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
assertEquals(null, map.getString(key));
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
final String value = "value" + i;
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return map.putIfAbsent(key, value);
}
});
}
boolean success = false;
String previousResult = null;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<String> f = null;
try {
f = completionService.take();
final String result = f.get(10, TimeUnit.SECONDS);
assertNotNull(result);
assertTrue(result.startsWith("value"));
success = true;
if (previousResult != null) {
assertEquals(previousResult, result);
}
previousResult = result;
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
assertTrue("we expect that one thread is successful", success);
} finally {
executor.shutdownNow();
}
}
/**
* test ZkConcurrentMap.replace() with single thread.
*/
public void testOneThreadReplace() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String value1 = "value1";
final String value2 = "value2";
final String value3 = "value3";
assertEquals(null, map.getString(key1));
assertFalse(map.replace(key1, value2, value3)); // not replaced
assertEquals(null, map.getString(key1));
map.put(key1, value1);
assertFalse(map.replace(key1, value2, value3)); // not replaced
assertEquals(value1, map.getString(key1));
map.put(key1, value2);
assertTrue(map.replace(key1, value2, value3)); // replaced
assertEquals(value3, map.getString(key1));
map.remove(key1);
assertEquals(null, map.getString(key1));
assertFalse(map.replace(key1, value2, value3)); // not replaced
assertEquals(null, map.getString(key1));
}
/**
* Tests concurrent invocations of replace(). Only one replace may be successful.
*/
public void testMultiThreadReplace() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
final String oldValue = "value1";
final String newValue = "value2";
assertEquals(null, map.getString(key));
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
if (i == NO_OF_PARALLEL_THREADS / 2) {
completionService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
map.put(key, oldValue); // one thread should set the correct "oldValue" to allow a replace
return false;
}
});
} else {
completionService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
final boolean ret = map.replace(key, oldValue, newValue);
return ret;
}
});
}
}
boolean success = false;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Boolean> f = null;
try {
f = completionService.take();
final Boolean replaceSuccessful = f.get(10, TimeUnit.SECONDS);
// we expect that only one of the threads can set its value
if (replaceSuccessful) {
// this thread was successful
assertFalse("we expect exactly one successful result", success);
success = true;
}
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
if (!success) {
// oh. the put might've been executed last.
// So a replace NOW must return true, since now the put has already returned...
success = map.replace(key, oldValue, newValue);
}
assertTrue("we expect that one thread is successful", success);
assertEquals(newValue, map.getString(key));
} finally {
executor.shutdownNow();
}
}
/**
* test ZkConcurrentMap.add() with single thread.
*/
public void testOneThreadAdd() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String key2 = "key2";
assertEquals(null, map.getString(key1));
assertEquals(new Integer(0), map.add(key1, 0));
assertEquals(new Integer(0), map.getInt(key1));
assertEquals(new Integer(2), map.add(key1, 2));
assertEquals(new Integer(2), map.getInt(key1));
assertEquals(null, map.getString(key2));
assertEquals(new Integer(1), map.add(key2, 1));
assertEquals(new Integer(1), map.getInt(key2));
assertEquals(new Integer(2), map.add(key2, 1));
assertEquals(new Integer(2), map.getInt(key2));
}
/**
* test ZkConcurrentMap.add() with multiple threads.
*/
public void testMultiThreadAdd() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
assertEquals(null, map.getString(key));
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return map.add(key, 1);
}
});
}
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Integer> f = null;
try {
f = completionService.take();
final Integer result = f.get(10, TimeUnit.SECONDS);
assertTrue("we expect that all threads are successful", result != null);
assertTrue("we expect the sum less equal than number of threads", result <= NO_OF_PARALLEL_THREADS);
assertFalse("we expect each result only once", resultList.contains(result));
resultList.add(result);
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
assertEquals(map.getInt(key).intValue(), NO_OF_PARALLEL_THREADS);
} finally {
executor.shutdownNow();
}
}
/**
* test ZkConcurrentMap.incIfLessThan() with single thread.
*/
public void testOneThreadIncIfLessThan() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String key2 = "key2";
assertEquals(null, map.getString(key1));
// test failure for max = 0
int max = 0;
assertEquals(new Integer(-1), map.incIfLessThan(key1, max));
assertEquals(null, map.getString(key1));
// test success
max = 3;
assertEquals(new Integer(1), map.incIfLessThan(key1, max));
assertEquals(new Integer(2), map.incIfLessThan(key1, max));
assertEquals(new Integer(3), map.incIfLessThan(key1, max));
assertEquals(new Integer(3), map.getInt(key1));
// test failure exceeded max
assertEquals(new Integer(-1), map.incIfLessThan(key1, max));
assertEquals(new Integer(3), map.getInt(key1));
// test failure for old value < 0
map.add(key1, -10);
assertEquals(new Integer(-7), map.getInt(key1));
assertEquals(new Integer(-1), map.incIfLessThan(key1, max));
// test success for another key
assertEquals(null, map.getString(key2));
assertEquals(new Integer(1), map.incIfLessThan(key2, 3));
assertEquals(new Integer(1), map.getInt(key2));
}
/**
* test {@link ZkConcurrentMap#decIfGreaterThan(String, long)} with single thread.
*/
public void testOneThreadDecIfGreaterThan() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String key2 = "key2";
assertEquals(null, map.getString(key1));
// test failure for min = -1
int min = -1;
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min));
assertEquals(null, map.getString(key1));
// test success
min = 1;
map.put(key1, "4");
assertEquals(new Integer(3), map.decIfGreaterThan(key1, min));
assertEquals(new Integer(2), map.decIfGreaterThan(key1, min));
assertEquals(new Integer(1), map.decIfGreaterThan(key1, min));
assertEquals(new Integer(1), map.getInt(key1));
// test failure decrease below min
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min));
assertEquals(new Integer(1), map.getInt(key1));
// test failure for old value < 0
map.add(key1, -10);
assertEquals(new Integer(-9), map.getInt(key1));
assertEquals(new Integer(-1), map.decIfGreaterThan(key1, min));
// test success for another key
assertEquals(null, map.getString(key2));
map.add(key2, 3);
assertEquals(new Integer(2), map.decIfGreaterThan(key2, 1));
assertEquals(new Integer(1), map.decIfGreaterThan(key2, 1));
assertEquals(new Integer(-1), map.decIfGreaterThan(key2, 1));
assertEquals(new Integer(1), map.getInt(key2));
}
/**
* test ZkConcurrentMap.incIfLessThan() with multiple threads.
*/
public void testMultiThreadIncIfLessThan() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
final int max = NO_OF_PARALLEL_THREADS - 1; // -> one thread should exceed max
assertEquals(null, map.getString(key));
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(10)); // wait a while before execution
return map.incIfLessThan(key, max);
}
});
}
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS);
boolean exceededMax = false;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Integer> f = null;
try {
f = completionService.take();
final Integer result = f.get(10, TimeUnit.SECONDS);
assertTrue("we expect no failure caused by too much zookeeper conflicts", result != null);
assertTrue("we expect the result less equal than max", result <= max);
assertFalse("we expect each result only once", resultList.contains(result));
if (result == -1) {
exceededMax = true;
}
resultList.add(result);
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
assertTrue("we expect one thread to exceed max", exceededMax);
assertEquals("we expect max as final value", map.getInt(key).intValue(), max);
} finally {
executor.shutdownNow();
}
}
/**
* test {@link ZkConcurrentMap#decIfGreaterThan(String, long)} with multiple threads.
*/
public void testMultiThreadDecIfGreaterThan() {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key = "key";
map.put(key, String.valueOf(NO_OF_PARALLEL_THREADS)); // -> one thread should dec below min
final int min = 1;
assertEquals(String.valueOf(NO_OF_PARALLEL_THREADS), map.getString(key));
final ExecutorService executor = Executors.newFixedThreadPool(NO_OF_PARALLEL_THREADS);
try {
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(10)); // wait a while before execution
return map.decIfGreaterThan(key, min);
}
});
}
final List<Integer> resultList = new ArrayList<Integer>(NO_OF_PARALLEL_THREADS);
boolean decBelowMin = false;
for (int i = 0; i < NO_OF_PARALLEL_THREADS; i++) {
Future<Integer> f = null;
try {
f = completionService.take();
final Integer result = f.get(10, TimeUnit.SECONDS);
assertTrue("we expect no failure caused by too much zookeeper conflicts", result != null);
assertFalse("we expect each result only once", resultList.contains(result));
if (result == -1) {
decBelowMin = true;
} else {
assertTrue("we expect the result greater equal than min", result >= min);
}
resultList.add(result);
} catch (final Exception e) {
fail("Error getting result: " + e.getMessage());
}
}
assertTrue("we expect one thread to exceed max", decBelowMin);
assertEquals("we expect min as final value", map.getInt(key).intValue(), min);
} finally {
executor.shutdownNow();
}
}
/**
* test ZkConcurrentMap.keySet() with single thread.
*/
public void testOneThreadKeySet() throws Exception {
final ZkConcurrentMap map = new ZkConcurrentMap(_zkService, ROOT_ZNODE);
final String key1 = "key1";
final String key2 = "key2";
final String key3 = "key3";
final String value1 = "value1";
assertEquals(null, map.getString(key1));
map.put(key1, value1);
assertEquals(map.keySet().size(), 1);
assertTrue(map.keySet().contains(key1));
map.put(key2, value1);
map.put(key3, value1);
assertTrue(map.keySet().contains(key1));
assertTrue(map.keySet().contains(key2));
assertTrue(map.keySet().contains(key3));
map.clear();
assertEquals(map.keySet().size(), 0);
}
}