// blob: 9b2ad39777805b1cd959cbac9e631ea718b13664 [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.core.remote;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Deque;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.common.execution.Executable;
import org.eclipse.sensinact.gateway.core.Session;
import org.eclipse.sensinact.gateway.core.message.Recipient;
import org.eclipse.sensinact.gateway.core.message.RemoteAgentCallback;
import org.eclipse.sensinact.gateway.core.message.SnaAgent;
import org.eclipse.sensinact.gateway.core.message.SnaConstants;
import org.eclipse.sensinact.gateway.core.message.SnaFilter;
import org.eclipse.sensinact.gateway.core.message.SnaMessage;
import org.eclipse.sensinact.gateway.core.message.SnaRemoteMessage;
import org.eclipse.sensinact.gateway.core.message.SnaRemoteMessageImpl;
import org.eclipse.sensinact.gateway.core.method.legacy.SubscribeResponse;
import org.eclipse.sensinact.gateway.util.UriUtils;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.osgi.framework.ServiceRegistration;
/**
* {@link RemoteCore} implementation
*
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
public class RemoteSensiNact implements RemoteCore {
// ********************************************************************//
// NESTED DECLARATIONS //
// ********************************************************************//
/**
 * Immutable holder of the four identifiers describing one subscription:
 * the public key it was created with and the provider / service / resource
 * path it targets.
 *
 * @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
 */
final class SubscriptionReference {
    /** Public key given when the subscription was created */
    public final String publicKey;

    /** Identifier of the targeted service provider */
    public final String serviceProviderId;

    /** Identifier of the targeted service */
    public final String serviceId;

    /** Identifier of the targeted resource */
    public final String resourceId;

    /**
     * Stores the four identifiers as they are, without validation.
     *
     * @param publicKey the public key the subscription was created with
     * @param serviceProviderId the service provider identifier
     * @param serviceId the service identifier
     * @param resourceId the resource identifier
     */
    SubscriptionReference(String publicKey, String serviceProviderId, String serviceId, String resourceId) {
        this.resourceId = resourceId;
        this.serviceId = serviceId;
        this.serviceProviderId = serviceProviderId;
        this.publicKey = publicKey;
    }
}
// ********************************************************************//
// ABSTRACT DECLARATIONS //
// ********************************************************************//
// ********************************************************************//
// STATIC DECLARATIONS //
// ********************************************************************//
/**
 * Key of the service property holding the namespace when this
 * {@link RemoteCore} is registered as an OSGi service (see connect)
 */
public static final String NAMESPACE_PROP = "namespace";
// ********************************************************************//
// INSTANCE DECLARATIONS //
// ********************************************************************//
/** The {@link Mediator} used for logging and for accessing the bundle context */
protected Mediator mediator;
/** The endpoint every local call of this class is delegated to */
protected LocalEndpoint localEndpoint;
/** The remote endpoint; stays null until open(RemoteEndpoint) is called */
protected RemoteEndpoint remoteEndpoint;
/** Maps a remote agent identifier to the local identifier obtained when registering it */
protected Map<String, String> localAgents;
/** The OSGi registration created by connect and released by disconnect; null when not registered */
protected ServiceRegistration<RemoteCore> registration;
/** Maps a subscription identifier to the description of the subscribed resource */
protected Map<String, SubscriptionReference> subscriptionIds;
/** Callbacks executed, with the namespace as argument, by connect */
protected Deque<Executable<String, Void>> onConnectedCallback;
/** Callbacks executed, with the remote endpoint's namespace as argument, by disconnect */
protected Deque<Executable<String, Void>> onDisconnectedCallback;
/**
 * Constructor. Creates the empty agent and subscription maps and seeds the
 * connected / disconnected callback queues with one callback each, which
 * propagates the connection status change for the namespace it receives.
 *
 * @param mediator
 *            the {@link Mediator} allowing to interact with the OSGi host
 *            environment
 * @param localEndpoint
 *            the {@link LocalEndpoint} the {@link RemoteCore} to be
 *            instantiated will be attached to in manner of communicating with
 *            the local instance of sensiNact
 */
public RemoteSensiNact(Mediator mediator, LocalEndpoint localEndpoint) {
    this.mediator = mediator;
    this.localEndpoint = localEndpoint;

    this.localAgents = new HashMap<String, String>();
    this.subscriptionIds = new HashMap<String, SubscriptionReference>();

    this.onConnectedCallback = new LinkedList<Executable<String, Void>>();
    this.onConnectedCallback.addLast(this.connectionStatusNotifier(true));

    this.onDisconnectedCallback = new LinkedList<Executable<String, Void>>();
    this.onDisconnectedCallback.addLast(this.connectionStatusNotifier(false));
}

