blob: 26d0d7b1f3ce0967f75975ecab8b34e0f2abe9fa [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation
**********************************************************************************************************************/
package org.eclipse.smila.objectstore.filesystem.test;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.eclipse.smila.objectstore.ObjectStoreException;
import org.eclipse.smila.objectstore.ObjectStoreService;
import org.eclipse.smila.objectstore.StoreObject;
import org.eclipse.smila.objectstore.StoreOutputStream;
import org.eclipse.smila.objectstore.util.ObjectStoreRetryUtil;
import org.eclipse.smila.test.DeclarativeServiceTestCase;
/**
* Tests concurrent access to the ObjectStoreService.
*/
public class TestConcurrentAccess extends DeclarativeServiceTestCase {
/** Name of the store that setUp() ensures and tearDown() removes again. */
private static final String STORE_NAME = "testconcurrentaccess-store";
/** Id of the single object that all writer and reader tasks of this test class operate on. */
private static final String OBJECT_ID = "1/2/3/4/testconcurrentaccess-object";
/** Test data template; the tests replace every "$number" placeholder with the number of the writing task. */
private static final String TEST_DATA_TEMPLATE =
"{ \"version\": \"$number\", \"content\": \"Lorem ipsum number $number\" }";
/** Regular expression for one record built from the template; groups 1 and 2 capture the two numbers. */
private static final String TEST_DATA_PATTERN =
"\\{ \"version\": \"([0-9]*)\", \"content\": \"Lorem ipsum number ([0-9]*)\" \\}";
/** The service instance under test, looked up in setUp(). */
private ObjectStoreService _service;
/** CountDownLatch the worker tasks wait on until the test releases them with countDown(). */
private CountDownLatch _countDownLatch;
/**
 * Prepares a test run: creates a fresh start latch, looks up the {@link ObjectStoreService}, ensures the test store
 * and removes a possibly left-over test object from it.
 *
 * @throws Exception
 *           if the superclass setup or the store preparation fails.
 */
@Override
protected void setUp() throws Exception {
  super.setUp();
  // the latch does not depend on the service, so it can be prepared up front.
  _countDownLatch = new CountDownLatch(1);
  _service = getService(ObjectStoreService.class);
  assertNotNull(_service);
  // start from a clean slate: ensure the store, drop the object the tests are going to write.
  _service.ensureStore(STORE_NAME);
  _service.removeObject(STORE_NAME, OBJECT_ID);
}
/**
 * Removes the test object and the test store again and then runs the superclass teardown.
 *
 * @throws Exception
 *           if the store cleanup or the superclass teardown fails.
 */
@Override
protected void tearDown() throws Exception {
  try {
    _service.removeObject(STORE_NAME, OBJECT_ID);
    _service.removeStore(STORE_NAME);
  } finally {
    // the superclass must get its chance to clean up even if the store cleanup above failed.
    super.tearDown();
  }
}
/** Common base class of the reader and writer tasks: holds the target store/object and the data to write. */
private abstract class AbstractReaderWriter {
  /** name of the store to access. */
  protected final String _storeName;

  /** id of the object to access. */
  protected final String _objectId;

  /** data to write; the reader tasks of this test pass <code>null</code> here. */
  protected final String _data;

  /**
   * Creates a task working on the given object.
   *
   * @param storeName
   *          name of the store to access
   * @param objectId
   *          id of the object to access
   * @param data
   *          data to write, may be <code>null</code> for tasks that only read
   */
  public AbstractReaderWriter(final String storeName, final String objectId, final String data) {
    this._storeName = storeName;
    this._objectId = objectId;
    this._data = data;
  }

  /**
   * Blocks until the latch of the enclosing test has been counted down. If the waiting thread is interrupted, the
   * interrupt flag is restored and the method returns without further waiting.
   */
  protected void waitForGo() {
    final CountDownLatch startSignal = _countDownLatch;
    try {
      startSignal.await();
    } catch (final InterruptedException interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Task that stores its data as the complete content of an object, using
 * {@link ObjectStoreRetryUtil#retryPutObject} on top of
 * {@link ObjectStoreService#putObject(String, String, byte[])}.
 */
private class PutWriter extends AbstractReaderWriter implements Callable<String> {
  /**
   * Constructs a new PutWriter.
   *
   * @param storeName
   *          name of the store to write to
   * @param objectId
   *          id of the object to write
   * @param data
   *          data to write
   */
  public PutWriter(final String storeName, final String objectId, final String data) {
    super(storeName, objectId, data);
  }

  /**
   * Waits for the start signal and then writes the data.
   *
   * @return the written data, or <code>null</code> if the store reported an error (the stack trace is printed).
   */
  @Override
  public String call() {
    waitForGo();
    String written = null;
    try {
      ObjectStoreRetryUtil.retryPutObject(_service, _storeName, _objectId, _data.getBytes());
      written = _data;
    } catch (final ObjectStoreException storeFailure) {
      storeFailure.printStackTrace();
    }
    return written;
  }
}
/** Task that appends its data to an object via {@link ObjectStoreService#appendToObject(String, String, byte[])}. */
private class AppendWriter extends AbstractReaderWriter implements Callable<String> {
  /**
   * Constructs a new AppendWriter.
   *
   * @param storeName
   *          name of the store to write to
   * @param objectId
   *          id of the object to append to
   * @param data
   *          data to append
   */
  public AppendWriter(final String storeName, final String objectId, final String data) {
    super(storeName, objectId, data);
  }

  /**
   * Waits for the start signal and then appends the data.
   *
   * @return the appended data, or <code>null</code> if the store reported an error (the stack trace is printed).
   */
  @Override
  public String call() {
    waitForGo();
    final byte[] payload = _data.getBytes();
    try {
      _service.appendToObject(_storeName, _objectId, payload);
    } catch (final ObjectStoreException appendFailure) {
      appendFailure.printStackTrace();
      return null;
    }
    return _data;
  }
}
/** Task that writes its data to an object through the stream of {@link ObjectStoreService#writeObject(String, String)}. */
private class StreamWriter extends AbstractReaderWriter implements Callable<String> {
  /**
   * Constructs a new StreamWriter.
   *
   * @param storeName
   *          name of the store to write to
   * @param objectId
   *          id of the object to write
   * @param data
   *          data to write
   */
  public StreamWriter(final String storeName, final String objectId, final String data) {
    super(storeName, objectId, data);
  }

  /**
   * Waits for the start signal, then opens a stream on the object, writes the data and closes the stream.
   *
   * @return the written data, or <code>null</code> if opening, writing or closing failed (the stack trace is
   *         printed).
   */
  @Override
  public String call() {
    waitForGo();
    try {
      final StoreOutputStream sos = _service.writeObject(_storeName, _objectId);
      try {
        sos.write(_data.getBytes());
      } finally {
        // close even if the write failed, so that a failing task does not leak the stream.
        // NOTE(review): if StoreOutputStream offers a way to discard a partial write, it should probably be
        // invoked before closing in the failure case - confirm against the StoreOutputStream API.
        sos.close();
      }
      return _data;
    } catch (final ObjectStoreException e) {
      e.printStackTrace();
      return null;
    } catch (final IOException e) {
      e.printStackTrace();
      return null;
    }
  }
}
/** Task that reads the complete content of an object via {@link ObjectStoreService#getObject(String, String)}. */
private class GetReader extends AbstractReaderWriter implements Callable<String> {
  /**
   * Constructs a new GetReader.
   *
   * @param storeName
   *          name of the store to read from
   * @param objectId
   *          id of the object to read
   */
  public GetReader(final String storeName, final String objectId) {
    super(storeName, objectId, null);
  }

  /**
   * Waits for the start signal and then reads the object.
   *
   * @return the object content as string, or <code>null</code> if the store reported an error (the stack trace is
   *         printed).
   */
  @Override
  public String call() {
    waitForGo();
    String content = null;
    try {
      content = new String(_service.getObject(_storeName, _objectId));
    } catch (final ObjectStoreException readFailure) {
      readFailure.printStackTrace();
    }
    return content;
  }
}
/** Task that reads an object through the stream of {@link ObjectStoreService#readObject(String, String)}. */
private class StreamReader extends AbstractReaderWriter implements Callable<String> {
  /**
   * Constructs a new StreamReader.
   *
   * @param storeName
   *          name of the store to read from
   * @param objectId
   *          id of the object to read
   */
  public StreamReader(final String storeName, final String objectId) {
    super(storeName, objectId, null);
  }

  /**
   * Waits for the start signal, then reads the whole object stream into a string. The stream is always closed.
   *
   * @return the object content as string, or <code>null</code> if opening or reading failed (the stack trace is
   *         printed).
   */
  @Override
  public String call() {
    waitForGo();
    String content = null;
    BufferedInputStream input = null;
    try {
      input = new BufferedInputStream(_service.readObject(_storeName, _objectId));
      content = IOUtils.toString(input);
    } catch (final ObjectStoreException storeFailure) {
      storeFailure.printStackTrace();
    } catch (final IOException ioFailure) {
      ioFailure.printStackTrace();
    } finally {
      IOUtils.closeQuietly(input);
    }
    return content;
  }
}
/**
 * Tests concurrent {@link ObjectStoreService#putObject(String, String, byte[])} calls on one single object,
 * followed by concurrent {@link ObjectStoreService#getObject(String, String)} calls on the result.
 *
 * @throws InterruptedException
 *           waiting for a task was interrupted.
 * @throws ExecutionException
 *           a task terminated with an uncaught exception.
 * @throws ObjectStoreException
 *           the test itself could not access the store.
 */
public void testConcurrentPutAndGet() throws InterruptedException, ExecutionException, ObjectStoreException {
  final int numberOfTasks = 250;
  final ExecutorService executor = Executors.newFixedThreadPool(50);
  try {
    final List<Future<String>> futures = new ArrayList<Future<String>>(numberOfTasks);
    for (int i = 0; i < numberOfTasks; i++) {
      final PutWriter writer =
        new PutWriter(STORE_NAME, OBJECT_ID, TEST_DATA_TEMPLATE.replace("$number", String.valueOf(i)));
      futures.add(executor.submit(writer));
    }
    // now really start the action:
    _countDownLatch.countDown();
    // and wait for every task to finish.
    for (final Future<String> future : futures) {
      assertNotNull(future.get());
      // assert there's only one visible object, even while other writers are still busy...
      final Collection<StoreObject> objects = _service.getStoreObjectInfos(STORE_NAME);
      assertEquals(1, objects.size());
      assertEquals(OBJECT_ID, objects.iterator().next().getId());
    }
    validateResultObject();
    final String storedObject = new String(_service.getObject(STORE_NAME, OBJECT_ID));
    // now start the readers, they need a fresh latch because the first one has already been released...
    _countDownLatch = new CountDownLatch(1);
    futures.clear();
    for (int i = 0; i < numberOfTasks; i++) {
      final GetReader reader = new GetReader(STORE_NAME, OBJECT_ID);
      futures.add(executor.submit(reader));
    }
    // now really start the action:
    _countDownLatch.countDown();
    // and wait for every task to finish, each reader must see exactly the stored object.
    for (final Future<String> future : futures) {
      final String result = future.get();
      assertNotNull(result);
      assertEquals(storedObject, result);
    }
  } finally {
    // release the pool threads on failure, too. On success all tasks are done and this equals shutdown();
    // on failure it additionally drops tasks that have not been started yet.
    executor.shutdownNow();
  }
}
/**
 * Tests concurrent {@link ObjectStoreService#writeObject(String, String)} calls on one single object, followed by
 * concurrent {@link ObjectStoreService#readObject(String, String)} calls on the result.
 *
 * @throws InterruptedException
 *           waiting for a task was interrupted.
 * @throws ExecutionException
 *           a task terminated with an uncaught exception.
 * @throws ObjectStoreException
 *           the test itself could not access the store.
 */
public void testConcurrentStreamWriteAndRead() throws InterruptedException, ExecutionException,
  ObjectStoreException {
  final int numberOfTasks = 250;
  final ExecutorService executor = Executors.newFixedThreadPool(50);
  try {
    final List<Future<String>> futures = new ArrayList<Future<String>>(numberOfTasks);
    for (int i = 0; i < numberOfTasks; i++) {
      final StreamWriter writer =
        new StreamWriter(STORE_NAME, OBJECT_ID, TEST_DATA_TEMPLATE.replace("$number", String.valueOf(i)));
      futures.add(executor.submit(writer));
    }
    // now really start the action:
    _countDownLatch.countDown();
    // and wait for every task to finish.
    for (final Future<String> future : futures) {
      assertNotNull(future.get());
    }
    validateResultObject();
    final String storedObject = new String(_service.getObject(STORE_NAME, OBJECT_ID));
    // now start the readers, they need a fresh latch because the first one has already been released...
    _countDownLatch = new CountDownLatch(1);
    futures.clear();
    for (int i = 0; i < numberOfTasks; i++) {
      final StreamReader reader = new StreamReader(STORE_NAME, OBJECT_ID);
      futures.add(executor.submit(reader));
    }
    // now really start the action:
    _countDownLatch.countDown();
    // and wait for every task to finish, each reader must see exactly the stored object.
    for (final Future<String> future : futures) {
      final String result = future.get();
      assertNotNull(result);
      assertEquals(storedObject, result);
    }
  } finally {
    // release the pool threads on failure, too. On success all tasks are done and this equals shutdown();
    // on failure it additionally drops tasks that have not been started yet.
    executor.shutdownNow();
  }
}
/**
 * Tests concurrent {@link ObjectStoreService#appendToObject(String, String, byte[])} calls on one single object:
 * every task appends one record, afterwards the object must contain a record for each task.
 *
 * @throws InterruptedException
 *           waiting for a task was interrupted.
 * @throws ExecutionException
 *           a task terminated with an uncaught exception.
 * @throws ObjectStoreException
 *           the test itself could not access the store.
 */
public void testConcurrentAppend() throws InterruptedException, ExecutionException, ObjectStoreException {
  final int numberOfTasks = 100;
  final ExecutorService executor = Executors.newFixedThreadPool(50);
  final List<Future<String>> pendingAppends = new ArrayList<Future<String>>();
  int taskNo = 0;
  while (taskNo < numberOfTasks) {
    final String record = TEST_DATA_TEMPLATE.replace("$number", String.valueOf(taskNo));
    pendingAppends.add(executor.submit(new AppendWriter(STORE_NAME, OBJECT_ID, record)));
    taskNo++;
  }
  // no further tasks will follow; the tasks submitted so far are still executed.
  executor.shutdown();
  // release the waiting tasks...
  _countDownLatch.countDown();
  // ...and make sure each of them reports success.
  for (final Future<String> pending : pendingAppends) {
    assertNotNull(pending.get());
  }
  validateAppendedResultObject(numberOfTasks);
}
/**
 * Validates that the test object consists of exactly one uncorrupted record: the record must span the whole object
 * content and both numbers in it must be equal.
 *
 * @throws ObjectStoreException
 *           the object could not be read from the store.
 */
protected void validateResultObject() throws ObjectStoreException {
  final String stored = new String(_service.getObject(STORE_NAME, OBJECT_ID));
  final Matcher record = Pattern.compile(TEST_DATA_PATTERN).matcher(stored);
  assertTrue(record.find());
  // both placeholders were filled by the same writer, so the numbers must agree.
  assertEquals(record.group(1), record.group(2));
  // nothing may precede or follow the record.
  assertEquals(0, record.start());
  assertEquals(stored.length(), record.end());
}
/**
 * Validates that the test object contains the expected number of uncorrupted records: the record pattern must be
 * found <code>noOfRecords</code> times, both numbers of each record must be equal, and all records must carry
 * different numbers. Content between or after the found records is not checked.
 *
 * @param noOfRecords
 *          number of records expected in the object.
 * @throws ObjectStoreException
 *           the object could not be read from the store.
 */
protected void validateAppendedResultObject(final int noOfRecords) throws ObjectStoreException {
  final String stored = new String(_service.getObject(STORE_NAME, OBJECT_ID));
  final Matcher record = Pattern.compile(TEST_DATA_PATTERN).matcher(stored);
  // collects the numbers seen so far; a set, so that a duplicated record shows up as a missing entry.
  final Collection<String> distinctNumbers = new HashSet<String>();
  int found = 0;
  while (found < noOfRecords) {
    assertTrue(record.find());
    final String version = record.group(1);
    assertEquals(version, record.group(2));
    distinctNumbers.add(version);
    found++;
  }
  assertEquals(noOfRecords, distinctNumbers.size());
}
}