| package org.eclipse.osbp.runtime.common.session.impl; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| import org.eclipse.osbp.runtime.common.session.ISession; |
| import org.eclipse.osbp.runtime.common.session.ISessionManager; |
| import org.eclipse.osbp.runtime.common.session.SessionCallback; |
| import org.eclipse.osbp.runtime.common.session.SessionUtil; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.osgi.service.component.annotations.ReferencePolicy; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Component(immediate = true) |
| public class SessionManager implements ISessionManager { |
| |
| static Logger LOGGER = LoggerFactory.getLogger(SessionManager.class); |
| |
| List<ISession> sessions; |
| |
| ExecutorService executorService; |
| |
| @Activate |
| void activate() { |
| sessions = new CopyOnWriteArrayList<>(); |
| executorService = Executors.newFixedThreadPool(3, (r) -> { |
| Thread t = new Thread(r); |
| t.setDaemon(true); |
| t.setName(SessionManager.class.getName() + "-" + new Date().getTime()); |
| return t; |
| }); |
| } |
| |
| @Deactivate |
| void deactivate() { |
| sessions.clear(); |
| sessions = null; |
| |
| try { |
| executorService.awaitTermination(5, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| executorService.shutdownNow(); |
| } |
| } |
| |
| @Override |
| public List<ISession> getSessions(Predicate<ISession> filter) { |
| if (filter == null) { |
| return Collections.unmodifiableList(sessions); |
| } else { |
| List<ISession> result = new ArrayList<>(); |
| for (ISession session : sessions.toArray(new ISession[sessions.size()])) { |
| if (filter != null && !filter.test(session)) { |
| continue; |
| } |
| result.add(session); |
| } |
| return Collections.unmodifiableList(result); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<List<ISession>> getSessionsAsync(Predicate<ISession> filter) { |
| return CompletableFuture.supplyAsync(() -> getSessions(filter), executorService); |
| } |
| |
| @Override |
| public <T> CompletableFuture<List<FutureResult<T>>> asyncAll(Function<ISession, T> function) { |
| return asyncAll(function, null); |
| } |
| |
| @Override |
| public <T> CompletableFuture<List<FutureResult<T>>> asyncAll(Function<ISession, T> function, |
| Predicate<ISession> filter) { |
| |
| // create a promise |
| CompletableFuture<List<FutureResult<T>>> promise = new CompletableFuture<>(); |
| |
| // query the sessions async |
| CompletableFuture<List<ISession>> sessions = getSessionsAsync(filter); |
| |
| // if sessions queried, then call session#async for each session |
| sessions.thenCompose((_sessions) -> { |
| List<CompletableFuture<FutureResult<T>>> asyncs = collectSessionAsyncCalls(_sessions, function); |
| |
| // we need a trigger which triggers, if all s#async are done |
| CompletableFuture<Void> allOfTrigger = CompletableFuture |
| .allOf(asyncs.toArray(new CompletableFuture<?>[asyncs.size()])); |
| allOfTrigger.thenRun(() -> { |
| // now lets collect the values from the async calls |
| List<FutureResult<T>> tempResult = asyncs.stream().map((a) -> a.join()).collect(Collectors.toList()); |
| |
| // and notify the promise |
| promise.complete(tempResult); |
| }); |
| |
| return allOfTrigger; |
| }); |
| |
| return promise; |
| } |
| |
| /** |
| * Collects all async session calls and returns the resulting list. |
| * |
| * @param sessions |
| * @param function |
| * @return |
| */ |
| <T> List<CompletableFuture<FutureResult<T>>> collectSessionAsyncCalls(List<ISession> sessions, |
| Function<ISession, T> function) { |
| List<CompletableFuture<FutureResult<T>>> asyncs = sessions.stream() |
| .<CompletableFuture<FutureResult<T>>>map((s) -> s.async(function, null)).collect(Collectors.toList()); |
| return asyncs; |
| } |
| |
| @Override |
| public <T> List<CompletableFuture<FutureResult<T>>> asyncEach(Function<ISession, T> function, |
| SessionCallback<T> callback) { |
| return asyncEach(function, callback, null); |
| } |
| |
| @Override |
| public <T> List<CompletableFuture<FutureResult<T>>> asyncEach(Function<ISession, T> function, |
| SessionCallback<T> callback, Predicate<ISession> filter) { |
| |
| // query the sessions sync |
| List<ISession> sessions = getSessions(filter); |
| |
| // create future list querying the FutureResult async |
| List<CompletableFuture<FutureResult<T>>> result = sessions.stream().map((s) -> s.async(function, callback)) |
| .collect(Collectors.toList()); |
| return result; |
| } |
| |
| @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) |
| void addSession(ISession session) { |
| if (session.isSlaveSession()) { |
| // register slave |
| session.set(ISession.IS_SLAVE, true); |
| SessionUtil.Info info = SessionUtil.getFragmentInfo(session.getFragment()); |
| if (info != null) { |
| List<ISession> temp = getSessions((s) -> ((String)s.get(ISession.HOSTNAME)).equalsIgnoreCase(info.host)); |
| |
| for (ISession s : temp) { |
| if (s.getType() == ISession.Type.MASTER) { |
| s.addSlave(session); |
| break; |
| } |
| } |
| } |
| } else { |
| // register master |
| List<ISession> temp = getSessions((s) -> ((Boolean)s.get(ISession.IS_SLAVE))); |
| for (ISession s : temp) { |
| session.addSlave(s); |
| } |
| } |
| session.set(ISession.HOSTNAME, session.getHost()); |
| sessions.add(session); |
| } |
| |
| void removeSession(ISession session) { |
| if (session.isSlaveSession()) { |
| SessionUtil.Info info = SessionUtil.getFragmentInfo(session.getFragment()); |
| if (info != null) { |
| List<ISession> temp = getSessions((s) -> ((String)s.get(ISession.HOSTNAME)).equalsIgnoreCase(info.host)); |
| |
| for (ISession s : temp) { |
| if (s.isMasterSession()) { |
| s.removeSlave(session); |
| break; |
| } |
| } |
| } |
| } |
| sessions.remove(session); |
| |
| } |
| |
| } |