Fix global subscription error

Handle finer websocket frames
diff --git a/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java b/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
index 942b73b..d868c70 100644
--- a/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
+++ b/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
@@ -10,22 +10,22 @@
  */
 package org.eclipse.sensinact.gateway.agent.mqtt.generic.internal;
 
+import java.io.IOException;
+
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.eclipse.sensinact.gateway.core.message.AbstractMidAgentCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Generic MQTT Agent
  */
-public class GenericMqttAgent extends AbstractMidAgentCallback {
+public class GenericMqttAgent {
+	
     private static final Logger LOG = LoggerFactory.getLogger(GenericMqttAgent.class);
     private final String broker;
     private final int qos;
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
index f413e30..83e022e 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
@@ -10,7 +10,6 @@
  */
 package org.eclipse.sensinact.gateway.nthbnd.rest.internal.ws;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Future;
@@ -99,13 +98,25 @@
             restAccess.proceed();
         } catch (IOException | JSONException e) {
             this.mediator.error(e);
-            this.send(new JSONObject().put("statusCode", 400).put("message", "Bad request").toString());
+            try {
+				this.send(new JSONObject().put("statusCode", 400).put("message", "Bad request").toString());
+			} catch (Exception e1) {
+	            this.mediator.error(e1);
+			}
         } catch (InvalidCredentialException e) {
             this.mediator.error(e);
-            this.send(new JSONObject().put("statusCode", 403).put("message", e.getMessage()).toString());
+            try {
+				this.send(new JSONObject().put("statusCode", 403).put("message", e.getMessage()).toString());
+			} catch (Exception e1) {
+	            this.mediator.error(e1);
+			}
         } catch (Exception e) {
             this.mediator.error(e);
-            this.send(new JSONObject().put("statusCode", 500).put("message", "Exception - Internal server error").toString());
+            try {
+				this.send(new JSONObject().put("statusCode", 500).put("message", "Exception - Internal server error").toString());
+			} catch (Exception e1) {
+	            this.mediator.error(e1);
+			}
         }
     }
 
@@ -113,25 +124,42 @@
 	private byte[] payload; 
 	
 	@OnWebSocketFrame
-	public void onFrame(Frame frame) {	
-		if(OpCode.CONTINUATION != frame.getOpCode() && frame.isFin()) {
-			partial = false;
-			return;
-		}
-		partial = true;
-		byte[] bytes = new byte[frame.getPayloadLength()];
-		frame.getPayload().get(bytes);
-		int length = payload==null?0:payload.length;
-		byte[] tmpArray = new byte[length+bytes.length] ;
-		if(bytes.length > 0)
-			System.arraycopy(bytes,0, tmpArray, length, bytes.length);
-		if(length > 0)
-			System.arraycopy(payload, 0, tmpArray, 0, length);
-		payload = tmpArray;	
-		tmpArray = null;
-		if(frame.isFin()) {
-			onMessage(new String(payload));
-			payload = null;
+	public void onFrame(Frame frame) {		
+		switch(frame.getOpCode()) {		
+			case OpCode.PING :
+				partial = true;
+				try {
+					this.session.getRemote().sendPong(ByteBuffer.allocate(0));
+				} catch (IOException e) {
+		            this.mediator.error(e);
+				}
+				break;
+			case OpCode.PONG :
+				partial = true;
+				break;
+			case OpCode.CONTINUATION:
+				partial = true;
+				byte[] bytes = new byte[frame.getPayloadLength()];
+				frame.getPayload().get(bytes);
+				int length = payload==null?0:payload.length;
+				byte[] tmpArray = new byte[length+bytes.length] ;
+				if(bytes.length > 0)
+					System.arraycopy(bytes,0, tmpArray, length, bytes.length);
+				if(length > 0)
+					System.arraycopy(payload, 0, tmpArray, 0, length);
+				payload = tmpArray;	
+				tmpArray = null;
+				if(frame.isFin()) {
+					partial = false;
+					onMessage(new String(payload));
+					partial = true;
+					payload = null;
+				}
+				break;
+			default:
+				if(frame.isFin())
+					partial = false;
+				break;
 		}
 	}
 	
@@ -140,7 +168,7 @@
         error.printStackTrace();
     }
     
-    protected void send(String message) {
+    protected void send(String message) throws Exception {
         if (this.session == null)
             return;
         try {
@@ -160,11 +188,17 @@
         		future.get(1, TimeUnit.SECONDS);
         	}
         } catch (Exception e) {
-            this.mediator.error(new StringBuilder().append("Session ").append(session.getLocalAddress()).append("seems to be invalid, removing from the pool.").toString(), e);
+            this.mediator.error(new StringBuilder(
+            	).append("Session "
+            	).append(session.getLocalAddress()
+            	).append("seems to be invalid, removing from the pool."
+            	).toString(), e);
+            pool.deleteSocketEndpoint(this);
+            throw e;
         }
     }
 
-    protected void send(byte[] message) {
+    protected void send(byte[] message) throws Exception {
         if (this.session == null)
             return;
         try {
@@ -183,7 +217,13 @@
 	    		future.get(1, TimeUnit.SECONDS);
 	    	}
         } catch (Exception e) {
-            this.mediator.error(new StringBuilder().append("Session ").append(session.getLocalAddress()).append("seems to be invalid, removing from the pool.").toString(), e);
+            this.mediator.error(new StringBuilder(
+            		).append("Session "
+            		).append(session.getLocalAddress()
+            		).append("seems to be invalid, removing from the pool."
+            		).toString(), e);
+            pool.deleteSocketEndpoint(this);
+            throw e;
         }
     }
 }
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
index 7b1e960..448e232 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
@@ -26,12 +26,8 @@
         this.wsConnection = wsConnection;
     }
 
