blob: 73a9591ec8124412fc70dbba6325cfbd56eaf749 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2017 Tasktop Technologies and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Tasktop Technologies - initial API and implementation
*******************************************************************************/
package org.eclipse.mylyn.internal.tasks.core.data;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.mylyn.internal.tasks.core.TaskRepositoryManager;
import org.eclipse.mylyn.tasks.core.TaskRepository;
import org.eclipse.mylyn.tasks.core.data.ITaskDataWorkingCopy;
import org.eclipse.mylyn.tasks.core.data.TaskAttributeMapper;
import org.eclipse.mylyn.tasks.core.data.TaskData;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.xml.sax.SAXException;
import com.google.common.base.Charsets;
import com.google.common.io.ByteStreams;
/**
 * Concurrency tests for {@link TaskDataStore}.
 * <p>
 * Every test creates a small zip file containing a single {@code data.xml} entry, lets several threads invoke store
 * operations on that file at the same time and then asserts that none of the threads recorded a {@link Throwable}.
 * The store is created with an externalizer stub that sleeps during reads and writes so that the operations of
 * different threads overlap.
 */
public class TaskDataStoreTest {

    /** Number of worker threads used when the same operation is executed by many threads at once. */
    private static final int THREAD_COUNT = 10;

    /** Time in milliseconds that every simulated read or write of task data state takes. */
    private static final long DELAY = 100;

    /** Content of the {@code data.xml} entry; written by the stub and verified when it reads the entry back. */
    private static final String DATA_XML_CONTENT = "test";

    private static final TaskData TASK_DATA = newTaskData();

