// blob: d6af303427e3af8921ca7e59cbc289137bdd8204 [file] [log] [blame]
/**
* Copyright (c) 2011, 2015 - Lunifera GmbH (Gross Enzersdorf, Austria), Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Florian Pirchner - Initial implementation
*/
package org.eclipse.osbp.runtime.common.session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.eclipse.osbp.runtime.common.session.ISessionManager.FutureResult;
import org.junit.BeforeClass;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
public class SessionManagerTest {
private static BundleContext bc;
private static ISessionManager sessionManager;
@BeforeClass
public static void setup() {
bc = FrameworkUtil.getBundle(SessionManagerTest.class).getBundleContext();
ServiceReference<ISessionManager> ref = bc.getServiceReference(ISessionManager.class);
sessionManager = bc.getService(ref);
assertNotNull(sessionManager);
bc.registerService(ISession.class, new Session("1"), null);
bc.registerService(ISession.class, new Session("2"), null);
bc.registerService(ISession.class, new Session("3"), null);
bc.registerService(ISession.class, new Session("4"), null);
}
private CountDownLatch latch;
private AtomicInteger count;
private AtomicInteger error;
private AtomicInteger generalCount;
@Test
public void test_getSessions() {
List<ISession> result = sessionManager.getSessions((t) -> t.get("id") != null);
assertEquals(4, result.size());
List<ISession> result2 = sessionManager.getSessions((t) -> t.get("id").equals("2"));
assertEquals(1, result2.size());
}
@Test
public void test_getSessionsAsync() {
CompletableFuture<List<ISession>> result = sessionManager.getSessionsAsync((t) -> t.get("id") != null);
result.thenAccept((sessions) -> {
assertEquals(4, sessions.size());
});
}
@Test
public void test_asyncEach_Simple() {
latch = new CountDownLatch(2);
count = new AtomicInteger();
error = new AtomicInteger();
Set<String> filterIds = Collections.synchronizedSet(new HashSet<>());
filterIds.add("1");
filterIds.add("3");
List<CompletableFuture<FutureResult<String>>> results = sessionManager.asyncEach((s) -> {
return (String) s.get("id");
}, (r) -> {
count.incrementAndGet();
latch.countDown();
if (!r.isError) {
filterIds.remove(r.value);
}
}, (sessionFilter) -> filterIds.contains(sessionFilter.get("id")));
try {
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail();
}
assertEquals(2, count.get());
assertEquals(0, error.get());
assertEquals(0, filterIds.size());
}
@SuppressWarnings("unused")
@Test
public void test_asyncEach_WithException() {
latch = new CountDownLatch(4);
generalCount = new AtomicInteger();
count = new AtomicInteger();
error = new AtomicInteger();
List<CompletableFuture<FutureResult<String>>> results = sessionManager.asyncEach((s) -> {
int _c = generalCount.incrementAndGet();
if (_c % 2 == 0) {
throw new IllegalStateException();
}
return (String) s.get("id");
}, (r) -> {
if (!r.isError) {
count.incrementAndGet();
} else {
error.incrementAndGet();
}
latch.countDown();
});
try {
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail();
}
assertEquals(2, count.get());
assertEquals(2, error.get());
}
@Test
public void test_asyncAll_Simple() {
latch = new CountDownLatch(2);
count = new AtomicInteger();
error = new AtomicInteger();
Set<String> filterIds = Collections.synchronizedSet(new HashSet<>());
filterIds.add("1");
filterIds.add("3");
CompletableFuture<List<FutureResult<String>>> result = sessionManager.asyncAll((s) -> {
return (String) s.get("id");
}, (sessionFilter) -> filterIds.contains(sessionFilter.get("id")));
result.thenAccept((rl) -> {
for (FutureResult<String> r : rl) {
count.incrementAndGet();
latch.countDown();
if (!r.isError) {
filterIds.remove(r.value);
}
}
});
try {
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail();
}
assertEquals(2, count.get());
assertEquals(0, error.get());
assertEquals(0, filterIds.size());
}
@Test
public void test_asyncAll_WithException() {
latch = new CountDownLatch(4);
generalCount = new AtomicInteger();
count = new AtomicInteger();
error = new AtomicInteger();
CompletableFuture<List<FutureResult<String>>> result = sessionManager.asyncAll((s) -> {
int _c = generalCount.incrementAndGet();
if (_c % 2 == 0) {
throw new IllegalStateException();
}
return (String) s.get("id");
});
result.thenAccept((rlist) -> {
for (FutureResult<String> r : rlist) {
if (!r.isError) {
count.incrementAndGet();
} else {
error.incrementAndGet();
}
latch.countDown();
}
});
try {
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail();
}
assertEquals(2, count.get());
assertEquals(2, error.get());
}
public static class Session extends AbstractSession {
final Map<String, Object> props = new HashMap<>();
public Session(String id) {
super();
props.put("id", id);
}
@SuppressWarnings("unchecked")
@Override
public <T> T get(Class<T> key) {
return (T) props.get(key.getName());
}
@Override
public Object get(String key) {
return props.get(key);
}
@Override
protected <T> CompletableFuture<T> doAsync(final Function<ISession, T> function, ExecutorService executor) {
CompletableFuture<T> promise = new CompletableFuture<>();
runAsync(function, promise);
return promise;
}
protected <T> void runAsync(final Function<ISession, T> function, CompletableFuture<T> promise) {
new Thread(() -> {
try {
T value = function.apply(this);
promise.complete(value);
} catch (Exception e) {
promise.completeExceptionally(e);
}
}).start();
}
}
}