-    /**
-     * @inheritDoc
-     * @see MidNorthboundRecipient#
-     * doCallback(java.lang.String, SnaMessage)
-     */
-    public void callback(String callbackId, SnaMessage[] messages) {
+    @Override
+    public void callback(String callbackId, SnaMessage[] messages) throws Exception {
         int index = 0;
         int length = messages == null ? 0 : messages.length;
         StringBuilder builder = new StringBuilder();
@@ -53,11 +49,7 @@
         }
         builder.append(JSONUtils.CLOSE_BRACKET);
         builder.append(JSONUtils.CLOSE_BRACE);
-        try {
-            this.wsConnection.send(builder.toString());
-        } catch (Exception e) {
-            super.mediator.error(e);
-        }
+        this.wsConnection.send(builder.toString());
     }
 
 }
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
index 8fcb56b..8695c96 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
@@ -20,6 +20,7 @@
 import org.json.JSONObject;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 /**
@@ -72,13 +73,17 @@
         }
         byte[] resultBytes;
         List<String> acceptEncoding = super.request.getQueryMap().get("Accept-Encoding");
-        if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
-            resultBytes = NorthboundAccess.compress(result);
-            this.wsConnection.send(resultBytes);
-
-        } else {
-            resultBytes = result.getBytes("UTF-8");
-            this.wsConnection.send(new String(resultBytes));
+        try {
+	        if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
+	            resultBytes = NorthboundAccess.compress(result);
+	            this.wsConnection.send(resultBytes);
+	
+	        } else {
+	            resultBytes = result.getBytes("UTF-8");
+	            this.wsConnection.send(new String(resultBytes));
+	        }
+        } catch(Exception e) {
+        	throw new IOException(e);
         }
         return true;
     }
@@ -92,6 +97,10 @@
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("statusCode", i);
         jsonObject.put("message", message);
-        this.wsConnection.send(new String(jsonObject.toString().getBytes("UTF-8")));
+        try {
+			this.wsConnection.send(new String(jsonObject.toString().getBytes("UTF-8")));
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
     }
 }
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
index a63f0a2..bab6cad 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
@@ -57,7 +57,7 @@
         assertTrue(response.getString("uri").equals("/light/switch/status"));
         assertTrue(response.getJSONObject("response").get("value").equals("OFF"));
         simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/light/switch/turn_on/ACT", null, "POST");
-
+        System.out.println(simulated);
         response = new JSONObject(simulated);
         assertTrue(response.get("statusCode").equals(200));
 
@@ -79,7 +79,8 @@
         assertTrue(response.getString("uri").equals("/light/switch/brightness"));
         assertTrue(response.getJSONObject("response").get("value").equals(10));
         simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/providers/light/services/switch/resources/dim/ACT", "{\"parameters\":[{\"name\": \"brightness\",\"value\": 5,\"type\": \"int\"}]}", "POST");
-
+        System.out.println(simulated);
+        
         response = new JSONObject(simulated);
         System.out.println(response.toString());
 
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
index 4237c87..573049f 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
@@ -65,7 +65,7 @@
         simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + 
         	"/providers/slider/services/cursor/resources/position/SUBSCRIBE",
         	"{\"parameters\" : [{\"name\":\"callback\", \"type\":\"string\",\"value\":\"http://localhost:8898\"}]}", "POST");
-        //System.out.println(simulated);
+        System.out.println(simulated);
 
         response = new JSONObject(simulated);
 
@@ -89,6 +89,7 @@
         Thread.sleep(5000);
         slider.move(0);
         message = waitForAvailableMessage(10000);
+        System.out.println(message);
         Assert.assertNotNull(message);
         response = new JSONObject(message);
         response = response.getJSONArray("messages").getJSONObject(0);
@@ -105,7 +106,7 @@
 
         simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/providers/slider/services/cursor/resources/position/UNSUBSCRIBE", "{\"parameters\" : [{\"name\":\"subscriptionId\", \"type\":\"string\", \"value\":\"" + subscriptionId + "\"}]}", "POST");
 
-        //System.out.println(simulated);
+        System.out.println(simulated);
         response = new JSONObject(simulated);
         assertTrue(response.get("statusCode").equals(200));
         assertTrue(response.getString("uri").equals("/slider/cursor/position"));
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
index d2d6613..e57c8e7 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
@@ -116,6 +116,7 @@
     public void onMessage(String message) {
         this.setAvailable(true);
         this.lastMessage = message;
+        System.out.println(message);
     }
 
     @OnWebSocketError
diff --git a/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java b/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
index 287cb08..b9c3596 100644
--- a/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
+++ b/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
@@ -147,8 +147,10 @@
                 this.isElementsList = true;	
                 break;
 	        case "sensinact":
-	            this.method = "ALL";
-	            this.multi = true;
+	        	if(this.method==null) {
+	        		this.method = "ALL";
+	        	}
+	        	this.multi = true;
                 break;
             default:
             	break;
@@ -440,7 +442,8 @@
                     }
                 }
                 if (sender == null) {
-                    sender = "/[^/]+(/[^/]+)*";
+                    sender = "(/[^/]+)+";
+                    isPattern = true;
                 }
                 if (types == null) {
                     types = SnaMessage.Type.values();
diff --git a/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java b/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
index 0170a97..6f19c26 100644
--- a/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
+++ b/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
@@ -41,7 +41,7 @@
     }
 
     public MessageRouter getHandler() {
-        return super.messageHandler;
+        return super.messageRouter;
     }
 
     /**