Handle downlink messages

Define QoS to 1 when pushishing (avoid to wait timeout error)
diff --git a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
index 764dc5c..c67d630 100644
--- a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
+++ b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
@@ -83,9 +83,7 @@
     public void publish(String topic, String message){
         try {
             LOG.info("Publishing message {} on the topic {}", message,topic);
-            MqttMessage mqMessage = new MqttMessage(message.getBytes());
-            mqMessage.setQos(0);
-            client.publish(topic, mqMessage);
+            client.publish(topic, message.getBytes(), 1, false);
         } catch (Exception e) {
             LOG.error("Unable to publishing message {} on the topic {}", message,topic);
         }
diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java
new file mode 100644
index 0000000..d367e52
--- /dev/null
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnDownlinkListener.java
@@ -0,0 +1,55 @@
+/*

+* Copyright (c) 2020 Kentyou.

+ * All rights reserved. This program and the accompanying materials

+ * are made available under the terms of the Eclipse Public License v1.0

+ * which accompanies this distribution, and is available at

+ * http://www.eclipse.org/legal/epl-v10.html

+ *

+ * Contributors:

+*    Kentyou - initial API and implementation

+ */

+package org.eclipse.sensinact.gateway.sthbnd.ttn.listener;

+

+import java.util.Base64;

+

+import org.eclipse.sensinact.gateway.common.bundle.Mediator;

+import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttBroker;

+

+public class TtnDownlinkListener {

+

+    private final Mediator mediator;

+    private MqttBroker broker;

+

+    public TtnDownlinkListener(Mediator mediator) {

+        this.mediator = mediator;

+    }

+

+    public void setBroker(MqttBroker broker) {

+    	this.broker = broker;

+    }

+    

+    public void messageReceived(String applicationId, String deviceId, Object value) {

+    	byte[] bytes = null;

+    	String topic = String.format("%s/devices/%s/down",applicationId,deviceId);

+    	if(value != null) {

+    		if(value.getClass().isArray() && value.getClass().getComponentType() == byte.class) {

+    			bytes = (byte[])value;

+    		} else {

+    			if(value.getClass()==String.class)

+    				bytes = ((String)value).getBytes();

+    			else 

+    				bytes = String.valueOf(value).getBytes();	        			

+    		}

+		}

+    	if(bytes == null) {

+            if(mediator.isErrorLoggable()) 

+                mediator.error("Null downlink value ");

+    		return;

+    	}

+    	String message = String.format("{\"port\":1,\"confirmed\":true,\"payload_raw\":\"%s\"}", 

+				Base64.getEncoder().encodeToString(bytes));

+    	this.broker.publish(topic, message);

+        if(mediator.isDebugLoggable()) 

+            mediator.debug("Sent downlink message: " + message);

+    }

+}

diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
index 2f4294c..9045173 100644
--- a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/listener/TtnUplinkListener.java
@@ -1,5 +1,5 @@
 /*

-* Copyright (c) 2020 Kentyou.

+* Copyright (c) 2020-2021 Kentyou.

  * All rights reserved. This program and the accompanying materials

  * are made available under the terms of the Eclipse Public License v1.0

  * which accompanies this distribution, and is available at

@@ -10,11 +10,13 @@
  */

 package org.eclipse.sensinact.gateway.sthbnd.ttn.listener;

 

+import java.util.List;

+

 import org.eclipse.sensinact.gateway.common.bundle.Mediator;

 import org.eclipse.sensinact.gateway.generic.packet.InvalidPacketException;

-import org.eclipse.sensinact.gateway.sthbnd.mqtt.device.MqttPacket;

 import org.eclipse.sensinact.gateway.sthbnd.mqtt.device.MqttProtocolStackEndpoint;

 import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.listener.MqttTopicMessage;

+import org.eclipse.sensinact.gateway.sthbnd.ttn.model.TtnSubPacket;

 import org.eclipse.sensinact.gateway.sthbnd.ttn.model.TtnUplinkPayload;

 import org.eclipse.sensinact.gateway.sthbnd.ttn.packet.TtnUplinkPacket;

 import org.json.JSONException;

@@ -22,12 +24,16 @@
 

 public class TtnUplinkListener extends MqttTopicMessage {

 

+	public static final String DOWNLINK_MARKER = "#DOWNLINK#";

+	

     private final Mediator mediator;

     private final MqttProtocolStackEndpoint endpoint;

+	private TtnDownlinkListener dowlinkListener;

 

-    public TtnUplinkListener(Mediator mediator, MqttProtocolStackEndpoint endpoint) {

+    public TtnUplinkListener(Mediator mediator, TtnDownlinkListener downlinkListener, MqttProtocolStackEndpoint endpoint) {

         this.mediator = mediator;

         this.endpoint = endpoint;

+        this.dowlinkListener = downlinkListener;

     }

 

     /* (non-Javadoc)

@@ -36,9 +42,9 @@
     @Override

     public void messageReceived(String topic, String message) {

 

-        if(mediator.isDebugLoggable()) {

+        if(mediator.isDebugLoggable()) 

             mediator.debug("Uplink message: " + message);

-        }

+        

         String device = topic.split("/")[2];

         JSONObject json = new JSONObject(message);

         TtnUplinkPayload payload = null;

@@ -46,10 +52,24 @@
         try {

             payload = new TtnUplinkPayload(mediator, json);

         } catch (JSONException e) {

-            e.printStackTrace();

+            if(mediator.isErrorLoggable()) 

+                mediator.error(e.getMessage(),e);

         }

         if (payload != null) {

-            TtnUplinkPacket packet = new TtnUplinkPacket(device, payload.getSubPackets());

+        	List<TtnSubPacket> subPackets = payload.getSubPackets();

+        	if(subPackets.isEmpty())

+        		return;

+        	int i=0;

+        	while(i<subPackets.size()) {

+        		TtnSubPacket subPacket = subPackets.get(i);

+        		if(DOWNLINK_MARKER.equals(subPacket.getMetadata())) {

+        			this.dowlinkListener.messageReceived(payload.getApplicationId(),payload.getDeviceId(), subPacket.getValue());

+        			subPackets.remove(i);

+        			continue;

+        		}

+        		i++;

+        	}       	

+            TtnUplinkPacket packet = new TtnUplinkPacket(device, subPackets);

             try {

                 endpoint.process(packet);

             } catch (InvalidPacketException e) {

diff --git a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
index 024a8fd..c031e3e 100644
--- a/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
+++ b/platform/southbound/mqtt/ttn-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/ttn/osgi/TtnActivator.java
@@ -23,6 +23,7 @@
 import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttBroker;

 import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api.MqttTopic;

 import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnActivationListener;

+import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnDownlinkListener;

 import org.eclipse.sensinact.gateway.sthbnd.ttn.listener.TtnUplinkListener;

 

 @SensiNactBridgeConfiguration(

@@ -58,12 +59,14 @@
                 .password(appKey)

                 .build();

 

-        final MqttTopic activationTopic = new MqttTopic("+/devices/+/events/activations",

-                new TtnActivationListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));

+        final TtnDownlinkListener ttnDownlinkListener = new TtnDownlinkListener(mediator);

 

         final MqttTopic messageTopic = new MqttTopic("+/devices/+/up",

-                new TtnUplinkListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));

+                new TtnUplinkListener(mediator, ttnDownlinkListener, (MqttProtocolStackEndpoint) super.endpoint));

 

+        final MqttTopic activationTopic = new MqttTopic("+/devices/+/events/activations",

+                new TtnActivationListener(mediator, (MqttProtocolStackEndpoint) super.endpoint));

+        

         MqttBroker broker = new MqttBroker.Builder()

             .host(brokerHost)

             .port(brokerPort)

@@ -72,8 +75,9 @@
             .topics(new ArrayList<MqttTopic>() {{

                 add(activationTopic);

                 add(messageTopic);

-            }}).build();        

+            }}).build();

         

+        ttnDownlinkListener.setBroker(broker);

         ((MqttProtocolStackEndpoint)super.endpoint).addBroker(broker);

     }

 }
\ No newline at end of file