| /******************************************************************************* |
| * Copyright (c) 2008, 2014 IBM Corp. |
| * |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * Contributors: |
| * Ian Craggs - initial API and implementation and/or initial documentation |
| *******************************************************************************/ |
| |
| package org.eclipse.paho.mqttsn.gateway.client.udp; |
| |
| import java.io.IOException; |
| import java.net.DatagramPacket; |
| import java.net.DatagramSocket; |
| import java.net.InetAddress; |
| //import java.net.SocketException; |
| import java.util.Vector; |
| |
| import org.eclipse.paho.mqttsn.gateway.client.ClientInterface; |
| //import org.eclipse.paho.mqttsn.gateway.client.udp.UDPClientInterface.Forwarder; |
| import org.eclipse.paho.mqttsn.gateway.core.Dispatcher; |
| import org.eclipse.paho.mqttsn.gateway.exceptions.MqttsException; |
| import org.eclipse.paho.mqttsn.gateway.messages.Message; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsAdvertise; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsConnect; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsDisconnect; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsGWInfo; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsMessage; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPingReq; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPingResp; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPubComp; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPubRec; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPubRel; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPuback; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPublish; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsRegack; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsRegister; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsSearchGW; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsSubscribe; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsUnsubscribe; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsWillMsg; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsWillMsgUpd; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsWillTopic; |
| import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsWillTopicUpd; |
| import org.eclipse.paho.mqttsn.gateway.utils.ClientAddress; |
| import org.eclipse.paho.mqttsn.gateway.utils.GWParameters; |
| import org.eclipse.paho.mqttsn.gateway.utils.GatewayLogger; |
| import org.eclipse.paho.mqttsn.gateway.utils.Utils; |
| |
| /** |
| * This class implements a UDP interface to Mqtts clients.Implements the |
| * interface {@link org.eclipse.paho.mqttsn.gateway.client.ClientInterface}. |
| * For the reading functionality a reading thread is created. |
| * There is only one instance of this class. |
| * |
| */ |
| public class UDPClientInterface implements ClientInterface, Runnable { |
| |
| private DatagramSocket udpSocket; |
| private volatile boolean running; |
| private Thread readThread; |
| private Vector<Forwarder> forwarders; |
| private Dispatcher dispatcher; |
| private byte[] recData = new byte[512]; |
| |
| |
| /** |
| * This method initializes the interface.It creates an new UDP socket and |
| * a new thread for reading from the socket. |
| * @throws MqttsException |
| */ |
| public void initialize() throws MqttsException { |
| try { |
| //create the udp socket |
| udpSocket = new DatagramSocket(GWParameters.getUdpPort()); |
| |
| //get the Dispatcher |
| dispatcher = Dispatcher.getInstance(); |
| |
| forwarders = new Vector<Forwarder>(); |
| |
| //create thread for reading |
| this.readThread = new Thread (this, "UDPClientInterface"); |
| this.running = true; |
| this.readThread.start(); |
| } catch (Exception e) { |
| throw new MqttsException ("UDPClientInterface - Error initializing :" +e); |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.paho.mqttsn.gateway.client.ClientInterface#broadcastMsg(org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsMessage) |
| */ |
| public void broadcastMsg(MqttsMessage msg) { |
| // GatewayLogger.log(GatewayLogger.INFO, "UDPClientInterface - Broadcasting Mqtts \"" +Utils.hexString(msg.toBytes())+"\" message to the network."); |
| for(int i = forwarders.size() - 1; i >= 0; i--) { |
| Forwarder fr = (Forwarder)forwarders.get(i); |
| //check also if this forwarder is inactive |
| if (System.currentTimeMillis() > fr.timeout) |
| forwarders.remove(i); |
| else{ |
| try { |
| byte[] wireMsg = msg.toBytes(); |
| byte[] data = new byte[wireMsg.length + 2]; |
| data[0] = (byte)0x00;//0x00 means broadcast to all network |
| data[1] = (byte)0x00; |
| System.arraycopy(wireMsg, 0, data, 2, wireMsg.length); |
| DatagramPacket packet = new DatagramPacket(data, data.length, fr.addr, fr.port); |
| udpSocket.send(packet); |
| } catch (IOException e) { |
| GatewayLogger.log(GatewayLogger.ERROR, "UDPClientInterface - Error while writing on the UDP socket."); |
| } |
| } |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.paho.mqttsn.gateway.client.ClientInterface#broadcastMsg(int, org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsMessage) |
| */ |
| public void broadcastMsg(int radius, MqttsMessage msg) { |
| // GatewayLogger.log(GatewayLogger.INFO, "UDPClientInterface - Broadcasting Mqtts \"" +Utils.hexString(msg.toBytes())+"\" message to the network with broadcast radius "+radius+"."); |
| for(int i = forwarders.size() - 1; i >= 0; i--) { |
| Forwarder fr = (Forwarder)forwarders.get(i); |
| //check also if this forwarder is inactive |
| if (System.currentTimeMillis() > fr.timeout) |
| forwarders.remove(i); |
| else{ |
| try { |
| byte[] wireMsg = msg.toBytes(); |
| byte[] data = new byte[wireMsg.length + 2]; |
| data[0] = (byte)radius;//broadcast to the specified radius |
| data[1] = (byte)0x00; |
| System.arraycopy(wireMsg, 0, data, 2, wireMsg.length); |
| DatagramPacket packet = new DatagramPacket(data, data.length, fr.addr, fr.port); |
| udpSocket.send(packet); |
| } catch (IOException e) { |
| GatewayLogger.log(GatewayLogger.ERROR, "UDPClientInterface - Error while writing on the UDP socket."); |
| } |
| } |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.paho.mqttsn.gateway.client.ClientInterface#readMsg() |
| */ |
| public void readMsg() { |
| DatagramPacket packet = new DatagramPacket(recData,0, recData.length); |
| try { |
| packet.setLength(recData.length); |
| udpSocket.receive(packet); |
| |
| //add the forwarder from which we received the message to the list |
| //if it is already on the list just update its timeout |
| Forwarder forw = new Forwarder(); |
| forw.addr = packet.getAddress(); |
| forw.port = packet.getPort(); |
| forw.timeout = System.currentTimeMillis() + GWParameters.getForwarderTimeout()*1000; |
| //GatewayLogger.log(GatewayLogger.INFO, "UDPClientInterface - New forwarder:addr = " + forw.addr+ " port = "+forw.port); |
| |
| boolean found = false; |
| for(int i = 0 ; i < forwarders.size(); i++) { |
| Forwarder fr = (Forwarder)forwarders.get(i); |
| if(forw.equals(fr)){ |
| found = true; |
| fr.timeout = System.currentTimeMillis() + GWParameters.getForwarderTimeout()*1000; |
| break; |
| } |
| } |
| if (!found) forwarders.add(forw); |
| |
| // if(packet.getLength() > 3) { // not a keep alive packet |
| byte[] data = new byte[packet.getLength()]; |
| System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); |
| |
| //old encaps v1.1 |
| // byte[] clAddr = new byte[data[1]]; //data[1] contains length of clAddr (wireless node id) |
| // System.arraycopy(data, 2, clAddr, 0, clAddr.length); |
| // ClientAddress address = new ClientAddress(clAddr, packet.getAddress(), packet.getPort()); |
| // byte[] mqttsData = new byte[data.length - clAddr.length - 2]; |
| // System.arraycopy(data, clAddr.length + 2, mqttsData, 0, mqttsData.length); |
| //end old encaps v1.1 |
| |
| byte[] mqttsData = null; |
| ClientAddress address = null; |
| |
| if (data[0] == (byte)0x00) { //old encaps v 1.1 |
| byte[] clAddr = new byte[data[1]]; //data[1] contains length of clAddr (wireless node id) |
| System.arraycopy(data, 2, clAddr, 0, clAddr.length); |
| byte[] encaps = new byte[data[1]+2]; |
| System.arraycopy(data, 0, encaps, 0, encaps.length); |
| address = new ClientAddress(clAddr, packet.getAddress(), packet.getPort(), true, encaps); |
| mqttsData = new byte[data.length - clAddr.length - 2]; |
| System.arraycopy(data, clAddr.length + 2, mqttsData, 0, mqttsData.length); |
| } else if (data[1] == (byte)MqttsMessage.ENCAPSMSG) { //new encaps v1.2 |
| //we have an encapsulated msg |
| byte[] clAddr = new byte[((int)data[0]&0xFF) - 3]; //data[0]: length of encaps |
| System.arraycopy(data, 3, clAddr, 0, clAddr.length); |
| byte[] encaps = new byte[data[0]]; |
| System.arraycopy(data, 0, encaps, 0, encaps.length); |
| address = new ClientAddress(clAddr, packet.getAddress(), packet.getPort(), true, encaps); |
| mqttsData = new byte[(int)data[data[0]]]; |
| System.arraycopy(data, data[0], mqttsData, 0, mqttsData.length); |
| } else { |
| //we have a non-encapsulated mqtts msg |
| //we will create an address out of the forwarder address |
| byte[] a1 = packet.getAddress().getAddress(); |
| byte[] a2 = new byte[2]; |
| a2[0] = (byte)((packet.getPort() >> 8) & 0xFF); |
| a2[1] = (byte) ( packet.getPort() & 0xFF); |
| byte[] clAddr = new byte[a1.length+a2.length]; |
| System.arraycopy(a1, 0, clAddr, 0, a1.length); |
| System.arraycopy(a2, 0, clAddr, a1.length, a2.length); |
| address = new ClientAddress(clAddr, packet.getAddress(), packet.getPort(), false, null); |
| mqttsData = new byte[(int)data[0]]; |
| System.arraycopy(data, 0, mqttsData, 0, mqttsData.length); |
| } |
| |
| |
| //start-just for the testing purposes we simulate here a network delay |
| //This will NOT be included in the final version |
| // try { |
| // Thread.sleep(10); |
| // } catch (InterruptedException e) { |
| // // TODO Auto-generated catch block |
| // e.printStackTrace(); |
| // } |
| |
| //end |
| decodeMsg(mqttsData,address); |
| // } |
| }catch (IOException ex){ |
| ex.printStackTrace(); |
| GatewayLogger.log(GatewayLogger.ERROR, "UDPClientInterface - An I/O error occurred while reading from the socket."); |
| } |
| } |
| |
| /** |
| * This method decodes the received Mqtts message and then constructs a |
| * general "internal" message {@link org.eclipse.paho.mqttsn.gateway.messages.Message} |
| * which puts it to Dispatcher's queue {@link org.eclipse.paho.mqttsn.gateway.core.Dispatcher. |
| * |
| * @param data The received Mqtts packet. |
| * @param address The address of the SA client. |
| */ |
| public void decodeMsg(byte[] data, ClientAddress address) { |
| MqttsMessage mqttsMsg = null; |
| |
| //do some checks for the received packet |
| if(data == null) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - The received data packet is null. The packet cannot be processed."); |
| return; |
| } |
| |
| if(data.length < GWParameters.getMinMqttsLength()) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts message. The received data packet is too short (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| |
| if(data.length > GWParameters.getMaxMqttsLength()){ |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts message. The received data packet is too long (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| |
| } |
| |
| if(data[0] < GWParameters.getMinMqttsLength()) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts message. Field \"Length\" in the received data packet is less than "+GWParameters.getMinMqttsLength()+" . The packet cannot be processed."); |
| return; |
| } |
| |
| if(data[0] != data.length) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts message. Field \"Length\" in the received data packet does not match the actual length of the packet. The packet cannot be processed."); |
| return; |
| } |
| |
| |
| int msgType = (data[1] & 0xFF); |
| switch (msgType) { |
| case MqttsMessage.ADVERTISE: |
| if(data.length != 5) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts ADVERTISE message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsAdvertise(data); |
| //TODO Handle this case for load balancing issues |
| break; |
| |
| case MqttsMessage.SEARCHGW: |
| if(data.length != 3) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts SEARCHGW message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsSearchGW(data); |
| break; |
| |
| case MqttsMessage.GWINFO: |
| mqttsMsg = new MqttsGWInfo(data); |
| //TODO Handle this case for load balancing issues |
| break; |
| |
| case MqttsMessage.CONNECT: |
| if(data.length < 7) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts CONNECT message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsConnect(data); |
| break; |
| |
| case MqttsMessage.CONNACK: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.WILLTOPICREQ: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.WILLTOPIC: |
| if(data.length < 2) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts WILLTOPIC message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsWillTopic(data); |
| break; |
| |
| case MqttsMessage.WILLMSGREQ: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.WILLMSG: |
| if(data.length < 3) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts WILLMSG message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsWillMsg(data); |
| break; |
| |
| case MqttsMessage.REGISTER: |
| if(data.length < 7) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts REGISTER message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsRegister(data); |
| break; |
| |
| case MqttsMessage.REGACK: |
| if(data.length != 7) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts REGACK message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsRegack(data); |
| break; |
| |
| case MqttsMessage.PUBLISH: |
| if(data.length < 8) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts PUBLISH message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsPublish(data); |
| break; |
| |
| case MqttsMessage.PUBACK: |
| if(data.length != 7) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts PUBACK message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsPuback(data); |
| break; |
| |
| case MqttsMessage.PUBCOMP: |
| if(data.length != 4) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts PUBCOMP message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsPubComp(data); |
| break; |
| |
| case MqttsMessage.PUBREC: |
| if(data.length != 4) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts PUBREC message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsPubRec(data); |
| break; |
| |
| case MqttsMessage.PUBREL: |
| if(data.length != 4) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts PUBREL message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| mqttsMsg = new MqttsPubRel(data); |
| break; |
| |
| case MqttsMessage.SUBSCRIBE: |
| if(data.length < 6) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts SUBSCRIBE message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| |
| try { |
| mqttsMsg = new MqttsSubscribe(data); |
| } catch (MqttsException e) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts SUBSCRIBE message. "+e.getMessage()); |
| return; |
| } |
| break; |
| |
| case MqttsMessage.SUBACK: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.UNSUBSCRIBE : |
| if(data.length < 6) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts UNSUBSCRIBE message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| |
| try { |
| mqttsMsg = new MqttsUnsubscribe(data); |
| } catch (MqttsException e) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts UNSUBSCRIBE message. "+e.getMessage()); |
| return; |
| } |
| break; |
| |
| case MqttsMessage.UNSUBACK: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.PINGREQ: |
| mqttsMsg = new MqttsPingReq(data); |
| break; |
| |
| case MqttsMessage.PINGRESP: |
| mqttsMsg = new MqttsPingResp(data); |
| break; |
| |
| case MqttsMessage.DISCONNECT : |
| mqttsMsg = new MqttsDisconnect(data); |
| break; |
| |
| case MqttsMessage.WILLTOPICUPD: |
| if(data.length < 2) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts WILLTOPICUPD message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| |
| mqttsMsg = new MqttsWillTopicUpd(data); |
| break; |
| |
| case MqttsMessage.WILLTOPICRESP: |
| //we will never receive such a message from the client |
| break; |
| |
| case MqttsMessage.WILLMSGUPD: |
| if(data.length < 3) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Not a valid Mqtts WILLMSGUPD message. Wrong packet length (length = "+data.length +"). The packet cannot be processed."); |
| return; |
| } |
| |
| mqttsMsg = new MqttsWillMsgUpd(data); |
| break; |
| |
| case MqttsMessage.WILLMSGRESP: |
| //we will never receive such a message from the client |
| break; |
| |
| default: |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - Mqtts message of unknown type \"" + msgType+"\" received."); |
| return; |
| } |
| |
| //construct an "internal" message and put it to dispatcher's queue |
| Message msg = new Message(address); |
| msg.setType(Message.MQTTS_MSG); |
| msg.setMqttsMessage(mqttsMsg); |
| msg.setClientInterface(this); |
| this.dispatcher.putMessage(msg); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.eclipse.paho.mqttsn.gateway.client.ClientInterface#sendMsg(org.eclipse.paho.mqttsn.gateway.utils.SAaddress, org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsMessage) |
| */ |
| public void sendMsg(ClientAddress address, MqttsMessage msg) { |
| // GatewayLogger.log(GatewayLogger.INFO, "UDPClientInterface - Sending Mqtts \"" + Utils.hexString(msg.toBytes())+ "\" message to the client with address \"" +Utils.hexString(address.getAddress())+"\"."); |
| |
| if(address == null) { |
| GatewayLogger.log(GatewayLogger.WARN, "UDPClientInterface - The address of the receiver is null.The Mqtts message " + Utils.hexString(msg.toBytes())+ " cannot be sent."); |
| return; |
| } |
| |
| try { |
| byte[] addr = address.getAddress(); |
| byte[] wireMsg = msg.toBytes(); |
| |
| //old encaps v1.1 |
| // byte[] data = new byte[wireMsg.length + addr.length + 2]; |
| // data[0] = (byte)0x00; |
| // data[1] = (byte)addr.length; |
| // System.arraycopy(addr, 0, data, 2, addr.length); |
| // System.arraycopy(wireMsg, 0, data, addr.length + 2, wireMsg.length); |
| //end old encaps v1.1 |
| |
| //new encaps v1.2 |
| byte[] data = null; |
| if (address.isEncaps()) { |
| // byte[] encaps = new byte[3+addr.length]; |
| // encaps[0] = (byte)(addr.length + 3); |
| // encaps[1] = (byte) MqttsMessage.ENCAPSMSG; |
| // encaps[2] = 0x00; |
| byte[] encaps = address.getEncaps(); |
| System.arraycopy(addr, 0, encaps, 0, addr.length); |
| data = new byte[encaps.length + wireMsg.length]; |
| System.arraycopy(encaps, 0, data, 0, encaps.length); |
| System.arraycopy(wireMsg, 0, data, encaps.length, wireMsg.length); |
| } else { |
| data = wireMsg; |
| } |
| //end new encaps v1.2 |
| |
| DatagramPacket packet = new DatagramPacket(data, data.length, address.getIPaddress(), address.getPort()); |
| udpSocket.send(packet); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| GatewayLogger.log(GatewayLogger.ERROR, "UDPClientInterface - Error while writing on the UDP socket."); |
| } |
| } |
| |
| |
| /* (non-Javadoc) |
| * @see java.lang.Runnable#run() |
| */ |
| public void run() { |
| while (running) { |
| readMsg(); |
| } |
| } |
| |
| |
| /** |
| * This class represents a forwarder which is defined in the specifications of the |
| * Mqtts protocol. |
| */ |
| public static class Forwarder { |
| private InetAddress addr = null; |
| private int port = 0; |
| private long timeout = 0; |
| |
| public boolean equals(Object o) { |
| boolean same = false; |
| if(o == null) { |
| same = false; |
| } else if(o instanceof Forwarder) { |
| Forwarder fr = (Forwarder)o; |
| if(addr != null && addr.equals(fr.addr) && fr.port == port) { |
| same = true; |
| } |
| } |
| return same; |
| } |
| } |
| } |