blob: b6a5b2a606813b9bcdd9cafcbd931f547a8f7e78 [file] [log] [blame]
/**
* Copyright (c) 2011, 2015 - Lunifera GmbH (Gross Enzersdorf, Austria), Loetz GmbH&Co.KG (69115 Heidelberg, Germany)
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Florian Pirchner - Initial implementation
*/
package org.eclipse.osbp.runtime.common.session.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.eclipse.osbp.runtime.common.session.ISession;
import org.eclipse.osbp.runtime.common.session.ISessionManager;
import org.eclipse.osbp.runtime.common.session.ITransactionHandler;
import org.eclipse.osbp.runtime.common.session.SessionCallback;
import org.eclipse.osbp.runtime.common.session.SessionUtil;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(immediate = true)
public class SessionManager implements ISessionManager {
static Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
List<ISession> sessions;
ExecutorService executorService;
@Activate
void activate() {
sessions = new CopyOnWriteArrayList<>();
executorService = Executors.newFixedThreadPool(3, (r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName(SessionManager.class.getName() + "-" + new Date().getTime());
return t;
});
}
@Deactivate
void deactivate() {
sessions.clear();
sessions = null;
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
@Override
public List<ISession> getSessions(Predicate<ISession> filter) {
if (filter == null) {
return Collections.unmodifiableList(sessions);
} else {
List<ISession> result = new ArrayList<>();
for (ISession session : sessions.toArray(new ISession[sessions.size()])) {
if (!filter.test(session)) {
continue;
}
result.add(session);
}
return Collections.unmodifiableList(result);
}
}
@Override
public CompletableFuture<List<ISession>> getSessionsAsync(Predicate<ISession> filter) {
return CompletableFuture.supplyAsync(() -> getSessions(filter), executorService);
}
@Override
public <T> CompletableFuture<List<FutureResult<T>>> asyncAll(Function<ISession, T> function) {
return asyncAll(function, null);
}
@Override
public <T> CompletableFuture<List<FutureResult<T>>> asyncAll(Function<ISession, T> function,
Predicate<ISession> filter) {
// create a promise
CompletableFuture<List<FutureResult<T>>> promise = new CompletableFuture<>();
// query the sessions async
CompletableFuture<List<ISession>> sessions = getSessionsAsync(filter);
// if sessions queried, then call session#async for each session
sessions.thenCompose((_sessions) -> {
List<CompletableFuture<FutureResult<T>>> asyncs = collectSessionAsyncCalls(_sessions, function);
// we need a trigger which triggers, if all s#async are done
CompletableFuture<Void> allOfTrigger = CompletableFuture
.allOf(asyncs.toArray(new CompletableFuture<?>[asyncs.size()]));
allOfTrigger.thenRun(() -> {
// now lets collect the values from the async calls
List<FutureResult<T>> tempResult = asyncs.stream().map((a) -> a.join()).collect(Collectors.toList());
// and notify the promise
promise.complete(tempResult);
});
return allOfTrigger;
});
return promise;
}
/**
* Collects all async session calls and returns the resulting list.
*
* @param sessions
* @param function
* @return
*/
<T> List<CompletableFuture<FutureResult<T>>> collectSessionAsyncCalls(List<ISession> sessions,
Function<ISession, T> function) {
List<CompletableFuture<FutureResult<T>>> asyncs = sessions.stream()
.<CompletableFuture<FutureResult<T>>>map((s) -> s.async(function, null)).collect(Collectors.toList());
return asyncs;
}
@Override
public <T> List<CompletableFuture<FutureResult<T>>> asyncEach(Function<ISession, T> function,
SessionCallback<T> callback) {
return asyncEach(function, callback, null);
}
@Override
public <T> List<CompletableFuture<FutureResult<T>>> asyncEach(Function<ISession, T> function,
SessionCallback<T> callback, Predicate<ISession> filter) {
// query the sessions sync
List<ISession> sessions = getSessions(filter);
// create future list querying the FutureResult async
List<CompletableFuture<FutureResult<T>>> result = sessions.stream().map((s) -> s.async(function, callback))
.collect(Collectors.toList());
return result;
}
@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
void addSession(ISession session) {
if (session.isSlaveSession()) {
// register slave
session.set(ISession.IS_SLAVE, true);
SessionUtil.Info info = SessionUtil.getFragmentInfo(session.getFragment());
if (info != null) {
List<ISession> temp = getSessions((s) -> ((String)s.get(ISession.HOSTNAME)).equalsIgnoreCase(info.host));
for (ISession s : temp) {
if (s.getType() == ISession.Type.MASTER) {
s.addSlave(session);
break;
}
}
}
} else {
// register master
List<ISession> temp = getSessions((s) -> ((Boolean)s.get(ISession.IS_SLAVE)!=null));
for (ISession s : temp) {
session.addSlave(s);
}
}
session.set(ISession.HOSTNAME, session.getHost());
sessions.add(session);
}
void removeSession(ISession session) {
if (session.isSlaveSession()) {
SessionUtil.Info info = SessionUtil.getFragmentInfo(session.getFragment());
if (info != null) {
List<ISession> temp = getSessions((s) -> ((String)s.get(ISession.HOSTNAME)).equalsIgnoreCase(info.host));
for (ISession s : temp) {
if (s.isMasterSession()) {
s.removeSlave(session);
break;
}
}
}
}
sessions.remove(session);
}
@Override
public ITransactionHandler getTransactionHandler(Object ui) {
List<ISession> temp = getSessions(s -> s.getUI() == ui);
if(!temp.isEmpty()) {
return temp.get(0).getTransactionHandler();
}
return null;
}
@Override
public boolean setTransactionHandler(Object ui, ITransactionHandler txn) {
List<ISession> temp = getSessions(s -> s.getUI() == ui);
if(temp.isEmpty()) {
return false;
}
temp.get(0).setTransactionHandler(txn);
return true;
}
@Override
public boolean disposeTransactionHandler(Object ui) {
List<ISession> temp = getSessions(s -> s.getUI() == ui);
if(temp.isEmpty()) {
return false;
}
ITransactionHandler txn = temp.get(0).getTransactionHandler();
if(txn != null) {
txn.dispose();
temp.get(0).setTransactionHandler(null);
}
return true;
}
}