/******************************************************************************* | |
* Copyright (c) 2008, 2014 IBM Corp. | |
* | |
* All rights reserved. This program and the accompanying materials | |
* are made available under the terms of the Eclipse Public License v1.0 | |
* and Eclipse Distribution License v1.0 which accompany this distribution. | |
* | |
* The Eclipse Public License is available at | |
* http://www.eclipse.org/legal/epl-v10.html | |
* and the Eclipse Distribution License is available at | |
* http://www.eclipse.org/org/documents/edl-v10.php. | |
* | |
* Contributors: | |
* Ian Craggs - initial API and implementation and/or initial documentation | |
*******************************************************************************/ | |
package org.eclipse.paho.mqttsn.gateway.broker.tcp; | |
import java.io.DataInputStream; | |
import java.io.DataOutputStream; | |
import java.io.EOFException; | |
import java.io.IOException; | |
import java.io.InterruptedIOException; | |
import java.net.Socket; | |
import java.net.UnknownHostException; | |
import org.eclipse.paho.mqttsn.gateway.broker.BrokerInterface; | |
import org.eclipse.paho.mqttsn.gateway.core.Dispatcher; | |
import org.eclipse.paho.mqttsn.gateway.exceptions.MqttsException; | |
import org.eclipse.paho.mqttsn.gateway.messages.Message; | |
import org.eclipse.paho.mqttsn.gateway.messages.control.ControlMessage; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttConnack; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttMessage; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPingReq; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPingResp; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubComp; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubRec; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubRel; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPuback; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPublish; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttSuback; | |
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttUnsuback; | |
import org.eclipse.paho.mqttsn.gateway.utils.Address; | |
import org.eclipse.paho.mqttsn.gateway.utils.GWParameters; | |
import org.eclipse.paho.mqttsn.gateway.utils.GatewayLogger; | |
import org.eclipse.paho.mqttsn.gateway.utils.Utils; | |
/** | |
* This class represents the interface to the broker and is instantiated by the | |
* MessageHandler.Is is used for opening a TCP/IP connection with the broker | |
* and sending/receiving Mqtt Messages. | |
* For the reading functionality a reading thread is created. | |
* For every client there is one instance of this class. | |
* | |
* @see com.ibm.zurich.core.ClientMsgHandler | |
* | |
* Parts of this code were imported from com.ibm.mqttdirect.modules.common.StreamDeframer.java | |
* | |
* | |
*/ | |
public class TCPBrokerInterface implements BrokerInterface, Runnable { | |
private DataInputStream streamIn = null; | |
private DataOutputStream streamOut = null; | |
private Socket socket; | |
private Address address; | |
private String brokerURL; | |
private int port; | |
private String clientId; | |
private volatile boolean running; | |
private Thread readThread; | |
private Dispatcher dispatcher; | |
//the maximum length of a Mqtt fixed header | |
public static final int MAX_HDR_LENGTH = 5; | |
//the maximum length of the remaining part of a Mqtt message | |
public static final int MAX_MSG_LENGTH = 268435455; | |
/** | |
* Constructor of the broker interface. | |
*/ | |
public TCPBrokerInterface(Address address) { | |
this.address = address; | |
this.brokerURL = GWParameters.getBrokerURL(); | |
this.port = GWParameters.getBrokerTcpPort(); | |
this.running = false; | |
this.readThread = null; | |
this.dispatcher = Dispatcher.getInstance(); | |
} | |
/** | |
* This method opens the TCP/IP connection with the broker and creates | |
* a new thread for reading from the socket. | |
* | |
* @throws MqttsException | |
*/ | |
public void initialize() throws MqttsException{ | |
try { | |
socket = new Socket(brokerURL, port); | |
streamIn = new DataInputStream(socket.getInputStream()); | |
streamOut = new DataOutputStream(socket.getOutputStream()); | |
} catch (UnknownHostException e) { | |
disconnect(); | |
throw new MqttsException(e.getMessage()); | |
} catch (IOException e) { | |
disconnect(); | |
throw new MqttsException(e.getMessage()); | |
} | |
//create thread for reading | |
this.readThread = new Thread (this, "BrokerInterface"); | |
this.running = true; | |
this.readThread.start(); | |
} | |
/** | |
* This method sends a Mqtt message to the broker over the already established | |
* TCP/IP connection.Before that, converts the message to byte array calling | |
* the method {@link org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttMessage#toBytes()}. | |
* | |
* @param message The MqttMessage to be send to the broker. | |
* @throws MqttsException | |
*/ | |
public void sendMsg(MqttMessage message) throws MqttsException{ | |
// send the message over the TCP/IP socket | |
if (this.streamOut != null) { | |
try { | |
//System.out.println(">> sending msg: " + Utils.hexString(message.toBytes())); | |
this.streamOut.write(message.toBytes()); | |
this.streamOut.flush(); | |
} catch (IOException e) { | |
disconnect(); | |
throw new MqttsException(e.getMessage()); | |
} | |
}else{ | |
disconnect(); | |
throw new MqttsException("Writing stream is null!"); | |
} | |
} | |
/** | |
* This method is used for reading a Mqtt message from the socket.It blocks on the | |
* reading stream until a message arrives. | |
*/ | |
public void readMsg(){ | |
byte [] body = null; | |
// read the header from the input stream | |
MqttHeader hdr = new MqttHeader(); | |
hdr.header = new byte[MAX_HDR_LENGTH]; | |
if (this.streamIn == null){ | |
return; | |
} | |
try{ | |
int res = streamIn.read(); | |
hdr.header[0]=(byte) res; | |
hdr.headerLength=1; | |
if(res==-1) { | |
// if EOF detected | |
throw new EOFException(); | |
} | |
// read the Mqtt length | |
int multiplier = 1; | |
hdr.remainingLength=0; | |
do { | |
//read MsgLength bytes | |
res = streamIn.read(); | |
if(res==-1) { | |
// if EOF detected. | |
throw new EOFException(); | |
} | |
hdr.header[hdr.headerLength++] = (byte) res; | |
hdr.remainingLength += (res & 127) * multiplier; | |
multiplier *= 128; | |
} while ((res & 128) != 0 && hdr.headerLength<MAX_HDR_LENGTH); | |
//some checks | |
if (hdr.headerLength > MAX_HDR_LENGTH || hdr.remainingLength > MAX_MSG_LENGTH || hdr.remainingLength < 0) { | |
GatewayLogger.log(GatewayLogger.WARN, "TCPBrokerInterface ["+Utils.hexString(this.address.getAddress())+"]/["+clientId+"] - Not a valid Mqtts message."); | |
return; | |
} | |
body = new byte[hdr.remainingLength+hdr.headerLength]; | |
for (int i = 0; i < hdr.headerLength; i++) { | |
body[i] = hdr.header[i]; | |
} | |
if (hdr.remainingLength >= 0) { | |
streamIn.readFully(body, hdr.headerLength, hdr.remainingLength); | |
} | |
//start:just for the testing purposes we simulate here a network delay | |
//TODO This will NOT be included in the final version | |
try { | |
Thread.sleep(20); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
//end | |
if(body!=null) | |
decodeMsg(body); | |
}catch(IOException e){ | |
if(e instanceof InterruptedIOException) { | |
//do nothing | |
}else if(this.running == true){ | |
//an error occurred | |
//stop the reading thread | |
this.running = false; | |
//generate a control message | |
ControlMessage controlMsg = new ControlMessage(); | |
controlMsg.setMsgType(ControlMessage.CONNECTION_LOST); | |
//construct an "internal" message and put it to dispatcher's queue | |
//@see org.eclipse.paho.mqttsn.gateway.core.Message | |
Message msg = new Message(this.address); | |
msg.setType(Message.CONTROL_MSG); | |
msg.setControlMessage(controlMsg); | |
this.dispatcher.putMessage(msg); | |
} | |
} | |
} | |
/** | |
* This method is used for decoding the received Mqtt message from the broker. | |
* @param data The Mqtt message as it was received from the socket (byte array). | |
*/ | |
private void decodeMsg(byte[] data){ | |
MqttMessage mqttMsg = null; | |
int msgType = (data[0] >>> 4) & 0x0F; | |
switch (msgType) { | |
case MqttMessage.CONNECT: | |
// we will never receive such a message from the broker | |
break; | |
case MqttMessage.CONNACK: | |
mqttMsg = new MqttConnack(data); | |
break; | |
case MqttMessage.PUBLISH: | |
mqttMsg = new MqttPublish(data); | |
break; | |
case MqttMessage.PUBACK: | |
mqttMsg = new MqttPuback(data); | |
break; | |
case MqttMessage.PUBREC: | |
mqttMsg = new MqttPubRec(data); | |
break; | |
case MqttMessage.PUBREL: | |
mqttMsg = new MqttPubRel(data); | |
break; | |
case MqttMessage.PUBCOMP: | |
mqttMsg = new MqttPubComp(data); | |
break; | |
case MqttMessage.SUBSCRIBE: | |
//we will never receive such a message from the broker | |
break; | |
case MqttMessage.SUBACK: | |
mqttMsg = new MqttSuback(data); | |
break; | |
case MqttMessage.UNSUBSCRIBE: | |
//we will never receive such a message from the broker | |
break; | |
case MqttMessage.UNSUBACK: | |
mqttMsg = new MqttUnsuback(data); | |
break; | |
case MqttMessage.PINGREQ: | |
mqttMsg = new MqttPingReq(data); | |
break; | |
case MqttMessage.PINGRESP: | |
mqttMsg = new MqttPingResp(data); | |
break; | |
case MqttMessage.DISCONNECT: | |
//we will never receive such a message from the broker | |
break; | |
default: | |
GatewayLogger.log(GatewayLogger.WARN, "TCPBrokerInterface ["+Utils.hexString(this.address.getAddress())+"]/["+clientId+"] - Mqtt message of unknown type \"" + msgType+"\" received."); | |
break; | |
} | |
//construct an "internal" message and put it to dispatcher's queue | |
//@see org.eclipse.paho.mqttsn.gateway.core.Message | |
Message msg = new Message(this.address); | |
msg.setType(Message.MQTT_MSG); | |
msg.setMqttMessage(mqttMsg); | |
this.dispatcher.putMessage(msg); | |
} | |
/** | |
*/ | |
public void disconnect() { | |
//stop the reading thread (if any) | |
this.running = false; | |
//close the out stream | |
if (this.streamOut != null) { | |
try { | |
this.streamOut.flush(); | |
this.streamOut.close(); | |
} catch (IOException e) { | |
// ignore it | |
} | |
this.streamOut = null; | |
} | |
//close the in stream | |
if (this.streamIn != null) { | |
try { | |
this.streamIn.close(); | |
} catch (IOException e) { | |
// ignore it | |
} | |
streamIn = null; | |
} | |
//close the socket | |
if (socket != null) { | |
try { | |
socket.close(); | |
} catch (IOException e) { | |
} | |
socket = null; | |
} | |
} | |
/* (non-Javadoc) | |
* @see java.lang.Runnable#run() | |
*/ | |
public void run() { | |
while (running) { | |
readMsg(); | |
} | |
} | |
/** | |
* @param running | |
*/ | |
public void setRunning(boolean running) { | |
this.running = running; | |
} | |
/** | |
* @param clientId | |
*/ | |
public void setClientId(String clientId) { | |
this.clientId = clientId; | |
} | |
/** | |
* This class represents a Mqtt header and is used for decoding a Mqtt message | |
* from the broker. | |
*/ | |
public static class MqttHeader { | |
public byte[] header; | |
public int remainingLength; | |
public int headerLength; | |
} | |
} |