Fix global subscription error
Handle finer websocket frames
diff --git a/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java b/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
index 942b73b..d868c70 100644
--- a/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
+++ b/platform/northbound/mqtt/mqtt-generic-agent/src/main/java/org/eclipse/sensinact/gateway/agent/mqtt/generic/internal/GenericMqttAgent.java
@@ -10,22 +10,22 @@
*/
package org.eclipse.sensinact.gateway.agent.mqtt.generic.internal;
+import java.io.IOException;
+
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.eclipse.sensinact.gateway.core.message.AbstractMidAgentCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/**
* Generic MQTT Agent
*/
-public class GenericMqttAgent extends AbstractMidAgentCallback {
+public class GenericMqttAgent {
+
private static final Logger LOG = LoggerFactory.getLogger(GenericMqttAgent.class);
private final String broker;
private final int qos;
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
index f413e30..83e022e 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketConnection.java
@@ -10,7 +10,6 @@
*/
package org.eclipse.sensinact.gateway.nthbnd.rest.internal.ws;
-import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
@@ -99,13 +98,25 @@
restAccess.proceed();
} catch (IOException | JSONException e) {
this.mediator.error(e);
- this.send(new JSONObject().put("statusCode", 400).put("message", "Bad request").toString());
+ try {
+ this.send(new JSONObject().put("statusCode", 400).put("message", "Bad request").toString());
+ } catch (Exception e1) {
+ this.mediator.error(e1);
+ }
} catch (InvalidCredentialException e) {
this.mediator.error(e);
- this.send(new JSONObject().put("statusCode", 403).put("message", e.getMessage()).toString());
+ try {
+ this.send(new JSONObject().put("statusCode", 403).put("message", e.getMessage()).toString());
+ } catch (Exception e1) {
+ this.mediator.error(e1);
+ }
} catch (Exception e) {
this.mediator.error(e);
- this.send(new JSONObject().put("statusCode", 500).put("message", "Exception - Internal server error").toString());
+ try {
+ this.send(new JSONObject().put("statusCode", 500).put("message", "Exception - Internal server error").toString());
+ } catch (Exception e1) {
+ this.mediator.error(e1);
+ }
}
}
@@ -113,25 +124,42 @@
private byte[] payload;
@OnWebSocketFrame
- public void onFrame(Frame frame) {
- if(OpCode.CONTINUATION != frame.getOpCode() && frame.isFin()) {
- partial = false;
- return;
- }
- partial = true;
- byte[] bytes = new byte[frame.getPayloadLength()];
- frame.getPayload().get(bytes);
- int length = payload==null?0:payload.length;
- byte[] tmpArray = new byte[length+bytes.length] ;
- if(bytes.length > 0)
- System.arraycopy(bytes,0, tmpArray, length, bytes.length);
- if(length > 0)
- System.arraycopy(payload, 0, tmpArray, 0, length);
- payload = tmpArray;
- tmpArray = null;
- if(frame.isFin()) {
- onMessage(new String(payload));
- payload = null;
+ public void onFrame(Frame frame) {
+ switch(frame.getOpCode()) {
+ case OpCode.PING :
+ partial = true;
+ try {
+ this.session.getRemote().sendPong(ByteBuffer.allocate(0));
+ } catch (IOException e) {
+ this.mediator.error(e);
+ }
+ break;
+ case OpCode.PONG :
+ partial = true;
+ break;
+ case OpCode.CONTINUATION:
+ partial = true;
+ byte[] bytes = new byte[frame.getPayloadLength()];
+ frame.getPayload().get(bytes);
+ int length = payload==null?0:payload.length;
+ byte[] tmpArray = new byte[length+bytes.length] ;
+ if(bytes.length > 0)
+ System.arraycopy(bytes,0, tmpArray, length, bytes.length);
+ if(length > 0)
+ System.arraycopy(payload, 0, tmpArray, 0, length);
+ payload = tmpArray;
+ tmpArray = null;
+ if(frame.isFin()) {
+ partial = false;
+ onMessage(new String(payload));
+ partial = true;
+ payload = null;
+ }
+ break;
+ default:
+ if(frame.isFin())
+ partial = false;
+ break;
}
}
@@ -140,7 +168,7 @@
error.printStackTrace();
}
- protected void send(String message) {
+ protected void send(String message) throws Exception {
if (this.session == null)
return;
try {
@@ -160,11 +188,17 @@
future.get(1, TimeUnit.SECONDS);
}
} catch (Exception e) {
- this.mediator.error(new StringBuilder().append("Session ").append(session.getLocalAddress()).append("seems to be invalid, removing from the pool.").toString(), e);
+ this.mediator.error(new StringBuilder(
+ ).append("Session "
+ ).append(session.getLocalAddress()
+ ).append("seems to be invalid, removing from the pool."
+ ).toString(), e);
+ pool.deleteSocketEndpoint(this);
+ throw e;
}
}
- protected void send(byte[] message) {
+ protected void send(byte[] message) throws Exception {
if (this.session == null)
return;
try {
@@ -183,7 +217,13 @@
future.get(1, TimeUnit.SECONDS);
}
} catch (Exception e) {
- this.mediator.error(new StringBuilder().append("Session ").append(session.getLocalAddress()).append("seems to be invalid, removing from the pool.").toString(), e);
+ this.mediator.error(new StringBuilder(
+ ).append("Session "
+ ).append(session.getLocalAddress()
+ ).append("seems to be invalid, removing from the pool."
+ ).toString(), e);
+ pool.deleteSocketEndpoint(this);
+ throw e;
}
}
}
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
index 7b1e960..448e232 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WebSocketRecipient.java
@@ -26,12 +26,8 @@
this.wsConnection = wsConnection;
}
- /**
- * @inheritDoc
- * @see MidNorthboundRecipient#
- * doCallback(java.lang.String, SnaMessage)
- */
- public void callback(String callbackId, SnaMessage[] messages) {
+ @Override
+ public void callback(String callbackId, SnaMessage[] messages) throws Exception {
int index = 0;
int length = messages == null ? 0 : messages.length;
StringBuilder builder = new StringBuilder();
@@ -53,11 +49,7 @@
}
builder.append(JSONUtils.CLOSE_BRACKET);
builder.append(JSONUtils.CLOSE_BRACE);
- try {
- this.wsConnection.send(builder.toString());
- } catch (Exception e) {
- super.mediator.error(e);
- }
+ this.wsConnection.send(builder.toString());
}
}
diff --git a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
index 8fcb56b..8695c96 100644
--- a/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
+++ b/platform/northbound/rest-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/rest/internal/ws/WsRestAccess.java
@@ -20,6 +20,7 @@
import org.json.JSONObject;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.List;
/**
@@ -72,13 +73,17 @@
}
byte[] resultBytes;
List<String> acceptEncoding = super.request.getQueryMap().get("Accept-Encoding");
- if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
- resultBytes = NorthboundAccess.compress(result);
- this.wsConnection.send(resultBytes);
-
- } else {
- resultBytes = result.getBytes("UTF-8");
- this.wsConnection.send(new String(resultBytes));
+ try {
+ if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
+ resultBytes = NorthboundAccess.compress(result);
+ this.wsConnection.send(resultBytes);
+
+ } else {
+ resultBytes = result.getBytes("UTF-8");
+ this.wsConnection.send(new String(resultBytes));
+ }
+ } catch(Exception e) {
+ throw new IOException(e);
}
return true;
}
@@ -92,6 +97,10 @@
JSONObject jsonObject = new JSONObject();
jsonObject.put("statusCode", i);
jsonObject.put("message", message);
- this.wsConnection.send(new String(jsonObject.toString().getBytes("UTF-8")));
+ try {
+ this.wsConnection.send(new String(jsonObject.toString().getBytes("UTF-8")));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
index a63f0a2..bab6cad 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestACTAccess.java
@@ -57,7 +57,7 @@
assertTrue(response.getString("uri").equals("/light/switch/status"));
assertTrue(response.getJSONObject("response").get("value").equals("OFF"));
simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/light/switch/turn_on/ACT", null, "POST");
-
+ System.out.println(simulated);
response = new JSONObject(simulated);
assertTrue(response.get("statusCode").equals(200));
@@ -79,7 +79,8 @@
assertTrue(response.getString("uri").equals("/light/switch/brightness"));
assertTrue(response.getJSONObject("response").get("value").equals(10));
simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/providers/light/services/switch/resources/dim/ACT", "{\"parameters\":[{\"name\": \"brightness\",\"value\": 5,\"type\": \"int\"}]}", "POST");
-
+ System.out.println(simulated);
+
response = new JSONObject(simulated);
System.out.println(response.toString());
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
index 4237c87..573049f 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/TestRestSUBSCRIBE_UNSUBSCRIBEAccess.java
@@ -65,7 +65,7 @@
simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL +
"/providers/slider/services/cursor/resources/position/SUBSCRIBE",
"{\"parameters\" : [{\"name\":\"callback\", \"type\":\"string\",\"value\":\"http://localhost:8898\"}]}", "POST");
- //System.out.println(simulated);
+ System.out.println(simulated);
response = new JSONObject(simulated);
@@ -89,6 +89,7 @@
Thread.sleep(5000);
slider.move(0);
message = waitForAvailableMessage(10000);
+ System.out.println(message);
Assert.assertNotNull(message);
response = new JSONObject(message);
response = response.getJSONArray("messages").getJSONObject(0);
@@ -105,7 +106,7 @@
simulated = HttpServiceTestClient.newRequest(mediator, HTTP_ROOTURL + "/providers/slider/services/cursor/resources/position/UNSUBSCRIBE", "{\"parameters\" : [{\"name\":\"subscriptionId\", \"type\":\"string\", \"value\":\"" + subscriptionId + "\"}]}", "POST");
- //System.out.println(simulated);
+ System.out.println(simulated);
response = new JSONObject(simulated);
assertTrue(response.get("statusCode").equals(200));
assertTrue(response.getString("uri").equals("/slider/cursor/position"));
diff --git a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
index d2d6613..e57c8e7 100644
--- a/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
+++ b/platform/northbound/rest-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/rest/ws/test/WsServiceTestClient.java
@@ -116,6 +116,7 @@
public void onMessage(String message) {
this.setAvailable(true);
this.lastMessage = message;
+ System.out.println(message);
}
@OnWebSocketError
diff --git a/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java b/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
index 287cb08..b9c3596 100644
--- a/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
+++ b/platform/northbound/sensinact-access/src/main/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/DefaultNorthboundRequestHandler.java
@@ -147,8 +147,10 @@
this.isElementsList = true;
break;
case "sensinact":
- this.method = "ALL";
- this.multi = true;
+ if(this.method==null) {
+ this.method = "ALL";
+ }
+ this.multi = true;
break;
default:
break;
@@ -440,7 +442,8 @@
}
}
if (sender == null) {
- sender = "/[^/]+(/[^/]+)*";
+ sender = "(/[^/]+)+";
+ isPattern = true;
}
if (types == null) {
types = SnaMessage.Type.values();
diff --git a/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java b/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
index 0170a97..6f19c26 100644
--- a/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
+++ b/platform/northbound/sensinact-access/src/test/java/org/eclipse/sensinact/gateway/nthbnd/endpoint/test/MyModelInstance.java
@@ -41,7 +41,7 @@
}
public MessageRouter getHandler() {
- return super.messageHandler;
+ return super.messageRouter;
}
/**