blob: 821bea416fe9e0a52623df08b235b5e0d591ac61 [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.core.message;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.common.execution.Executable;
import org.eclipse.sensinact.gateway.common.primitive.Name;
import org.eclipse.sensinact.gateway.core.SensiNactResourceModelConfiguration;
import org.eclipse.sensinact.gateway.core.method.AccessMethod;
import org.eclipse.sensinact.gateway.core.method.AccessMethodResponse;
import org.eclipse.sensinact.gateway.core.security.AccessLevelOption;
import org.eclipse.sensinact.gateway.core.security.MethodAccessibility;
import org.eclipse.sensinact.gateway.util.UriUtils;
import org.eclipse.sensinact.gateway.util.stack.AbstractStackEngineHandler;
/**
* {@link SnaMessage} handler managing a set of callbacks mapped to
* {@link SnaFilter}s defining whether to transmit or not triggered messages
*
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
public class SnaMessageListener extends AbstractStackEngineHandler<SnaMessage<?>> implements MessageRouter {
/**
* The set of {@link MidCallback} mapped to {@link SnaFilter}s defining whether
* to call them or not
*/
protected final Map<SnaFilter, List<MidCallback>> callbacks;
/**
* The set of {@link MidCallback} mapped to {@link SnaFilter}s defining whether
* to call them or not
*/
protected final Map<String, List<MethodAccessibility>> agentsAccessibility;
/**
* The sensiNact resource model configuration providing access rules applying on
* potentially registered {@link SnaAgent}
*/
private SensiNactResourceModelConfiguration configuration;
/**
* the {@link Mediator} allowing to interact with the OSGi host environment
*/
private Mediator mediator;
/**
* Constructor
*
* @param mediator
* @param configuration
*/
public SnaMessageListener(Mediator mediator, SensiNactResourceModelConfiguration configuration) {
super();
this.mediator = mediator;
this.configuration = configuration;
this.callbacks = new HashMap<SnaFilter, List<MidCallback>>();
this.agentsAccessibility = new HashMap<String, List<MethodAccessibility>>();
}
@Override
public void addCallback(SnaFilter filter, MidCallback callback) {
if (filter == null || callback == null ||!callback.isActive()) {
return;
}
synchronized (this.callbacks) {
List<MidCallback> list = this.callbacks.get(filter);
if (list == null) {
list = new LinkedList<MidCallback>();
this.callbacks.put(filter, list);
}
list.add(callback);
}
}
@Override
public void deleteCallback(String callback) {
synchronized (this.callbacks) {
Iterator<Entry<SnaFilter, List<MidCallback>>> iterator = this.callbacks.entrySet().iterator();
while (iterator.hasNext()) {
Entry<SnaFilter, List<MidCallback>> entry = iterator.next();
List<MidCallback> list = entry.getValue();
if (list.remove(new Name<MidCallback>(callback))) {
if (list.isEmpty()) {
iterator.remove();
}
break;
}
}
}
}
/**
* @inheritDoc
*
* @see MessageRouter#count(java.lang.String)
*/
@Override
public int count(String uri) {
int count = 0;
String formatedUri = UriUtils.formatUri(uri);
synchronized (this.callbacks) {
Iterator<SnaFilter> iterator = this.callbacks.keySet().iterator();
while (iterator.hasNext()) {
SnaFilter snaFilter = iterator.next();
if (snaFilter.sender.equals(formatedUri)) {
count++;
}
}
}
return count;
}
/**
* Removes the {@link SnaFilter} passed as parameter and all mapped
* {@link MidCallback}s
*
* @param filter
* the {@link SnaFilter} to remove
*/
public void removeFilter(SnaFilter filter) {
synchronized (this.callbacks) {
this.callbacks.remove(filter);
}
}
@Override
public void handle(SnaMessage message) {
super.eventEngine.push(message);
}
@Override
public void doHandle(SnaMessage<?> message) {
String messageMethod = null;
switch (((SnaMessageSubType) message.getType()).getSnaMessageType()) {
case RESPONSE:
switch ((AccessMethodResponse.Response) message.getType()) {
case ACT_RESPONSE:
messageMethod = AccessMethod.ACT;
break;
case DESCRIBE_RESPONSE:
messageMethod = AccessMethod.DESCRIBE;
break;
case GET_RESPONSE:
messageMethod = AccessMethod.GET;
break;
case SET_RESPONSE:
messageMethod = AccessMethod.SET;
break;
case SUBSCRIBE_RESPONSE:
messageMethod = AccessMethod.SUBSCRIBE;
break;
case UNSUBSCRIBE_RESPONSE:
messageMethod = AccessMethod.UNSUBSCRIBE;
break;
default:
break;
}
break;
case ERROR:
case LIFECYCLE:
messageMethod = AccessMethod.DESCRIBE;
break;
case UPDATE:
messageMethod = AccessMethod.GET;
break;
default:
break;
}
if (messageMethod != null)
doHandleAgents(message, messageMethod);
doHandleSubscribers(message);
}
/**
* Transmits the {@link SnaMessage} passed as parameter to the appropriate
* subscribers according to their associated filter, target, and message type
*
* @param message
* the {@link SnaMessage} to transmit
*/
private void doHandleSubscribers(final SnaMessage<?> message) {
synchronized (this.callbacks) {
Iterator<Entry<SnaFilter, List<MidCallback>>> iterator = this.callbacks.entrySet().iterator();
while (iterator.hasNext()) {
Entry<SnaFilter, List<MidCallback>> entry = iterator.next();
SnaFilter filter = entry.getKey();
if (!filter.matches(message)) {
continue;
}
Iterator<MidCallback> callbackIterator = entry.getValue().iterator();
while (callbackIterator.hasNext()) {
MidCallback callback = callbackIterator.next();
if ((callback.getTimeout() != MidCallback.ENDLESS
&& System.currentTimeMillis() > callback.getTimeout())
|| !callback.isActive()) {
callbackIterator.remove();
continue;
}
callback.getMessageRegisterer().register(message);
}
}
}
}
/**
* Transmits the {@link SnaMessage} passed as parameter to the appropriate
* agents according to their access rights
*
* @param message
* the {@link SnaMessage} to transmit
*/
private void doHandleAgents(final SnaMessage<?> message, final String method) {
final String path = message.getPath();
synchronized(this) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
mediator.callServices(SnaAgent.class, new Executable<SnaAgent, Void>() {
@Override
public Void execute(SnaAgent agent) throws Exception {
String agentKey = agent.getPublicKey();
List<MethodAccessibility> methodAccessibilities = SnaMessageListener.this.agentsAccessibility.get(agentKey);
int index = -1;
if (methodAccessibilities == null) {
AccessLevelOption option = SnaMessageListener.this.configuration.getAuthenticatedAccessLevelOption(path, agentKey);
if (option == null) {
option = AccessLevelOption.ANONYMOUS;
}
methodAccessibilities = SnaMessageListener.this.configuration.getAccessibleMethods(path, option);
SnaMessageListener.this.agentsAccessibility.put(agentKey, methodAccessibilities);
}
if ((index = methodAccessibilities.indexOf(new Name<MethodAccessibility>(method))) > -1
&& methodAccessibilities.get(index).isAccessible()) {
agent.register(message);
}
return null;
}
});
return null;
}
}
);
}
}
@Override
public void close(boolean wait) {
if (wait) {
super.close();
} else {
super.stop();
}
}
}