/**
 * Builds a callback propagating the specified connection status for the
 * namespace it is executed with.
 *
 * @param connected
 *            true to notify a connection; false to notify a disconnection
 * @return the callback, always returning null when executed
 */
private Executable<String, Void> connectionStatusNotifier(final boolean connected) {
    return new Executable<String, Void>() {
        @Override
        public Void execute(String namespace) throws Exception {
            RemoteSensiNact.this.propagateRemoteConnectionStatus(connected, namespace);
            return null;
        }
    };
}
/**
 * {@inheritDoc}
 * <p>
 * Appends the given callbacks to the ones executed by connect; a null
 * collection is ignored.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#onConnected(java.util.Collection)
 */
@Override
public void onConnected(Collection<Executable<String, Void>> onConnectedCallbacks) {
    if (onConnectedCallbacks != null) {
        this.onConnectedCallback.addAll(onConnectedCallbacks);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Appends the given callbacks to the ones executed by disconnect; a null
 * collection is ignored. The callback queue itself is created by the
 * constructor, so no lazy initialization is needed here.
 *
 * @param onDisconnectedCallbacks
 *            the callbacks to add, each one being executed with the
 *            namespace of the remote endpoint as argument
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#onDisconnected(java.util.Collection)
 */
@Override
public void onDisconnected(Collection<Executable<String, Void>> onDisconnectedCallbacks) {
    if (onDisconnectedCallbacks == null) {
        return;
    }
    this.onDisconnectedCallback.addAll(onDisconnectedCallbacks);
}
/**
 * {@inheritDoc}
 * <p>
 * Stores the remote endpoint and opens it, passing this instance as argument.
 *
 * @throws NullPointerException
 *             if the specified remote endpoint is null
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#open(org.eclipse.sensinact.gateway.core.remote.RemoteEndpoint)
 */
@Override
public void open(RemoteEndpoint remoteEndpoint) {
    if (remoteEndpoint != null) {
        this.remoteEndpoint = remoteEndpoint;
        this.remoteEndpoint.open(this);
        return;
    }
    throw new NullPointerException("A remote endpoint is needed");
}
/**
 * {@inheritDoc}
 * <p>
 * Registers this instance as a {@link RemoteCore} OSGi service carrying the
 * namespace as {@link #NAMESPACE_PROP} property, then executes every
 * connected callback with that namespace. Nothing happens for a null
 * namespace, and the callbacks are skipped if no registration was obtained.
 * The whole operation holds this instance's monitor.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#connect(java.lang.String)
 */
@Override
public void connect(final String namespace) {
    synchronized (this) {
        if (namespace == null) {
            mediator.debug("Unable to register because of null namespace");
            return;
        }
        final PrivilegedAction<ServiceRegistration<RemoteCore>> registerAction =
            new PrivilegedAction<ServiceRegistration<RemoteCore>>() {
                @Override
                public ServiceRegistration<RemoteCore> run() {
                    Dictionary<String, Object> properties = new Hashtable<String, Object>();
                    properties.put(NAMESPACE_PROP, namespace);
                    ServiceRegistration<RemoteCore> result = mediator.getContext().registerService(
                        RemoteCore.class, RemoteSensiNact.this, properties);
                    mediator.debug("RemoteCore '%s' registration done", namespace);
                    return result;
                }
            };
        this.registration = AccessController.<ServiceRegistration<RemoteCore>>doPrivileged(registerAction);
        if (this.registration == null) {
            mediator.debug("Registration error");
            return;
        }
        if (this.onConnectedCallback == null) {
            return;
        }
        // a failing callback is logged and does not stop the following ones
        for (Iterator<Executable<String, Void>> callbacks = this.onConnectedCallback.iterator();
                callbacks.hasNext();) {
            try {
                callbacks.next().execute(namespace);
            } catch (Exception e) {
                mediator.error(e);
            }
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Disconnects, forgets every recorded subscription and finally closes the
 * remote endpoint. The remote endpoint is closed in a finally clause so that
 * it is released even if the disconnection fails, and only if one has been
 * attached (the field is null until open is called).
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#close()
 */
@Override
public void close() {
    try {
        this.disconnect();
        synchronized (this.subscriptionIds) {
            this.subscriptionIds.clear();
        }
    } finally {
        if (this.remoteEndpoint != null) {
            this.remoteEndpoint.close();
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Performs, while holding this instance's monitor:
 * <ol>
 * <li>the execution of every disconnected callback with the namespace of the
 * remote endpoint, if that namespace is known;</li>
 * <li>the unregistration of the OSGi service created by connect, if any;</li>
 * <li>the cancellation, through the local endpoint, of every recorded
 * subscription;</li>
 * <li>the closing of the local endpoint and the clearing of the registered
 * agents map, executed even if a cancellation fails.</li>
 * </ol>
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#disconnect()
 */
@Override
public void disconnect() {
    synchronized (this) {
        // the remote endpoint is only assigned by open(RemoteEndpoint): when none
        // has been attached there is no namespace to give to the callbacks
        final RemoteEndpoint endpoint = this.remoteEndpoint;
        final String namespace = endpoint == null ? null : endpoint.namespace();
        if (namespace != null && this.onDisconnectedCallback != null) {
            for (Executable<String, Void> callback : this.onDisconnectedCallback) {
                try {
                    callback.execute(namespace);
                } catch (Exception e) {
                    // a failing callback is logged and does not stop the following ones
                    mediator.error(e);
                }
            }
        }
        if (this.registration != null) {
            AccessController.<Void>doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    try {
                        RemoteSensiNact.this.registration.unregister();
                    } catch (IllegalStateException e) {
                        RemoteSensiNact.this.mediator.error(e);
                    } finally {
                        // the registration is dropped whether or not unregistering succeeded
                        RemoteSensiNact.this.registration = null;
                    }
                    return null;
                }
            });
        }
        try {
            synchronized (this.subscriptionIds) {
                // iterate over a snapshot of the keys, as entries are removed from the map in the loop
                final String[] keys = this.subscriptionIds.keySet().toArray(new String[0]);
                for (String subscriptionId : keys) {
                    SubscriptionReference ref = this.subscriptionIds.remove(subscriptionId);
                    if (ref == null) {
                        continue;
                    }
                    JSONObject response = this.localEndpoint.unsubscribe(ref.publicKey,
                        ref.serviceProviderId, ref.serviceId, ref.resourceId, subscriptionId);
                    // String.valueOf keeps the logging safe if no response object is returned
                    mediator.debug(String.valueOf(response));
                }
            }
        } finally {
            // always release the local side, even if an unsubscription failed
            this.localEndpoint.close();
            this.localAgents.clear();
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the remote endpoint stored by open, or null if open has not been
 * called.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#endpoint()
 */
@Override
public RemoteEndpoint endpoint() {
    final RemoteEndpoint attached = this.remoteEndpoint;
    return attached;
}
/**
 * {@inheritDoc}
 * <p>
 * Same as calling the two-argument getAll with a null filter.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getAll(java.lang.String)
 */
@Override
public String getAll(String publicKey) {
    final String noFilter = null;
    return this.getAll(publicKey, noFilter);
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards both arguments to the local endpoint and returns its answer
 * unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getAll(java.lang.String,java.lang.String)
 */
@Override
public String getAll(String publicKey, String filter) {
    final String all = this.localEndpoint.getAll(publicKey, filter);
    return all;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getProviders(java.lang.String)
 */
@Override
public String getProviders(String publicKey) {
    final String providers = this.localEndpoint.getProviders(publicKey);
    return providers;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getProvider(java.lang.String, java.lang.String)
 */
@Override
public String getProvider(String publicKey, String serviceProviderId) {
    final String provider = this.localEndpoint.getProvider(publicKey, serviceProviderId);
    return provider;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getServices(java.lang.String, java.lang.String)
 */
@Override
public String getServices(String publicKey, String serviceProviderId) {
    final String services = this.localEndpoint.getServices(publicKey, serviceProviderId);
    return services;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#getService(java.lang.String, java.lang.String, java.lang.String)
 */
@Override
public String getService(String publicKey, String serviceProviderId, String serviceId) {
    final String service = this.localEndpoint.getService(publicKey, serviceProviderId, serviceId);
    return service;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      getResources(java.lang.String, java.lang.String, java.lang.String)
 */
@Override
public String getResources(String publicKey, String serviceProviderId, String serviceId) {
    final String resources = this.localEndpoint.getResources(publicKey, serviceProviderId, serviceId);
    return resources;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      getResource(java.lang.String,java.lang.String,java.lang.String,java.lang.String)
 */
@Override
public String getResource(String publicKey, String serviceProviderId, String serviceId, String resourceId) {
    final String resource = this.localEndpoint.getResource(
        publicKey, serviceProviderId, serviceId, resourceId);
    return resource;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its JSON answer
 * unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      get(java.lang.String,java.lang.String, java.lang.String, java.lang.String, java.lang.String)
 */
@Override
public JSONObject get(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        String attributeId) {
    final JSONObject result = this.localEndpoint.get(
        publicKey, serviceProviderId, serviceId, resourceId, attributeId);
    return result;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its JSON answer
 * unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      set(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Object)
 */
@Override
public JSONObject set(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        String attributeId, Object parameter) {
    final JSONObject result = this.localEndpoint.set(
        publicKey, serviceProviderId, serviceId, resourceId, attributeId, parameter);
    return result;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its JSON answer
 * unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      act(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Object[])
 */
@Override
public JSONObject act(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        Object[] parameters) {
    final JSONObject result = this.localEndpoint.act(
        publicKey, serviceProviderId, serviceId, resourceId, parameters);
    return result;
}
/**
 * {@inheritDoc}
 * <p>
 * Subscribes using the current remote endpoint as the recipient, through
 * the subscribe variant that takes an explicit recipient.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#
 *      subscribe(java.lang.String,java.lang.String,java.lang.String,java.lang.String,org.json.JSONArray)
 */
@Override
public JSONObject subscribe(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        JSONArray conditions) {
    return this.subscribe(publicKey, serviceProviderId, serviceId, resourceId,
        this.remoteEndpoint, conditions);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates the subscription to the local endpoint. When the answer carries
 * the status code 200, the returned subscription identifier is recorded
 * together with the subscribed resource description, allowing disconnect
 * to cancel it later. The recording is done while holding the lock of the
 * subscriptions map, as close and disconnect do when they modify that map.
 *
 * @return the answer of the local endpoint, unchanged
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      subscribe(java.lang.String,java.lang.String,java.lang.String,java.lang.String,org.eclipse.sensinact.gateway.core.message.Recipient,org.json.JSONArray)
 */
@Override
public JSONObject subscribe(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        Recipient recipient, JSONArray conditions) {
    JSONObject response = this.localEndpoint.subscribe(publicKey, serviceProviderId, serviceId, resourceId,
        recipient, conditions);
    if (response.optInt("statusCode") == 200) {
        try {
            String subscriptionId = response.getJSONObject("response").getString("subscriptionId");
            SubscriptionReference reference =
                new SubscriptionReference(publicKey, serviceProviderId, serviceId, resourceId);
            synchronized (this.subscriptionIds) {
                this.subscriptionIds.put(subscriptionId, reference);
            }
        } catch (JSONException e) {
            // the answer does not hold a subscription identifier: nothing to record
            mediator.error(e);
        }
    }
    return response;
}
/**
 * {@inheritDoc}
 * <p>
 * Forgets the recorded subscription, if any, then delegates the
 * unsubscription to the local endpoint. The removal is done while holding
 * the lock of the subscriptions map, as close and disconnect do when they
 * modify that map.
 *
 * @return the answer of the local endpoint, unchanged
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#
 *      unsubscribe(java.lang.String, java.lang.String, java.lang.String,java.lang.String, java.lang.String)
 */
@Override
public JSONObject unsubscribe(String publicKey, String serviceProviderId, String serviceId, String resourceId,
        String subscriptionId) {
    synchronized (this.subscriptionIds) {
        this.subscriptionIds.remove(subscriptionId);
    }
    JSONObject response = this.localEndpoint.unsubscribe(publicKey, serviceProviderId, serviceId, resourceId,
        subscriptionId);
    return response;
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the call to the local endpoint and returns its answer unchanged.
 *
 * @see org.eclipse.sensinact.gateway.core.Endpoint#isAccessible(java.lang.String, java.lang.String)
 */
@Override
public boolean isAccessible(String publicKey, String path) {
    final boolean accessible = this.localEndpoint.isAccessible(publicKey, path);
    return accessible;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the local namespace reported by the local endpoint.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#namespace()
 */
@Override
public String namespace() {
    final String localNamespace = this.localEndpoint.localNamespace();
    return localNamespace;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the local identifier reported by the local endpoint.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#localID()
 */
@Override
public int localID() {
    final int identifier = this.localEndpoint.localID();
    return identifier;
}
/**
 * {@inheritDoc}
 * <p>
 * Registers a {@link RemoteAgentCallback} for the remote agent in the
 * session retrieved from the local endpoint with the public key. When the
 * registration answer holds a "subscriptionId" entry, its value is recorded
 * as the local identifier of the remote agent. A warning is logged and
 * nothing else is done if the remote agent identifier is already recorded.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#
 *      registerAgent(java.lang.String,org.eclipse.sensinact.gateway.core.message.SnaFilter,java.lang.String)
 */
@Override
public void registerAgent(final String remoteAgentId, final SnaFilter filter, final String publicKey) {
    final boolean alreadyKnown = this.localAgents.get(remoteAgentId) != null;
    if (alreadyKnown) {
        mediator.warn("the remote agent already exists");
        return;
    }
    Session session = this.localEndpoint.getSession(publicKey);
    RemoteAgentCallback callback = new RemoteAgentCallback(remoteAgentId, this);
    JSONObject registration = session.registerSessionAgent(callback, filter).getResponse();
    // JSONObject.NULL is compared first so that a missing answer is skipped as well
    if (JSONObject.NULL.equals(registration)) {
        return;
    }
    String localAgentId = (String) registration.opt("subscriptionId");
    if (localAgentId != null) {
        this.localAgents.put(remoteAgentId, localAgentId);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Forgets the remote agent and, if a local identifier was recorded for it,
 * unregisters that local agent through the local endpoint.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#unregisterAgent(java.lang.String)
 */
@Override
public void unregisterAgent(String remoteAgentId) {
    final String localAgentId = this.localAgents.remove(remoteAgentId);
    if (localAgentId != null) {
        this.localEndpoint.unregisterAgent(localAgentId);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Hands the message over, by calling register, to the {@link SnaAgent}
 * services selected by callAgent for the given agent identifier.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#
 *      dispatch(java.lang.String,org.eclipse.sensinact.gateway.core.message.SnaMessage)
 */
@Override
public void dispatch(String agentId, final SnaMessage<?> message) {
    final Executable<SnaAgent, Void> forwarder = new Executable<SnaAgent, Void>() {
        @Override
        public Void execute(SnaAgent agent) throws Exception {
            if (agent == null) {
                return null;
            }
            agent.register(message);
            return null;
        }
    };
    this.callAgent(agentId, true, forwarder);
}
/**
 * {@inheritDoc}
 * <p>
 * Asks the local endpoint to close the session of the given public key.
 *
 * @see org.eclipse.sensinact.gateway.core.remote.RemoteCore#closeSession(java.lang.String)
 */
@Override
public void closeSession(String publicKey) {
    final LocalEndpoint endpoint = this.localEndpoint;
    endpoint.closeSession(publicKey);
}
/**
 * Executes, in a privileged block, the given executable on the
 * {@link SnaAgent} services matching the filter
 * "(org.eclipse.sensinact.gateway.agent.id=&lt;agentId&gt;)".
 *
 * @param agentId
 *            the value inserted as is in the filter
 * @param local
 *            not used by this method
 * @param executable
 *            the executable handed to the mediator's callServices
 */
private final void callAgent(final String agentId, final boolean local,
        final Executable<SnaAgent, Void> executable) {
    final String agentFilter = "(org.eclipse.sensinact.gateway.agent.id=" + agentId + ")";
    AccessController.<Void>doPrivileged(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
            RemoteSensiNact.this.mediator.callServices(SnaAgent.class, agentFilter, executable);
            return null;
        }
    });
}
/**
 * Builds a remote message, attached to the root URI, telling that the
 * specified namespace is now connected or disconnected, and dispatches it
 * with the "*" agent identifier.
 *
 * @param connected
 *            true for a CONNECTED message; false for a DISCONNECTED one
 * @param namespace
 *            the namespace written in the notification of the message
 */
private final void propagateRemoteConnectionStatus(boolean connected, String namespace) {
    final SnaRemoteMessage.Remote status;
    if (connected) {
        status = SnaRemoteMessage.Remote.CONNECTED;
    } else {
        status = SnaRemoteMessage.Remote.DISCONNECTED;
    }
    final JSONObject notification = new JSONObject();
    notification.put(SnaConstants.REMOTE, status.name());
    notification.put(SnaConstants.NAMESPACE, namespace);

    SnaRemoteMessageImpl message = new SnaRemoteMessageImpl(mediator, UriUtils.ROOT, status);
    message.setNotification(notification);
    dispatch("*", message);
}
}