blob: 6b35744df37e2a28f7534994cd205866f9cb658a [file] [log] [blame]
package org.eclipse.osbp.runtime.common.session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.eclipse.osbp.runtime.common.session.ISessionManager.FutureResult;
import org.junit.Before;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
/**
 * Tests for the synchronous ({@code apply}) and asynchronous ({@code async})
 * entry points of a session, exercised through the minimal {@link Session}
 * stub declared at the bottom of this class.
 *
 * <p>
 * The asynchronous tests do not rely on fixed sleeps in the test thread:
 * a {@link CountDownLatch} is released by the asynchronous side and the test
 * thread waits for it with {@link #TIMEOUT_MILLIS} as an upper bound only.
 */
public class SessionTest {

    /** Time in milliseconds the simulated background job sleeps. */
    private static final long WORK_MILLIS = 250;

    /** Upper bound in milliseconds a test waits for the asynchronous side. */
    private static final long TIMEOUT_MILLIS = 2000;

    // NOTE(review): assigned in setup() but not read by any test in this class.
    private BundleContext bc;

    /**
     * Resolves the bundle context of the bundle that loaded this test class.
     *
     * NOTE(review): presumably FrameworkUtil.getBundle(...) yields null when
     * the class was not loaded by an OSGi bundle, which would make this method
     * throw a NullPointerException - confirm the test is always launched
     * inside an OSGi framework.
     */
    @Before
    public void setup() {
        bc = FrameworkUtil.getBundle(getClass()).getBundleContext();
    }

    /**
     * {@code apply} must hand back the value produced by the function.
     */
    @Test
    public void test_apply() {
        Session session = new Session("1");

        String value = session.apply(s -> "Foo");

        assertEquals("Foo", value);
    }

    /**
     * {@code async} without a callback must return a promise that gets done
     * once the background job has finished.
     */
    @Test
    public void test_async() {
        // Released as soon as the returned promise completes, normally or not.
        final CountDownLatch latch = new CountDownLatch(1);

        Session session = new Session("1");
        CompletableFuture<FutureResult<String>> promise = session.async(s -> {
            simulateWork();
            return "Foo";
        }, null);
        promise.whenComplete((r, e) -> latch.countDown());

        awaitRelease(latch);
        assertTrue(promise.isDone());
    }

    /**
     * {@code async} with a callback must invoke the callback with a
     * non-error result carrying the produced value.
     */
    @Test
    public void test_async_Success() {
        // Two releases are required: one by the callback and one by the
        // promise, so the test does not depend on their relative order.
        final CountDownLatch latch = new CountDownLatch(2);
        final AtomicBoolean success = new AtomicBoolean();
        final AtomicBoolean error = new AtomicBoolean();

        Session session = new Session("1");
        CompletableFuture<FutureResult<String>> promise = session.async(s -> {
            simulateWork();
            return "Foo";
        }, (r) -> {
            try {
                if (r.isError) {
                    error.set(true);
                } else if ("Foo".equals(r.value)) {
                    success.set(true);
                }
            } finally {
                latch.countDown();
            }
        });
        promise.whenComplete((r, e) -> latch.countDown());

        awaitRelease(latch);
        assertTrue(promise.isDone());
        assertTrue(success.get());
        assertFalse(error.get());
    }

    /**
     * {@code async} with a callback must report an exception thrown by the
     * function as an error result that carries a non-null error.
     */
    @Test
    public void test_async_Error() {
        // Two releases are required: one by the callback and one by the
        // promise, so the test does not depend on their relative order.
        final CountDownLatch latch = new CountDownLatch(2);
        final AtomicBoolean success = new AtomicBoolean();
        final AtomicBoolean error = new AtomicBoolean();

        Session session = new Session("1");
        CompletableFuture<FutureResult<String>> promise = session.async(s -> {
            simulateWork();
            throw new IllegalArgumentException();
        }, (r) -> {
            try {
                if (!r.isError) {
                    success.set(true);
                } else if (r.error != null) {
                    error.set(true);
                }
            } finally {
                latch.countDown();
            }
        });
        promise.whenComplete((r, e) -> latch.countDown());

        awaitRelease(latch);
        assertTrue(promise.isDone());
        assertFalse(success.get());
        assertTrue(error.get());
    }

    /**
     * Sleeps for {@link #WORK_MILLIS} to imitate a slow background job. An
     * interrupt ends the sleep early and is re-asserted on the current thread
     * instead of being swallowed.
     */
    private static void simulateWork() {
        try {
            Thread.sleep(WORK_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until the latch reaches zero and fails the running test if that
     * does not happen within {@link #TIMEOUT_MILLIS} or if the waiting thread
     * is interrupted.
     *
     * @param latch
     *            the latch released by the asynchronous side
     */
    private static void awaitRelease(CountDownLatch latch) {
        try {
            assertTrue("asynchronous work did not finish within " + TIMEOUT_MILLIS + " ms",
                    latch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail("interrupted while waiting for asynchronous work");
        }
    }

    /**
     * Minimal {@link AbstractSession} used by the tests. Properties live in an
     * in-memory map and asynchronous functions run on a freshly started
     * thread.
     */
    public static class Session extends AbstractSession {

        /** Property store; the constructor registers the id under "id". */
        final Map<String, Object> props = new HashMap<>();

        /**
         * @param id
         *            value stored under the property key "id"
         */
        public Session(String id) {
            super();
            props.put("id", id);
        }

        /**
         * Looks up the property stored under the fully qualified name of the
         * given class.
         *
         * @return the stored value cast to {@code T}, or null if there is none
         */
        @SuppressWarnings("unchecked")
        @Override
        public <T> T get(Class<T> key) {
            return (T) props.get(key.getName());
        }

        /**
         * @return the property stored under the given key, or null if there is
         *         none
         */
        @Override
        public Object get(String key) {
            return props.get(key);
        }

        /**
         * Starts the function on a new thread and returns the promise that
         * will receive its outcome. The {@code executor} argument is ignored
         * by this stub.
         */
        @Override
        protected <T> CompletableFuture<T> doAsync(final Function<ISession, T> function, ExecutorService executor) {
            CompletableFuture<T> promise = new CompletableFuture<>();
            runAsync(function, promise);
            return promise;
        }

        /**
         * Applies the function to this session on a new thread and completes
         * the promise with the returned value, or exceptionally with whatever
         * the function threw.
         *
         * @param function
         *            the job to run
         * @param promise
         *            the promise to complete with the job's outcome
         */
        protected <T> void runAsync(final Function<ISession, T> function, CompletableFuture<T> promise) {
            new Thread(() -> {
                try {
                    promise.complete(function.apply(this));
                } catch (Throwable t) {
                    // Throwable rather than Exception: an Error raised by the
                    // function must still complete the promise, otherwise
                    // everybody waiting on it would wait forever.
                    promise.completeExceptionally(t);
                }
            }).start();
        }
    }
}