| package org.eclipse.osbp.runtime.common.session; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.fail; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Function; |
| |
| import org.eclipse.osbp.runtime.common.session.ISessionManager.FutureResult; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.osgi.framework.BundleContext; |
| import org.osgi.framework.FrameworkUtil; |
| import org.osgi.framework.ServiceReference; |
| |
| public class SessionManagerTest { |
| |
| private static BundleContext bc; |
| private static ISessionManager sessionManager; |
| |
| @BeforeClass |
| public static void setup() { |
| bc = FrameworkUtil.getBundle(SessionManagerTest.class).getBundleContext(); |
| |
| ServiceReference<ISessionManager> ref = bc.getServiceReference(ISessionManager.class); |
| sessionManager = bc.getService(ref); |
| assertNotNull(sessionManager); |
| |
| bc.registerService(ISession.class, new Session("1"), null); |
| bc.registerService(ISession.class, new Session("2"), null); |
| bc.registerService(ISession.class, new Session("3"), null); |
| bc.registerService(ISession.class, new Session("4"), null); |
| } |
| |
| private CountDownLatch latch; |
| private AtomicInteger count; |
| private AtomicInteger error; |
| private AtomicInteger generalCount; |
| |
| @Test |
| public void test_getSessions() { |
| List<ISession> result = sessionManager.getSessions((t) -> t.get("id") != null); |
| assertEquals(4, result.size()); |
| |
| List<ISession> result2 = sessionManager.getSessions((t) -> t.get("id").equals("2")); |
| assertEquals(1, result2.size()); |
| } |
| |
| @Test |
| public void test_getSessionsAsync() { |
| CompletableFuture<List<ISession>> result = sessionManager.getSessionsAsync((t) -> t.get("id") != null); |
| result.thenAccept((sessions) -> { |
| assertEquals(4, sessions.size()); |
| }); |
| } |
| |
| @Test |
| public void test_asyncEach_Simple() { |
| |
| latch = new CountDownLatch(2); |
| count = new AtomicInteger(); |
| error = new AtomicInteger(); |
| |
| Set<String> filterIds = Collections.synchronizedSet(new HashSet<>()); |
| filterIds.add("1"); |
| filterIds.add("3"); |
| |
| List<CompletableFuture<FutureResult<String>>> results = sessionManager.asyncEach((s) -> { |
| return (String) s.get("id"); |
| }, (r) -> { |
| count.incrementAndGet(); |
| latch.countDown(); |
| if (!r.isError) { |
| filterIds.remove(r.value); |
| } |
| }, (sessionFilter) -> filterIds.contains(sessionFilter.get("id"))); |
| |
| try { |
| latch.await(500, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| fail(); |
| } |
| |
| assertEquals(2, count.get()); |
| assertEquals(0, error.get()); |
| assertEquals(0, filterIds.size()); |
| } |
| |
| @SuppressWarnings("unused") |
| @Test |
| public void test_asyncEach_WithException() { |
| |
| latch = new CountDownLatch(4); |
| generalCount = new AtomicInteger(); |
| count = new AtomicInteger(); |
| error = new AtomicInteger(); |
| |
| List<CompletableFuture<FutureResult<String>>> results = sessionManager.asyncEach((s) -> { |
| int _c = generalCount.incrementAndGet(); |
| if (_c % 2 == 0) { |
| throw new IllegalStateException(); |
| } |
| return (String) s.get("id"); |
| }, (r) -> { |
| if (!r.isError) { |
| count.incrementAndGet(); |
| } else { |
| error.incrementAndGet(); |
| } |
| latch.countDown(); |
| }); |
| |
| try { |
| latch.await(500, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| fail(); |
| } |
| |
| assertEquals(2, count.get()); |
| assertEquals(2, error.get()); |
| } |
| |
| @Test |
| public void test_asyncAll_Simple() { |
| |
| latch = new CountDownLatch(2); |
| count = new AtomicInteger(); |
| error = new AtomicInteger(); |
| |
| Set<String> filterIds = Collections.synchronizedSet(new HashSet<>()); |
| filterIds.add("1"); |
| filterIds.add("3"); |
| |
| CompletableFuture<List<FutureResult<String>>> result = sessionManager.asyncAll((s) -> { |
| return (String) s.get("id"); |
| }, (sessionFilter) -> filterIds.contains(sessionFilter.get("id"))); |
| |
| result.thenAccept((rl) -> { |
| for (FutureResult<String> r : rl) { |
| count.incrementAndGet(); |
| latch.countDown(); |
| if (!r.isError) { |
| filterIds.remove(r.value); |
| } |
| } |
| }); |
| |
| try { |
| latch.await(500, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| fail(); |
| } |
| |
| assertEquals(2, count.get()); |
| assertEquals(0, error.get()); |
| assertEquals(0, filterIds.size()); |
| } |
| |
| @Test |
| public void test_asyncAll_WithException() { |
| |
| latch = new CountDownLatch(4); |
| generalCount = new AtomicInteger(); |
| count = new AtomicInteger(); |
| error = new AtomicInteger(); |
| |
| CompletableFuture<List<FutureResult<String>>> result = sessionManager.asyncAll((s) -> { |
| int _c = generalCount.incrementAndGet(); |
| if (_c % 2 == 0) { |
| throw new IllegalStateException(); |
| } |
| return (String) s.get("id"); |
| }); |
| |
| result.thenAccept((rlist) -> { |
| for (FutureResult<String> r : rlist) { |
| if (!r.isError) { |
| count.incrementAndGet(); |
| } else { |
| error.incrementAndGet(); |
| } |
| latch.countDown(); |
| } |
| }); |
| |
| try { |
| latch.await(500, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| fail(); |
| } |
| |
| assertEquals(2, count.get()); |
| assertEquals(2, error.get()); |
| } |
| |
| public static class Session extends AbstractSession { |
| |
| final Map<String, Object> props = new HashMap<>(); |
| |
| public Session(String id) { |
| super(); |
| props.put("id", id); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public <T> T get(Class<T> key) { |
| return (T) props.get(key.getName()); |
| } |
| |
| @Override |
| public Object get(String key) { |
| return props.get(key); |
| } |
| |
| @Override |
| protected <T> CompletableFuture<T> doAsync(final Function<ISession, T> function, ExecutorService executor) { |
| CompletableFuture<T> promise = new CompletableFuture<>(); |
| runAsync(function, promise); |
| return promise; |
| } |
| |
| protected <T> void runAsync(final Function<ISession, T> function, CompletableFuture<T> promise) { |
| new Thread(() -> { |
| try { |
| T value = function.apply(this); |
| promise.complete(value); |
| } catch (Exception e) { |
| promise.completeExceptionally(e); |
| } |
| }).start(); |
| } |
| |
| } |
| } |