/******************************************************************************* | |
* Copyright (c) 2008, 2011 Attensity Europe GmbH and brox IT Solutions GmbH. All rights reserved. This program and the | |
* accompanying materials are made available under the terms of the Eclipse Public License v1.0 which accompanies this | |
* distribution, and is available at http://www.eclipse.org/legal/epl-v10.html | |
* | |
* Contributors: Andreas Schank (Attensity Europe GmbH) - initial implementation | |
**********************************************************************************************************************/ | |
package org.eclipse.smila.objectstore.filesystem.test; | |
import java.io.BufferedInputStream; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import org.apache.commons.io.IOUtils; | |
import org.eclipse.smila.objectstore.ObjectStoreException; | |
import org.eclipse.smila.objectstore.ObjectStoreService; | |
import org.eclipse.smila.objectstore.StoreObject; | |
import org.eclipse.smila.objectstore.StoreOutputStream; | |
import org.eclipse.smila.objectstore.util.ObjectStoreRetryUtil; | |
import org.eclipse.smila.test.DeclarativeServiceTestCase; | |
/** | |
* Tests concurrent access to the ObjectStoreService. | |
*/ | |
public class TestConcurrentAccess extends DeclarativeServiceTestCase { | |
/** name of the test store. */ | |
private static final String STORE_NAME = "testconcurrentaccess-store"; | |
/** object id to test with. */ | |
private static final String OBJECT_ID = "1/2/3/4/testconcurrentaccess-object"; | |
/** test data template. */ | |
private static final String TEST_DATA_TEMPLATE = | |
"{ \"version\": \"$number\", \"content\": \"Lorem ipsum number $number\" }"; | |
/** test data pattern. */ | |
private static final String TEST_DATA_PATTERN = | |
"\\{ \"version\": \"([0-9]*)\", \"content\": \"Lorem ipsum number ([0-9]*)\" \\}"; | |
/** the test instance of the service. */ | |
private ObjectStoreService _service; | |
/** CountDownLatch to wait for synchronized go. */
private CountDownLatch _countDownLatch; | |
/** {@inheritDoc} */ | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
_service = getService(ObjectStoreService.class); | |
assertNotNull(_service); | |
_service.ensureStore(STORE_NAME); | |
_service.removeObject(STORE_NAME, OBJECT_ID); | |
_countDownLatch = new CountDownLatch(1); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
protected void tearDown() throws Exception { | |
_service.removeObject(STORE_NAME, OBJECT_ID); | |
_service.removeStore(STORE_NAME); | |
super.tearDown(); | |
} | |
/** Abstract SuperClass of Writer. */ | |
private abstract class AbstractReaderWriter { | |
protected final String _storeName; | |
protected final String _objectId; | |
protected final String _data; | |
/** Constructor. */ | |
public AbstractReaderWriter(final String storeName, final String objectId, final String data) { | |
_storeName = storeName; | |
_objectId = objectId; | |
_data = data; | |
} | |
/** waits for the starting flag to be waved. */ | |
protected void waitForGo() { | |
try { | |
_countDownLatch.await(); | |
} catch (final InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
/** writes data to an object in the store via {@link ObjectStoreService#putObject(String, String, byte[])}. */ | |
private class PutWriter extends AbstractReaderWriter implements Callable<String> { | |
/** Constructs a new PutWriter. */ | |
public PutWriter(final String storeName, final String objectId, final String data) { | |
super(storeName, objectId, data); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public String call() { | |
waitForGo(); | |
try { | |
ObjectStoreRetryUtil.retryPutObject(_service, _storeName, _objectId, _data.getBytes()); | |
return _data; | |
} catch (final ObjectStoreException e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
} | |
/** writes data to an object in the store via {@link ObjectStoreService#appendToObject(String, String, byte[])}. */ | |
private class AppendWriter extends AbstractReaderWriter implements Callable<String> { | |
/** Constructs a new AppendWriter. */ | |
public AppendWriter(final String storeName, final String objectId, final String data) { | |
super(storeName, objectId, data); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public String call() { | |
waitForGo(); | |
try { | |
_service.appendToObject(_storeName, _objectId, _data.getBytes()); | |
return _data; | |
} catch (final ObjectStoreException e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
} | |
/** writes data to an object in the store via {@link ObjectStoreService#writeObject(String, String)}. */ | |
private class StreamWriter extends AbstractReaderWriter implements Callable<String> { | |
/** Constructs a new StreamWriter. */ | |
public StreamWriter(final String storeName, final String objectId, final String data) { | |
super(storeName, objectId, data); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public String call() { | |
waitForGo(); | |
try { | |
final StoreOutputStream sos = _service.writeObject(_storeName, _objectId); | |
sos.write(_data.getBytes()); | |
sos.close(); | |
return _data; | |
} catch (final ObjectStoreException e) { | |
e.printStackTrace(); | |
return null; | |
} catch (final IOException e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
} | |
/** reads data from an object in the store via {@link ObjectStoreService#getObject(String, String)}. */ | |
private class GetReader extends AbstractReaderWriter implements Callable<String> { | |
/** Constructs a new GetReader. */ | |
public GetReader(final String storeName, final String objectId) { | |
super(storeName, objectId, null); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public String call() { | |
waitForGo(); | |
try { | |
final String data = new String(_service.getObject(_storeName, _objectId)); | |
return data; | |
} catch (final ObjectStoreException e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
} | |
/** reads data from an object in the store via {@link ObjectStoreService#readObject(String, String)}. */ | |
private class StreamReader extends AbstractReaderWriter implements Callable<String> { | |
/** Constructs a new StreamReader. */ | |
public StreamReader(final String storeName, final String objectId) { | |
super(storeName, objectId, null); | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public String call() { | |
waitForGo(); | |
BufferedInputStream bis = null; | |
try { | |
bis = new BufferedInputStream(_service.readObject(_storeName, _objectId)); | |
return IOUtils.toString(bis); | |
} catch (final ObjectStoreException e) { | |
e.printStackTrace(); | |
} catch (final IOException e) { | |
e.printStackTrace(); | |
} finally { | |
IOUtils.closeQuietly(bis); | |
} | |
return null; | |
} | |
} | |
/** | |
* tests concurrent {@link ObjectStoreService#putObject(String, String, byte[])} calls. | |
* | |
* @throws InterruptedException | |
* execution failed. | |
* @throws ExecutionException | |
* a writer experienced an exception | |
* @throws ObjectStoreException | |
*/ | |
public void testConcurrentPutAndGet() throws InterruptedException, ExecutionException, ObjectStoreException { | |
final ExecutorService executor = Executors.newFixedThreadPool(50); | |
final int numberOfTasks = 250; | |
final List<Future<String>> futures = new ArrayList<Future<String>>(); | |
for (int i = 0; i < numberOfTasks; i++) { | |
final PutWriter writer = | |
new PutWriter(STORE_NAME, OBJECT_ID, TEST_DATA_TEMPLATE.replace("$number", String.valueOf(i))); | |
futures.add(executor.submit(writer)); | |
} | |
// now really start the action: | |
_countDownLatch.countDown(); | |
// and wait for every task to finish. | |
for (final Future<String> future : futures) { | |
assertNotNull(future.get()); | |
// assert there's only one visible object... | |
final Collection<StoreObject> objects = _service.getStoreObjectInfos(STORE_NAME); | |
assertTrue(objects.size() == 1); | |
assertEquals(OBJECT_ID, objects.iterator().next().getId()); | |
} | |
validateResultObject(); | |
final String storedObject = new String(_service.getObject(STORE_NAME, OBJECT_ID)); | |
// now start the readers... | |
_countDownLatch = new CountDownLatch(1); | |
futures.clear(); | |
for (int i = 0; i < numberOfTasks; i++) { | |
final GetReader reader = new GetReader(STORE_NAME, OBJECT_ID); | |
futures.add(executor.submit(reader)); | |
} | |
// now really start the action: | |
_countDownLatch.countDown(); | |
// and wait for every task to finish. | |
for (final Future<String> future : futures) { | |
final String result = future.get(); | |
assertNotNull(result); | |
assertEquals(storedObject, result); | |
} | |
executor.shutdown(); | |
} | |
/** | |
* tests concurrent {@link ObjectStoreService#writeObject(String, String)} calls. | |
* | |
* @throws InterruptedException | |
* execution failed. | |
* @throws ExecutionException | |
* a writer experienced an exception | |
* @throws ObjectStoreException | |
*/ | |
public void testConcurrentStreamWriteAndRead() throws InterruptedException, ExecutionException, | |
ObjectStoreException { | |
final ExecutorService executor = Executors.newFixedThreadPool(50); | |
final int numberOfTasks = 250; | |
final List<Future<String>> futures = new ArrayList<Future<String>>(); | |
for (int i = 0; i < numberOfTasks; i++) { | |
final StreamWriter writer = | |
new StreamWriter(STORE_NAME, OBJECT_ID, TEST_DATA_TEMPLATE.replace("$number", String.valueOf(i))); | |
futures.add(executor.submit(writer)); | |
} | |
// now really start the action: | |
_countDownLatch.countDown(); | |
// and wait for every task to finish. | |
for (final Future<String> future : futures) { | |
assertNotNull(future.get()); | |
} | |
validateResultObject(); | |
final String storedObject = new String(_service.getObject(STORE_NAME, OBJECT_ID)); | |
// now start the readers... | |
_countDownLatch = new CountDownLatch(1); | |
futures.clear(); | |
for (int i = 0; i < numberOfTasks; i++) { | |
final StreamReader reader = new StreamReader(STORE_NAME, OBJECT_ID); | |
futures.add(executor.submit(reader)); | |
} | |
// now really start the action: | |
_countDownLatch.countDown(); | |
// and wait for every task to finish. | |
for (final Future<String> future : futures) { | |
final String result = future.get(); | |
assertNotNull(result); | |
assertEquals(storedObject, result); | |
} | |
executor.shutdown(); | |
} | |
/** | |
* tests concurrent {@link ObjectStoreService#appendToObject(String, String, byte[])} calls. | |
* | |
* @throws InterruptedException | |
* execution failed. | |
* @throws ExecutionException | |
* a writer experienced an exception | |
* @throws ObjectStoreException | |
*/ | |
public void testConcurrentAppend() throws InterruptedException, ExecutionException, ObjectStoreException { | |
final ExecutorService executor = Executors.newFixedThreadPool(50); | |
final int numberOfTasks = 100; | |
final List<Future<String>> futures = new ArrayList<Future<String>>(); | |
for (int i = 0; i < numberOfTasks; i++) { | |
final AppendWriter writer = | |
new AppendWriter(STORE_NAME, OBJECT_ID, TEST_DATA_TEMPLATE.replace("$number", String.valueOf(i))); | |
futures.add(executor.submit(writer)); | |
} | |
executor.shutdown(); | |
// now really start the action: | |
_countDownLatch.countDown(); | |
// and wait for every task to finish. | |
for (final Future<String> future : futures) { | |
assertNotNull(future.get()); | |
} | |
validateAppendedResultObject(numberOfTasks); | |
} | |
/** validates an object that include one single result. */ | |
protected void validateResultObject() throws ObjectStoreException { | |
// now check if the file has not been corrupted and contains only one valid result. | |
final String result = new String(_service.getObject(STORE_NAME, OBJECT_ID)); | |
final Pattern dataPattern = Pattern.compile(TEST_DATA_PATTERN); | |
final Matcher matcher = dataPattern.matcher(result); | |
assertTrue(matcher.find()); | |
assertEquals(matcher.group(1), matcher.group(2)); | |
assertEquals(0, matcher.start()); | |
assertEquals(result.length(), matcher.end()); | |
} | |
/** validates an object that include more than one result. */ | |
protected void validateAppendedResultObject(final int noOfRecords) throws ObjectStoreException { | |
// now check if the file has not been corrupted and contains only one valid result. | |
final String result = new String(_service.getObject(STORE_NAME, OBJECT_ID)); | |
final Pattern dataPattern = Pattern.compile(TEST_DATA_PATTERN); | |
final Matcher matcher = dataPattern.matcher(result); | |
final Collection<String> numberStrings = new HashSet<String>(); | |
for (int i = 0; i < noOfRecords; i++) { | |
assertTrue(matcher.find()); | |
assertEquals(matcher.group(1), matcher.group(2)); | |
numberStrings.add(matcher.group(1)); | |
} | |
assertEquals(noOfRecords, numberStrings.size()); | |
} | |
} |