blob: 166acc4c741a9ecd30e43e082d048f08ab5e6118 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2010, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
package org.eclipse.paho.mqttsn.udpclient;
import java.net.InetAddress;
import java.net.UnknownHostException;
//import java.util.Hashtable;
import org.eclipse.paho.mqttsn.udpclient.exceptions.MqttsException;
import org.eclipse.paho.mqttsn.udpclient.messages.Message;
import org.eclipse.paho.mqttsn.udpclient.messages.control.ControlMessage;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsConnack;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsConnect;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsDisconnect;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsMessage;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPingReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPingResp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubComp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubRec;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubRel;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPublish;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsRegack;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsRegister;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsSuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsSubscribe;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsUnsuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsUnsubscribe;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsg;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsgReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsgResp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopic;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopicReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopicResp;
import org.eclipse.paho.mqttsn.udpclient.timer.TimerService;
import org.eclipse.paho.mqttsn.udpclient.udp.UDPInterface;
import org.eclipse.paho.mqttsn.udpclient.utils.ClientLogger;
import org.eclipse.paho.mqttsn.udpclient.utils.ClientParameters;
import org.eclipse.paho.mqttsn.udpclient.utils.MsgQueue;
import org.eclipse.paho.mqttsn.udpclient.utils.Utils;
public class MqttsClient implements Runnable{
public static final String version="140217";
private static final int MQTTS_BACKUP_MESSAGE = 0x00;
private static final int MQTTS_BACKUP_SEND_MESSAGE = 0x01;
private MqttsMessage message = null;
private int msgId;
private ClientState clState;
private volatile boolean running;
private boolean lostGw = false;
private MqttsCallback callback = null;
private UDPInterface udpInterface = null;
private MsgQueue queue = null;
private Thread readThread = null;
private TimerService timer = null;
private ClientParameters clientParms = null;
private String clientid;
private boolean will;
private boolean cleanstart;
private int keepalive;
private String willtopic;
private int willQoS;
private String willmsg;
private boolean willretain;
private boolean autoReconnect = false;
int ackMissedCounter = 0;
public MqttsClient(String gatewayAddress, int gatewayPort, int maxMqttsMsgLength, int minMqttsMsgLength,
int maxRetries, int ackWaitingTime, boolean autoReconnect){
InetAddress adr = null;
try {
adr = InetAddress.getByName(gatewayAddress);
clientParms = new ClientParameters();
clientParms.setGatewayAddress(adr);
clientParms.setGatewayPort(gatewayPort);
clientParms.setMaxMqttsLength(maxMqttsMsgLength);
clientParms.setMinMqttsLength(minMqttsMsgLength);
clientParms.setMaxRetries(maxRetries);
clientParms.setWaitingTime(ackWaitingTime);
this.clState = ClientState.NOT_ACTIVE;
this.msgId = 1;
this.autoReconnect= autoReconnect;
queue = new MsgQueue();
timer = new TimerService(queue);
udpInterface = new UDPInterface();
udpInterface.initialize(queue, clientParms);
//create thread for reading
this.readThread = new Thread (this, "MqttsClient");
this.running = true;
this.readThread.start();
String s = null;
if (UDPInterface.ENCAPS) s=" with "; else s=" without ";
ClientLogger.log(ClientLogger.INFO, "MQTT-S client version "+
version + s + ("encapsulation started ..."));
System.out.println("MQTT-S client version "+ version + s + ("encapsulation started ..."));
} catch (MqttsException e) {
ClientLogger.log(ClientLogger.ERROR, ""+e);
e.printStackTrace();
} catch (UnknownHostException e) {
ClientLogger.log(ClientLogger.ERROR, ""+e);
e.printStackTrace();
}
}
public MqttsClient(String gatewayAddress, int gatewayPort,
int maxMqttsMsgLength, int minMqttsMsgLength,
int maxRetries, int ackWaitingTime) {
this(gatewayAddress,gatewayPort,maxMqttsMsgLength,minMqttsMsgLength,
maxRetries,ackWaitingTime,false);
}
public MqttsClient(String gatewayAddress, int gatewayPort) {
this(gatewayAddress,gatewayPort,
256, //max mqtts message length
2, //min mqtts message length
2, //max number retries
5, //ack waiting time
false); //auto reconnect
}
public MqttsClient(String gatewayAddress, int gatewayPort, boolean auto) {
this(gatewayAddress,gatewayPort,
256, //max mqtts message length
2, //min mqtts message length
2, //max number retries
5, //ack waiting time
auto); //auto reconnect
}
/**
* Registers the callback handler of the application.
* This handler is used to notify the app about the completion of async events.
*/
public void registerHandler(MqttsCallback handler) {
this.callback = handler;
this.clState = ClientState.WAITING_CONNECT;
}
public boolean connect(String clientid, boolean cleanstart, int keepalive,
String willtopic, int willQoS, String willmsg, boolean willretain) {
if (this.clState != ClientState.WAITING_CONNECT) {
ClientLogger.log(ClientLogger.WARN, "connect() ignored!");
//System.out.println("mqttsClient>> connect ignored");
callback.disconnected(MqttsCallback.MQTTS_ERR_STACK_NOT_READY);
return false;
}
this.clientid = clientid;
this.cleanstart = cleanstart;
this.keepalive = keepalive;
this.will = true;
this.willtopic = willtopic;
this.willQoS = willQoS;
this.willmsg = willmsg;
this.willretain = willretain;
clState=ClientState.CONNECTING_TO_GW;
mqtts_connecting(this.clientid, this.will, this.cleanstart, this.keepalive);
return true;
}
public boolean connect(String clientid, boolean cleanstart, short keepalive){
if (this.clState != ClientState.WAITING_CONNECT) {
ClientLogger.log(ClientLogger.WARN, "connect() ignored!");
callback.disconnected(MqttsCallback.MQTTS_ERR_STACK_NOT_READY);
return false;
}
this.clientid = clientid;
this.cleanstart = cleanstart;
this.keepalive = keepalive;
this.will = false;
this.clState= ClientState.CONNECTING_TO_GW;
mqtts_connecting(this.clientid, this.will, this.cleanstart, this.keepalive);
return true;
}
public boolean disconnect(){
MqttsDisconnect msg = new MqttsDisconnect();
switch (this.clState) {
case NOT_ACTIVE:
case WAITING_CONNECT:
ClientLogger.log(ClientLogger.WARN,"already disconnected, disconnect() ignored");
break;
case CONNECTING_TO_GW:
case SEARCHING_GW:
// no need for sending a DISC since we are not connected
this.clState = ClientState.WAITING_CONNECT;
timer.unregisterAll();
callback.disconnected(MqttsCallback.MQTTS_OK);
break;
case READY:
case WAITING_ACK:
// send DISC to gw and wait for DISC
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.unregister(ControlMessage.KEEP_ALIVE);
this.clState = ClientState.DISCONNECTING;
ClientLogger.log(ClientLogger.INFO,"DISCONNECT sent: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
break;
default:
break;
}
return true;
}
public boolean publish(int topicId, byte[] message, int qos, boolean retain){
return publish(0, topicId, message, qos, retain);
}
public boolean publish(int topicIdType, int topicId, byte[] message, int qos, boolean retain){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, publish() ignored! "+"Client state = "+ this.clState);
return false;
}
MqttsPublish msg = new MqttsPublish();
switch (qos) {
case 2:
msg.setQos(qos);
int id = getNewMsgId();
msg.setMsgId(id);
break;
case 1:
msg.setQos(qos);
id = getNewMsgId();
msg.setMsgId(id);
break;
default:
msg.setQos(qos);
msg.setMsgId(0);
break;
}
msg.setRetain(retain);
msg.setTopicId(topicId);
msg.setTopicIdType(topicIdType);
msg.setData(message);
if(qos !=0){
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
}
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send PUBLISH to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public boolean subscribe(String topicName, int qos, int topicIdType){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe() ignored!");
return false;
}
MqttsSubscribe msg = new MqttsSubscribe();
msg.setQos(qos);
if (topicIdType == MqttsMessage.TOPIC_NAME){
msg.setTopicIdType(MqttsMessage.TOPIC_NAME);
msg.setTopicName(topicName);
}
if (topicIdType == MqttsMessage.SHORT_TOPIC_NAME){
msg.setTopicIdType(MqttsMessage.SHORT_TOPIC_NAME);
msg.setShortTopicName(topicName);
}
int id = getNewMsgId();
msg.setMsgId(id);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send SUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public boolean subscribe(int topicId, int qos){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe2() ignored!");
return false;
}
MqttsSubscribe msg = new MqttsSubscribe();
msg.setQos(qos);
msg.setTopicIdType(MqttsMessage.PREDIFINED_TOPIC_ID);
msg.setPredefinedTopicId(topicId);
int id = getNewMsgId();
msg.setMsgId(id);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send SUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public boolean unSubscribe(String topicName, int topicIdType){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, unsubscribe1() ignored!");
return false;
}
MqttsUnsubscribe msg= new MqttsUnsubscribe();
if (topicIdType == MqttsMessage.TOPIC_NAME){
msg.setTopicIdType(MqttsMessage.TOPIC_NAME);
msg.setTopicName(topicName);
}
if (topicIdType == MqttsMessage.SHORT_TOPIC_NAME){
msg.setTopicIdType(MqttsMessage.SHORT_TOPIC_NAME);
msg.setShortTopicName(topicName);
}
int id = getNewMsgId();
msg.setMsgId(id);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send UNSUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public boolean unSubscribe(int topicId){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe2() ignored!");
return false;
}
MqttsUnsubscribe msg= new MqttsUnsubscribe();
msg.setTopicIdType(MqttsMessage.PREDIFINED_TOPIC_ID);
msg.setPredefinedTopicId(topicId);
int id = getNewMsgId();
msg.setMsgId(id);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send UNSUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public boolean register(String topicName){
if (this.clState != ClientState.READY) {
ClientLogger.log(ClientLogger.WARN, "client not ready, register() ignored! "+this.clState);
return false;
}
MqttsRegister msg = new MqttsRegister();
msg.setTopicId(0);
int msgId = getNewMsgId();
msg.setMsgId(msgId);
msg.setTopicName(topicName);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
/* we waiting for REGACK */
clState= ClientState.WAITING_ACK;
/* Send the message */
ClientLogger.log(ClientLogger.INFO, "Send REGISTER to gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
return true;
}
public void terminate(){
// terminate udp reader
ClientLogger.log(ClientLogger.INFO, "Closing UDP ...");
this.udpInterface.terminate();
// unregister all timers
ClientLogger.log(ClientLogger.INFO, "Stopping all timers ...");
this.timer.terminate();
// stop mqtts client
ClientLogger.log(ClientLogger.INFO, "Stopping client and closing queue ...");
this.running = false;
this.queue.close();
// wait until thread is terminated.
try {
this.readThread.join();
ClientLogger.log(ClientLogger.INFO, "Client terminated ...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public ClientParameters getClientParameters() {
return clientParms;
}
public void setLogfile(String filename) {
try {
ClientLogger.setLogFile(filename);
} catch (MqttsException e) {
System.err.println("MqttsClient: cannot set log filename");
e.printStackTrace();
}
}
public void setLogLevel(int level) {
ClientLogger.setLogLevel(level);
}
public void setWaitingTime(int t) {
clientParms.setWaitingTime(t);
}
public int getLocalUDPPort() {
return udpInterface.getUdpPort();
}
/******************************************************************************************/
/** HANDLING OF MQTTS MESSAGES **/
/****************************************************************************************/
private void handleMqttsMessage(MqttsMessage receivedMsg){
if (clState == ClientState.NOT_ACTIVE) {
ClientLogger.log(ClientLogger.WARN, "Client not started yet, received msg ingored.");
return;
}
//get the type of the Mqtts message and handle the message according to that type
switch(receivedMsg.getMsgType()){
case MqttsMessage.ADVERTISE:
//handleMqttsAdvertise((MqttsAdvertise) receivedMsg);
break;
case MqttsMessage.SEARCHGW:
//handleMqttsSearchGW((MqttsSearchGW) receivedMsg);
break;
case MqttsMessage.GWINFO:
//handleMqttsGWInfo((MqttsGWInfo) receivedMsg);
break;
case MqttsMessage.CONNECT:
//we will never receive such a message from the gateway
ClientLogger.log(ClientLogger.WARN, "CONNECT received and ignored!!!");
break;
case MqttsMessage.CONNACK:
handleMqttsConnack((MqttsConnack) receivedMsg);
break;
case MqttsMessage.WILLTOPICREQ:
handleMqttsWillTopicReq((MqttsWillTopicReq) receivedMsg);
break;
case MqttsMessage.WILLTOPIC:
ClientLogger.log(ClientLogger.WARN, "WILLTOPIC received and ignored!!!");
break;
case MqttsMessage.WILLMSGREQ:
handleMqttsWillMsgReq((MqttsWillMsgReq) receivedMsg);
break;
case MqttsMessage.WILLMSG:
ClientLogger.log(ClientLogger.WARN, "WILLMSG received and ignored!!!");
break;
case MqttsMessage.REGISTER:
handleMqttsRegister((MqttsRegister)receivedMsg);
break;
case MqttsMessage.REGACK:
handleMqttsRegack((MqttsRegack) receivedMsg);
break;
case MqttsMessage.PUBLISH:
handleMqttsPublish((MqttsPublish) receivedMsg);
break;
case MqttsMessage.PUBACK:
handleMqttsPuback((MqttsPuback) receivedMsg);
break;
case MqttsMessage.PUBCOMP:
handleMqttsPubComp((MqttsPubComp) receivedMsg);
break;
case MqttsMessage.PUBREC:
handleMqttsPubRec((MqttsPubRec) receivedMsg);
break;
case MqttsMessage.PUBREL:
handleMqttsPubRel((MqttsPubRel) receivedMsg);
break;
case MqttsMessage.SUBSCRIBE:
ClientLogger.log(ClientLogger.WARN, "SUBSCRIBE received and ignored!!!");
break;
case MqttsMessage.SUBACK:
handleMqttsSuback((MqttsSuback) receivedMsg);
break;
case MqttsMessage.UNSUBSCRIBE:
ClientLogger.log(ClientLogger.WARN, "UNSUBSCRIBE and ignored received !!!");
break;
case MqttsMessage.UNSUBACK:
handleMqttsUnsuback((MqttsUnsuback) receivedMsg);
break;
case MqttsMessage.PINGREQ:
handleMqttsPingReq((MqttsPingReq) receivedMsg);
break;
case MqttsMessage.PINGRESP:
handleMqttsPingResp((MqttsPingResp) receivedMsg);
break;
case MqttsMessage.DISCONNECT:
handleMqttsDisconnect((MqttsDisconnect) receivedMsg);
break;
case MqttsMessage.WILLTOPICUPD:
ClientLogger.log(ClientLogger.WARN, "WILLTOPICUPD received and ignored!!!");
break;
case MqttsMessage.WILLTOPICRESP:
handleMqttsWillTopicResp((MqttsWillTopicResp) receivedMsg);
break;
case MqttsMessage.WILLMSGUPD:
ClientLogger.log(ClientLogger.WARN, "WILLMSGUPD received and ignored!!!");
break;
case MqttsMessage.WILLMSGRESP:
handleMqttsWillMsgResp((MqttsWillMsgResp) receivedMsg);
break;
default:
ClientLogger.log(ClientLogger.WARN, "MQTT-S message of unknown type \""
+ receivedMsg.getMsgType()+"\" received and ignored!!!");
break;
}
}
private void handleMqttsWillTopicResp(MqttsWillTopicResp receivedMsg) {
// TODO Auto-generated method stub
}
private void handleMqttsWillMsgResp(MqttsWillMsgResp receivedMsg) {
// TODO Auto-generated method stub
}
private void ack_rx() {
timer.unregister(ControlMessage.ACK);
clState= ClientState.READY;
if (lostGw) {
callback.connected();
lostGw = false;
}
}
private void handleMqttsDisconnect(MqttsDisconnect receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "DISCONNECT received: " +
Utils.hexString(receivedMsg.toBytes()));
timer.unregister(ControlMessage.ACK);
timer.unregister(ControlMessage.KEEP_ALIVE);
//hlt 19.03.2009 Because we cannot distinguish
//between a new gateway => client data still valid at broker
//or a broker restart => client data deleted
//it is better to inform app so that app can do a restart
//e.g. reissue register and subscriptions
clState = ClientState.WAITING_CONNECT;
ClientLogger.log(ClientLogger.INFO, "Disconnected, waiting for connect");
callback.disconnected(MqttsCallback.MQTTS_OK);
}
private void handleMqttsPingResp(MqttsPingResp receivedMsg) {
ClientLogger.log(ClientLogger.INFO,"PINGRESP received: " +
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case WAITING_ACK:
ack_rx();
break;
default:
break;
}
}
private void handleMqttsPingReq(MqttsPingReq receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PINGREQ received: " +
Utils.hexString(receivedMsg.toBytes()));
mqtts_pingresp();
}
private void handleMqttsUnsuback(MqttsUnsuback receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "UNSUBACK received: " +
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case WAITING_ACK:
if(receivedMsg.getMsgId() != ((MqttsUnsubscribe)this.message).getMsgId()){
ClientLogger.log(ClientLogger.WARN, "MsgId (\""+
receivedMsg.getMsgId()+"\") of the received Mqtts UNSUBACK message does not match the MsgId (\""+
((MqttsUnsubscribe)this.message).getMsgId()+
"\") of the stored Mqtts UNSUBSCRIBE message. The message cannot be processed.");
return;
}
ack_rx();
callback.unsubackReceived();
break;
default:
break;
}
}
private void handleMqttsSuback(MqttsSuback receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "SUBACK received: " +
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case WAITING_ACK:
if(receivedMsg.getMsgId() != ((MqttsSubscribe)this.message).getMsgId()){
ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts SUBACK message does not match the MsgId (\""+((MqttsSubscribe)this.message).getMsgId()+"\") of the stored Mqtts SUBSCRIBE message. The message cannot be processed.");
return;
}
ack_rx();
callback.subackReceived(receivedMsg.getGrantedQoS(), receivedMsg.getTopicId(),receivedMsg.getReturnCode());
break;
default:
break;
}
}
private void handleMqttsPubRel(MqttsPubRel receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PUBREL received: "+
Utils.hexString(receivedMsg.toBytes()));
// TODO procedure for QoS 2 not yet checked
}
private void handleMqttsPubRec(MqttsPubRec receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PUBREC received: "+
Utils.hexString(receivedMsg.toBytes()));
// TODO procedure for QoS 2 not yet checked
switch (this.clState){
case WAITING_ACK:
timer.unregister(ControlMessage.ACK);
mqtts_pubrel(receivedMsg.getMsgId());
break;
default:
break;
}
}
private void handleMqttsPubComp(MqttsPubComp receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PUBCOMP received: "+
Utils.hexString(receivedMsg.toBytes()));
// TODO procedure for QoS 2 not yet checked
switch (this.clState){
case WAITING_ACK:
ack_rx();
callback.pubCompReceived();
break;
default:
break;
}
}
private void handleMqttsPuback(MqttsPuback receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PUBACK received: "+
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case WAITING_ACK:
if(receivedMsg.getMsgId() != ((MqttsPublish)this.message).getMsgId()){
ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts PUBACK message does not match the MsgId (\""+((MqttsPublish)this.message).getMsgId()+"\") of the stored Mqtts PUBLISH message. The message cannot be processed.");
return;
}
ack_rx();
callback.pubAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
break;
default:
if (receivedMsg.getReturnCode() != MqttsMessage.RETURN_CODE_ACCEPTED){
callback.pubAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
}
break;
}
}
private void handleMqttsPublish(MqttsPublish receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "PUBLISH received: "+
Utils.hexString(receivedMsg.toBytes()));
int returnCode=-1;
if (receivedMsg.getTopicIdType() == MqttsMessage.PREDIFINED_TOPIC_ID) {
if (callback instanceof MqttsCallbackPreDefinedTopicId) {
MqttsCallbackPreDefinedTopicId ecb = (MqttsCallbackPreDefinedTopicId)callback;
returnCode = ecb.publishArrivedPreDefinedTopicId(receivedMsg.isRetain(), receivedMsg.getQos(),
receivedMsg.getTopicId(),receivedMsg.getData());
} else {
ClientLogger.log(ClientLogger.ERROR, "Unexpected publish with predefined topicId received!");
returnCode = MqttsMessage.RETURN_CODE_INVALID_TOPIC_ID;
}
} else {
returnCode = callback.publishArrived(receivedMsg.isRetain(), receivedMsg.getQos(),
receivedMsg.getTopicId(),receivedMsg.getData());
}
if (receivedMsg.getQos() == 1 || returnCode == MqttsMessage.RETURN_CODE_INVALID_TOPIC_ID) {
mqtts_puback (receivedMsg.getTopicId(),receivedMsg.getMsgId(),returnCode);
}
// TODO procedure for QoS 2 not yet checked
}
private void handleMqttsRegack(MqttsRegack receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "REGACK received: "+
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case WAITING_ACK:
if (!(this.message instanceof MqttsRegister)) {
ClientLogger.log(ClientLogger.ERROR, "Unexpected message received: " + this.message.getMsgType());
break;
}
if(receivedMsg.getMsgId() != ((MqttsRegister)this.message).getMsgId()){
ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts REGACK message does not match the MsgId (\""+((MqttsRegister)this.message).getMsgId()+"\") of the stored Mqtts REGISTER message. The message cannot be processed.");
return;
}
ack_rx();
callback.regAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
break;
default:
break;
}
}
private void handleMqttsRegister(MqttsRegister receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "REGISTER received: " +
Utils.hexString(receivedMsg.toBytes()));
mqtts_regack(receivedMsg.getTopicId(), receivedMsg.getMsgId(), MqttsMessage.RETURN_CODE_ACCEPTED);
callback.registerReceived(receivedMsg.getTopicId(), receivedMsg.getTopicName());
}
private void handleMqttsWillMsgReq(MqttsWillMsgReq receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "WILLMSGREQ received "+
Utils.hexString(receivedMsg.toBytes()));
timer.unregister(ControlMessage.ACK);
mqtts_willmsg();
}
private void handleMqttsWillTopicReq(MqttsWillTopicReq receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "WILLTOPCREQ received: "+
Utils.hexString(receivedMsg.toBytes()));
timer.unregister(ControlMessage.ACK);
mqtts_willtopic();
}
private void handleMqttsConnack(MqttsConnack receivedMsg) {
ClientLogger.log(ClientLogger.INFO, "CONNACK received: " +
Utils.hexString(receivedMsg.toBytes()));
switch (this.clState){
case CONNECTING_TO_GW:
if (receivedMsg.getReturnCode() == MqttsMessage.RETURN_CODE_ACCEPTED) {
timer.unregister(ControlMessage.ACK);
clState= ClientState.READY;
callback.connected();
lostGw = false;
} else {
clState = ClientState.WAITING_CONNECT;
timer.unregister(ControlMessage.ACK);
timer.unregister(ControlMessage.KEEP_ALIVE);
callback.disconnected(receivedMsg.getReturnCode());
}
break;
default:
ClientLogger.log(ClientLogger.WARN,"CONNACK received in state "+ this.clState);
break;
}
}
private int getNewMsgId(){
return msgId ++;
}
/**
send a PUBACK message
*/
private void mqtts_puback(int topicId, int msgId, int returnCode) {
//construct a Mqtts PUBACK message
MqttsPuback puback = new MqttsPuback();
puback.setTopicId(topicId);
puback.setMsgId(msgId);
puback.setReturnCode(returnCode);
// re-start keep alive timer and send the message
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
//send the Mqtts PUBACK message to the gateway
ClientLogger.log(ClientLogger.INFO, "Sending PUBACK message with \"TopicId\" = \"" +topicId+"\" to the gateway:"+ Utils.hexString(puback.toBytes()));
udpInterface.sendMsg(puback);
}
/**
send a REGACK message
*/
private void mqtts_regack(int topicId, int msgId, int returnCode) {
//construct a Mqtts REGACK message
MqttsRegack regack = new MqttsRegack();
regack.setTopicId(topicId);
regack.setMsgId(msgId);
regack.setReturnCode(returnCode);
// re-start keep alive timer and send the message*/
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
//send the Mqtts REGACK message to the gateway
ClientLogger.log(ClientLogger.INFO, "Sending REGACK message with \"TopicId\" = \"" +topicId+"\" to the gateway :"+ Utils.hexString(regack.toBytes()));
udpInterface.sendMsg(regack);
}
/**
send a PUBREL message
*/
private void mqtts_pubrel(int msgId) {
//construct a Mqtts PUBREL message
MqttsPubRel msg = new MqttsPubRel();
msg.setMsgId(msgId);
//send the Mqtts PUBREL message
ClientLogger.log(ClientLogger.INFO, "Sending PUBREL message to the gateway: "+ Utils.hexString(msg.toBytes()));
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
clState= ClientState.WAITING_ACK;
/* we are waiting for PUBCOMP */
/* TODO What happens if we do not rx a PUBCOMP ? Retransmit PUBREL? */
/* backup the message for the ack*/
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
udpInterface.sendMsg(msg);
}
/**
send an PINGREQ message
*/
private void mqtts_pingreq() {
//construct a Mqtts PINGREQ message
MqttsPingReq msg = new MqttsPingReq();
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
//send the Mqtts PINGREQ message to the client
ClientLogger.log(ClientLogger.INFO, "Sending Mqtts PINGREQ message to the gateway: "+ Utils.hexString(msg.toBytes()));
clState= ClientState.WAITING_ACK;
udpInterface.sendMsg(msg);
}
/**
send an PINGRESP message
*/
private void mqtts_pingresp() {
//construct a Mqtts PINGREQ message
MqttsPingResp msg = new MqttsPingResp();
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
//send the Mqtts PINGRESP message to the client
ClientLogger.log(ClientLogger.INFO, "Sending PINGRESP to the gateway: "+ Utils.hexString(msg.toBytes()));;
udpInterface.sendMsg(msg);
}
/**
send an WILLMSG message
*/
private void mqtts_willmsg() {
//construct a Mqtts WILLMSG message
MqttsWillMsg msg = new MqttsWillMsg();
msg.setWillMsg(this.willmsg);
/* backup the message for the ack*/
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* star timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
ClientLogger.log(ClientLogger.INFO, "Sending WILLMSG to the gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
}
/**
send an WILLTOPIC message
*/
private void mqtts_willtopic() {
MqttsWillTopic msg = new MqttsWillTopic();
msg.setQos(this.willQoS);
msg.setRetain(this.willretain);
msg.setWillTopic(this.willtopic);
/* backup the message for the ack*/
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* start timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
ClientLogger.log(ClientLogger.INFO, "Sending WILLTOPIC to the gateway: "+ Utils.hexString(msg.toBytes()));
udpInterface.sendMsg(msg);
}
/**
send CONNECT message
*/
private void mqtts_connecting(String clientid , boolean will, boolean cleanstart, int keepalive) {
MqttsConnect msg = new MqttsConnect();
msg.setCleanSession(cleanstart);
msg.setClientId(clientid);
msg.setDuration(keepalive);
msg.setWill(will);
/* set the value of the keep_alive timer */
clientParms.setKeepAlivePeriod(keepalive);
/* backup the message for the ack */
mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
/* start ack and keep_alive timers */
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
udpInterface.sendMsg(msg);
ClientLogger.log(ClientLogger.INFO, "CONNECT sent to gateway: "+ Utils.hexString(msg.toBytes()));
callback.connectSent();
return;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while (running) {
readMsg();
}
}
private void readMsg() {
//read the next available Message from queue
Message msg = null;
try {
msg = (Message)queue.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (msg == null) {
return;
}
//get the type of the message that "internal" message carries
int type = msg.getType();
switch(type){
case Message.MQTTS_MSG:
ClientLogger.log(ClientLogger.INFO, "Processing an mqtts message ...");
handleMqttsMessage(msg.getMqttsMessage());
break;
case Message.CONTROL_MSG:
ClientLogger.log(ClientLogger.INFO, "Processing a control message ...");
handleControlMessage(msg.getControlMessage());
break;
default:
ClientLogger.log(ClientLogger.WARN,"Internal message of unknown type \"" + msg.getType()+"\" received.");
break;
}
}
private void handleControlMessage(ControlMessage controlMessage) {
switch (controlMessage.getMsgType()) {
case ControlMessage.ACK:
ClientLogger.log(ClientLogger.INFO, "ACK timeout");
ackMissedCounter++;
if (ackMissedCounter > clientParms.getMaxRetries()) {
//We log a warnning and inform app
ClientLogger.log(ClientLogger.WARN, "Too many ACKs missed, lost gw ...");
callback.disconnected(MqttsCallback.MQTTS_LOST_GATEWAY);
if (autoReconnect) {
mqtts_backup(MQTTS_BACKUP_SEND_MESSAGE, null);
ackMissedCounter= 0;
ClientLogger.log(ClientLogger.WARN, "will try re-connecting ...");
} else {
timer.unregister(ControlMessage.ACK);
timer.unregister(ControlMessage.KEEP_ALIVE);
clState = ClientState.WAITING_CONNECT;
ClientLogger.log(ClientLogger.WARN, "Waiting for new connect from application ...");
}
} else mqtts_backup(MQTTS_BACKUP_SEND_MESSAGE, null);
break;
case ControlMessage.KEEP_ALIVE:
ClientLogger.log(ClientLogger.INFO, "Keep Alive timeout, send PINGREQ");
mqtts_pingreq();
break;
default:
break;
}
}
private void mqtts_backup(int action, MqttsMessage msg) {
switch (action) {
case MQTTS_BACKUP_MESSAGE: /* back up message */
this.message = msg;
ClientLogger.log(ClientLogger.INFO, "Message backup for retransmission");
break;
case MQTTS_BACKUP_SEND_MESSAGE: /* resend message stored in backup */
timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
ClientLogger.log(ClientLogger.INFO, "Backup message resent:"+ Utils.hexString(this.message.toBytes()));
udpInterface.sendMsg(this.message);
break;
default:
break;
}
return;
}
/********************************************************************************/
private enum ClientState {
NOT_ACTIVE,
WAITING_CONNECT,
CONNECTING_TO_GW,
READY,
WAITING_ACK,
SEARCHING_GW,
DISCONNECTING;
}
}