    private static final TaskDataState TEST_STATE = new TaskDataState("connectorKind", "repositoryUrl", "taskId");

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    @Test
    public void getTaskDataStateByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(getTaskDataState(file)));
    }

    @Test
    public void discardEditsByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(discardEdits(file)));
    }

    @Test
    public void putEditsByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(putEdits(file)));
    }

    @Test
    public void putTaskDataSetLastReadUserByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(putTaskData(file)));
    }

    @Test
    public void setTaskDataByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(setTaskData(file)));
    }

    @Test
    public void putTaskDataByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(putTaskDataState(file)));
    }

    @Test
    public void deleteTaskDataByMultipleThreads() throws Exception {
        File file = newTaskDataZipFile();
        assertNoFailures(runSameByMultipleThreads(deleteTaskData(file)));
    }

    @Test
    public void manipulateTaskDataConcurrently() throws Exception {
        File file = newTaskDataZipFile();
        // run multiple times to increase chance of a failure
        for (int i = 0; i <= 10; i++) {
            List<Throwable> failures = runConcurrently( //
                    getTaskDataState(file), //
                    discardEdits(file), //
                    putEdits(file), //
                    putTaskData(file), //
                    setTaskData(file), //
                    putTaskDataState(file), //
                    deleteTaskData(file));
            assertNoFailures(failures);
        }
    }

    @Test
    public void manipulateTaskDataByFixedNumberOfThreads() throws Exception {
        File file = newTaskDataZipFile();
        TaskDataStore store = newTaskDataStore();
        List<Throwable> failures = newFailureList();

        List<ConsumerWithCoreException<TaskDataStore>> operations = Arrays.asList( //
                getTaskDataState(file), //
                discardEdits(file), //
                putEdits(file), //
                putTaskData(file), //
                setTaskData(file), //
                putTaskDataState(file), //
                deleteTaskData(file));

        // every operation is scheduled three times; the tasks are stateless so one instance can be reused
        List<Runnable> tasks = new ArrayList<>();
        for (ConsumerWithCoreException<TaskDataStore> operation : operations) {
            tasks.addAll(Collections.nCopies(3, task(operation, store, Optional.empty(), failures)));
        }
        Collections.shuffle(tasks);

        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (Runnable task : tasks) {
                executor.submit(task);
            }
            executor.shutdown();
            assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));
        } finally {
            // no-op after a regular termination; stops the pool if the wait above timed out or was interrupted
            executor.shutdownNow();
        }
        assertNoFailures(failures);
    }

    /**
     * Creates a store whose externalizer does not parse or produce real task data: reading verifies that the stream
     * holds {@link #DATA_XML_CONTENT} and returns {@link #TEST_STATE}, writing emits {@link #DATA_XML_CONTENT}. Both
     * operations sleep for {@link #DELAY} milliseconds.
     */
    private static TaskDataStore newTaskDataStore() {
        TaskRepositoryManager manager = new TaskRepositoryManager();
        TaskDataExternalizer externalizer = new TaskDataExternalizer(manager) {
            @Override
            public TaskDataState readState(InputStream in) throws IOException, SAXException {
                // read the input stream fully but do not return the result, not relevant for the purpose of the test
                assertEquals(DATA_XML_CONTENT, new String(ByteStreams.toByteArray(in), Charsets.UTF_8));
                in.close();
                // pretend that reading state takes more than the blink of an eye
                sleep();
                return TEST_STATE;
            }

            @Override
            public void writeState(OutputStream out, ITaskDataWorkingCopy state) throws IOException {
                out.write(DATA_XML_CONTENT.getBytes(Charsets.UTF_8));
                // pretend that writing state takes more than the blink of an eye
                sleep();
            }
        };
        return new TaskDataStore(externalizer);
    }

    /** Sleeps for {@link #DELAY} milliseconds; an interrupt is restored and reported as a runtime exception. */
    private static void sleep() {
        try {
            Thread.sleep(DELAY);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates {@code test.zip} in the temporary folder with a single {@code data.xml} entry holding
     * {@link #DATA_XML_CONTENT}.
     */
    private File newTaskDataZipFile() throws IOException {
        File file = folder.newFile("test.zip");
        try (ZipOutputStream outputStream = new ZipOutputStream(new FileOutputStream(file))) {
            outputStream.putNextEntry(new ZipEntry("data.xml"));
            // same charset as used by the externalizer stub when it reads the entry back
            outputStream.write(DATA_XML_CONTENT.getBytes(Charsets.UTF_8));
            outputStream.closeEntry();
        }
        return file;
    }

    private static TaskData newTaskData() {
        TaskRepository repository = new TaskRepository("connectorKind", "repositoryUrl");
        TaskAttributeMapper mapper = new TaskAttributeMapper(repository);
        return new TaskData(mapper, "connectorKind", "repositoryUrl", "taskId");
    }

    /**
     * A consumer whose {@link #accept(Object)} may throw a {@link CoreException}.
     *
     * @param <T>
     *            the type of the consumed object
     */
    @FunctionalInterface
    public interface ConsumerWithCoreException<T> {
        public void accept(T t) throws CoreException;
    }

    private ConsumerWithCoreException<TaskDataStore> getTaskDataState(File file) {
        return store -> store.getTaskDataState(file);
    }

    private ConsumerWithCoreException<TaskDataStore> discardEdits(File file) {
        return store -> store.discardEdits(file);
    }

    private ConsumerWithCoreException<TaskDataStore> putEdits(File file) {
        return store -> store.putEdits(file, TASK_DATA);
    }

    private ConsumerWithCoreException<TaskDataStore> putTaskData(File file) {
        return store -> store.putTaskData(file, TASK_DATA, true, true);
    }

    private ConsumerWithCoreException<TaskDataStore> setTaskData(File file) {
        return store -> store.setTaskData(file, TASK_DATA);
    }

    private ConsumerWithCoreException<TaskDataStore> putTaskDataState(File file) {
        return store -> store.putTaskData(file, TEST_STATE);
    }

    private ConsumerWithCoreException<TaskDataStore> deleteTaskData(File file) {
        return store -> store.deleteTaskData(file);
    }

    /**
     * Runs the given operation on one shared store from {@link #THREAD_COUNT} threads that are released together.
     *
     * @return the throwables recorded by the worker threads, empty if all of them completed normally
     */
    private static List<Throwable> runSameByMultipleThreads(ConsumerWithCoreException<TaskDataStore> consumer)
            throws Exception {
        TaskDataStore store = newTaskDataStore();
        // one additional party for the test thread, which releases the workers in startThreadsSimultaneously
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT + 1);
        List<Throwable> failures = newFailureList();
        List<Thread> threads = Stream.generate(thread(consumer, store, Optional.of(barrier), failures))
                .limit(THREAD_COUNT)
                .collect(toList());
        startThreadsSimultaneously(barrier, threads);
        return failures;
    }

    /**
     * Runs each of the given operations on one shared store, every operation in a thread of its own, with all
     * threads released together.
     *
     * @return the throwables recorded by the worker threads, empty if all of them completed normally
     */
    @SafeVarargs
    private static List<Throwable> runConcurrently(ConsumerWithCoreException<TaskDataStore>... consumers)
            throws Exception {
        TaskDataStore store = newTaskDataStore();
        // one additional party for the test thread, which releases the workers in startThreadsSimultaneously
        CyclicBarrier barrier = new CyclicBarrier(consumers.length + 1);
        List<Throwable> failures = newFailureList();
        List<Thread> threads = Arrays.stream(consumers)
                .map(consumer -> thread(consumer, store, Optional.of(barrier), failures).get())
                .collect(toList());
        startThreadsSimultaneously(barrier, threads);
        return failures;
    }

    /** Starts all threads, joins them at the barrier so that they proceed together and waits until they finished. */
    private static void startThreadsSimultaneously(CyclicBarrier barrier, List<Thread> threads) throws Exception {
        threads.forEach(Thread::start);
        barrier.await();
        for (Thread thread : threads) {
            thread.join();
        }
    }

    /** Creates the list that collects failures; synchronized because worker threads add to it concurrently. */
    private static List<Throwable> newFailureList() {
        return Collections.synchronizedList(new ArrayList<Throwable>());
    }

    /**
     * Returns a supplier that creates a new, not yet started thread executing
     * {@link #task(ConsumerWithCoreException, TaskDataStore, Optional, List)} on every call.
     */
    private static Supplier<Thread> thread(ConsumerWithCoreException<TaskDataStore> consumer, TaskDataStore store,
            Optional<CyclicBarrier> barrier, List<Throwable> failures) {
        Runnable task = task(consumer, store, barrier, failures);
        return () -> new Thread(task);
    }

    /**
     * Creates a task that waits at the barrier, if there is one, and then applies the consumer to the store.
     * Anything thrown on the way is added to {@code failures} instead of being propagated.
     */
    private static Runnable task(ConsumerWithCoreException<TaskDataStore> consumer, TaskDataStore store,
            Optional<CyclicBarrier> barrier, List<Throwable> failures) {
        return () -> {
            try {
                if (barrier.isPresent()) {
                    barrier.get().await();
                }
                consumer.accept(store);
            } catch (InterruptedException e) {
                // keep the interrupt visible to the code that runs this task
                Thread.currentThread().interrupt();
                failures.add(e);
            } catch (Throwable e) {
                // Throwable on purpose: assertion errors raised by the externalizer stub must be recorded as well
                failures.add(e);
            }
        };
    }

    private static void assertNoFailures(List<Throwable> failures) {
        assertTrue(format("expected no failures but found %d:\n%s", failures.size(), collectMessages(failures)),
                failures.isEmpty());
    }

    /** Renders one line per failure, see {@link #describe(Throwable)}. */
    private static String collectMessages(List<Throwable> failures) {
        return failures.stream().map(TaskDataStoreTest::describe).collect(Collectors.joining("\n"));
    }

    /** Returns the simple class name of the failure, followed by its message if it has one. */
    private static String describe(Throwable failure) {
        String name = failure.getClass().getSimpleName();
        String message = failure.getMessage();
        return message == null ? name : name + ": " + message;
    }
}