Handle downlink messages
Define QoS to 1 when pushishing (avoid to wait timeout error)
diff --git a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
index 764dc5c..c67d630 100644
--- a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
+++ b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
@@ -83,9 +83,7 @@
public void publish(String topic, String message){
try {
LOG.info("Publishing message {} on the topic {}", message,topic);
- MqttMessage mqMessage = new MqttMessage(message.getBytes());
- mqMessage.setQos(0);
- client.publish(topic, mqMessage);
+ client.publish(topic, message.getBytes(), 1, false);
} catch (Exception e) {
LOG.error("Unable to publishing message {} on the topic {}", message,topic);
}
diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java
new file mode 100644
index 0000000..d367e52
--- /dev/null
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java
@@ -0,0 +1,55 @@
+/*
+* Copyright (c) 2020 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+* Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.sthbnd.ttn.listener;
+
+import java.util.Base64;
+
+import org.eclipse.sensinact.gateway.common.bundle.Mediator;
+import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttBroker;
+
+public class TtnDownlinkListener {
+
+ private final Mediator mediator;
+ private MqttBroker broker;
+
+ public TtnDownlinkListener(Mediator mediator) {
+ this.mediator = mediator;
+ }
+
+ public void setBroker(MqttBroker broker) {
+ this.broker = broker;
+ }
+
+ public void messageReceived(String applicationId, String deviceId, Object value) {
+ byte[] bytes = null;
+ String topic = String.format("%s/devices/%s/down",applicationId,deviceId);
+ if(value != null) {
+ if(value.getClass().isArray() && value.getClass().getComponentType() == byte.class) {
+ bytes = (byte[])value;
+ } else {
+ if(value.getClass()==String.class)
+ bytes = ((String)value).getBytes();
+ else
+ bytes = String.valueOf(value).getBytes();
+ }
+ }
+ if(bytes == null) {
+ if(mediator.isErrorLoggable())
+ mediator.error("Null downlink value ");
+ return;
+ }
+ String message = String.format("{\"port\":1,\"confirmed\":true,\"payload_raw\":\"%s\"}",
+ Base64.getEncoder().encodeToString(bytes));
+ this.broker.publish(topic, message);
+ if(mediator.isDebugLoggable())
+ mediator.debug("Sent downlink message: " + message);
+ }
+}
diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
index 2f4294c..9045173 100644
--- a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2020 Kentyou.
+* Copyright (c) 2020-2021 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -10,11 +10,13 @@
*/
package org.eclipse.sensinact.gateway.sthbnd.ttn.listener;
+import java.util.List;
+
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.generic.packet.InvalidPacketException;
-import org.eclipse.sensinact.gateway.sthbnd.mqtt.device.MqttPacket;
import org.eclipse.sensinact.gateway.sthbnd.mqtt.device.MqttProtocolStackEndpoint;
import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.listener.MqttTopicMessage;
+import org.eclipse.sensinact.gateway.sthbnd.ttn.model.TtnSubPacket;
import org.eclipse.sensinact.gateway.sthbnd.ttn.model.TtnUplinkPayload;
import org.eclipse.sensinact.gateway.sthbnd.ttn.packet.TtnUplinkPacket;
import org.json.JSONException;
@@ -22,12 +24,16 @@
public class TtnUplinkListener extends MqttTopicMessage {
+ public static final String DOWNLINK_MARKER = "#DOWNLINK#";
+
private final Mediator mediator;
private final MqttProtocolStackEndpoint endpoint;
+ private TtnDownlinkListener dowlinkListener;
- public TtnUplinkListener(Mediator mediator, MqttProtocolStackEndpoint endpoint) {
+ public TtnUplinkListener(Mediator mediator, TtnDownlinkListener downlinkListener, MqttProtocolStackEndpoint endpoint) {
this.mediator = mediator;
this.endpoint = endpoint;
+ this.dowlinkListener = downlinkListener;
}
/* (non-Javadoc)
@@ -36,9 +42,9 @@
@Override
public void messageReceived(String topic, String message) {
- if(mediator.isDebugLoggable()) {
+ if(mediator.isDebugLoggable())
mediator.debug("Uplink message: " + message);
- }
+
String device = topic.split("/")[2];
JSONObject json = new JSONObject(message);
TtnUplinkPayload payload = null;
@@ -46,10 +52,24 @@
try {
payload = new TtnUplinkPayload(mediator, json);
} catch (JSONException e) {
- e.printStackTrace();
+ if(mediator.isErrorLoggable())
+ mediator.error(e.getMessage(),e);
}
if (payload != null) {
- TtnUplinkPacket packet = new TtnUplinkPacket(device, payload.getSubPackets());
+ List<TtnSubPacket> subPackets = payload.getSubPackets();
+ if(subPackets.isEmpty())
+ return;
+ int i=0;
+ while(i<subPackets.size()) {
+ TtnSubPacket subPacket = subPackets.get(i);
+ if(DOWNLINK_MARKER.equals(subPacket.getMetadata())) {
+ this.dowlinkListener.messageReceived(payload.getApplicationId(),payload.getDeviceId(), subPacket.getValue());
+ subPackets.remove(i);
+ continue;
+ }
+ i++;
+ }
+ TtnUplinkPacket packet = new TtnUplinkPacket(device, subPackets);
try {
endpoint.process(packet);
} catch (InvalidPacketException e) {
diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
index 024a8fd..c031e3e 100644
--- a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
@@ -23,6 +23,7 @@
import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttBroker;
import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttTopic;
import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnActivationListener;
+import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnDownlinkListener;
import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnUplinkListener;
@SensiNactBridgeConfiguration(
@@ -58,12 +59,14 @@
.password(appKey)
.build();
- final MqttTopic activationTopic = new MqttTopic("+/devices/+/events/activations",
- new TtnActivationListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));
+ final TtnDownlinkListener ttnDownlinkListener = new TtnDownlinkListener(mediator);
final MqttTopic messageTopic = new MqttTopic("+/devices/+/up",
- new TtnUplinkListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));
+ new TtnUplinkListener(mediator, ttnDownlinkListener, (MqttProtocolStackEndpoint) super.endpoint));
+ final MqttTopic activationTopic = new MqttTopic("+/devices/+/events/activations",
+ new TtnActivationListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));
+
MqttBroker broker = new MqttBroker.Builder()
.host(brokerHost)
.port(brokerPort)
@@ -72,8 +75,9 @@
.topics(new ArrayList<MqttTopic>() {{
add(activationTopic);
add(messageTopic);
- }}).build();
+ }}).build();
+ ttnDownlinkListener.setBroker(broker);
((MqttProtocolStackEndpoint)super.endpoint).addBroker(broker);
}
}
\ No newline at end of file