blob: 83b1703a64e3f1c5833ac04524512a5a3956edb0 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
package org.eclipse.paho.mqttsn.gateway.core;
import java.util.StringTokenizer;
import java.util.Vector;
import org.eclipse.paho.mqttsn.gateway.broker.tcp.TCPBrokerInterface;
import org.eclipse.paho.mqttsn.gateway.client.ClientInterface;
import org.eclipse.paho.mqttsn.gateway.exceptions.MqttsException;
import org.eclipse.paho.mqttsn.gateway.messages.Message;
import org.eclipse.paho.mqttsn.gateway.messages.control.ControlMessage;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttConnack;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttConnect;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttDisconnect;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttMessage;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPingReq;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPingResp;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubComp;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubRec;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPubRel;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPuback;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttPublish;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttSuback;
import org.eclipse.paho.mqttsn.gateway.messages.mqtt.MqttUnsuback;
import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsAdvertise;
import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsGWInfo;
import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsMessage;
import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsPublish;
import org.eclipse.paho.mqttsn.gateway.messages.mqtts.MqttsSearchGW;
import org.eclipse.paho.mqttsn.gateway.timer.TimerService;
import org.eclipse.paho.mqttsn.gateway.utils.GWParameters;
import org.eclipse.paho.mqttsn.gateway.utils.GatewayAddress;
import org.eclipse.paho.mqttsn.gateway.utils.GatewayLogger;
import org.eclipse.paho.mqttsn.gateway.utils.Utils;
public class GatewayMsgHandler extends MsgHandler{
private GatewayAddress gatewayAddress = null;
private TCPBrokerInterface brokerInterface = null;
private TimerService timer = null;
private Dispatcher dispatcher;
private long advPeriodCounter = 0;
private long checkingCounter = 0;
private TopicMappingTable topicIdMappingTable;
private String clientId;
private boolean connected;
private Vector<ClientInterface> clientInterfacesVector;
/**
*
*/
public GatewayMsgHandler(GatewayAddress addr) {
this.gatewayAddress = addr;
}
public void initialize(){
brokerInterface = new TCPBrokerInterface(this.gatewayAddress);
timer = TimerService.getInstance();
dispatcher = Dispatcher.getInstance();
topicIdMappingTable = new TopicMappingTable();
topicIdMappingTable.initialize();
clientId = "Gateway_" + GWParameters.getGwId();
GatewayLogger.info("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Establishing TCP/IP connection with "+
GWParameters.getBrokerURL());
//open a new TCP/IP connection with the broker
try {
brokerInterface.initialize();
} catch (MqttsException e) {
e.printStackTrace();
GatewayLogger.error("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Failed to establish TCP/IP connection with " +
GWParameters.getBrokerURL()+ ". Gateway cannot start.");
System.exit(1);
}
GatewayLogger.info("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - TCP/IP connection established.");
}
/******************************************************************************************/
/** HANDLING OF MQTTS MESSAGES **/
/****************************************************************************************/
public void handleMqttsMessage(MqttsMessage receivedMsg){
//get the type of the Mqtts message and handle the message according to that type
switch(receivedMsg.getMsgType()){
case MqttsMessage.ADVERTISE:
handleMqttsAdvertise((MqttsAdvertise) receivedMsg);
break;
case MqttsMessage.SEARCHGW:
handleMqttsSearchGW((MqttsSearchGW) receivedMsg);
break;
case MqttsMessage.GWINFO:
handleMqttsGWInfo((MqttsGWInfo) receivedMsg);
break;
case MqttsMessage.CONNECT:
//we will never receive such a message
break;
case MqttsMessage.CONNACK:
//we will never receive such a message
break;
case MqttsMessage.WILLTOPICREQ:
//we will never receive such a message
break;
case MqttsMessage.WILLTOPIC:
//we will never receive such a message
break;
case MqttsMessage.WILLMSGREQ:
//we will never receive such a message
break;
case MqttsMessage.WILLMSG:
//we will never receive such a message
break;
case MqttsMessage.REGISTER:
//we will never receive such a message
break;
case MqttsMessage.REGACK:
//we will never receive such a message
break;
case MqttsMessage.PUBLISH:
handleMqttsPublish((MqttsPublish) receivedMsg);
break;
case MqttsMessage.PUBACK:
//we will never receive such a message
break;
case MqttsMessage.PUBCOMP:
//we will never receive such a message
break;
case MqttsMessage.PUBREC:
//we will never receive such a message
break;
case MqttsMessage.PUBREL:
//we will never receive such a message
break;
case MqttsMessage.SUBSCRIBE:
//we will never receive such a message
break;
case MqttsMessage.SUBACK:
//we will never receive such a message
break;
case MqttsMessage.UNSUBSCRIBE:
//we will never receive such a message
break;
case MqttsMessage.UNSUBACK:
//we will never receive such a message
break;
case MqttsMessage.PINGREQ:
//we will never receive such a message
break;
case MqttsMessage.PINGRESP:
//we will never receive such a message
break;
case MqttsMessage.DISCONNECT:
//we will never receive such a message
break;
case MqttsMessage.WILLTOPICUPD:
//we will never receive such a message
break;
case MqttsMessage.WILLTOPICRESP:
//we will never receive such a message
break;
case MqttsMessage.WILLMSGUPD:
//we will never receive such a message
break;
case MqttsMessage.WILLMSGRESP:
//we will never receive such a message
break;
default:
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtts message of unknown type \"" + receivedMsg.getMsgType()+"\" received.");
break;
}
}
/**
* @param receivedMsg
*/
private void handleMqttsAdvertise(MqttsAdvertise receivedMsg) {
// TODO implement this method for load balancing issues
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtts ADVERTISE message received.");
}
/**
* @param receivedMsg
*/
private void handleMqttsSearchGW(MqttsSearchGW receivedMsg) {
// GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtts SEARCHGW message with \"Radius\" = \""+receivedMsg.getRadius()+"\" received.");
//construct a Mqtts GWINFO message for the reply to the received Mqtts SEARCHGW message
MqttsGWInfo msg = new MqttsGWInfo();
msg.setGwId(GWParameters.getGwId());
//get the broadcast radius
byte radius = (byte)receivedMsg.getRadius();
//broadcast the Mqtts GWINFO message to the network
// GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Broadcasting Mqtts GWINFO message to the network with broadcast radius \""+radius+"\".");
Vector<?> interfaces = GWParameters.getClientInterfaces();
for(int i = 0; i < interfaces.size(); i++){
ClientInterface inter = (ClientInterface) interfaces.get(i);
inter.broadcastMsg(radius, msg);
}
}
/**
* @param receivedMsg
*/
private void handleMqttsGWInfo(MqttsGWInfo receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtts GWINFO message received.");
// TODO implement this method for load balancing issues
}
/**
* @param receivedMsg
*/
private void handleMqttsPublish(MqttsPublish receivedMsg) {
if(receivedMsg.getTopicIdType() == MqttsMessage.NORMAL_TOPIC_ID)
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Mqtts PUBLISH message with \"QoS\" = \""+receivedMsg.getQos()+"\" and \"TopicId\" = \""+receivedMsg.getTopicId()+"\" received.");
else if (receivedMsg.getTopicIdType() == MqttsMessage.PREDIFINED_TOPIC_ID)
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Mqtts PUBLISH message with \"QoS\" = \""+receivedMsg.getQos()+"\" and \"TopicId\" = \""+receivedMsg.getTopicId()+"\" (predefined topic Id) received.");
else
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Mqtts PUBLISH message with \"QoS\" = \""+receivedMsg.getQos()+"\" and \"TopicId\" = \""+receivedMsg.getShortTopicName()+"\" (short topic name) received.");
//construct a Mqtt PUBLISH message
MqttPublish publish = new MqttPublish();
//check the TopicIdType in the received Mqtts PUBLISH message
switch(receivedMsg.getTopicIdType()){
//if the TopicIdType is a normal TopicId
case MqttsMessage.NORMAL_TOPIC_ID:
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Topic Id type "+ receivedMsg.getTopicIdType()+" is invalid. Publish with \"QoS\" = \"-1\" supports only predefined topis Ids (topic Id type = \"1\") or short topic names (topic Id type = \"2\").");
return;
//if the TopicIdType is a shortTopicName then simply copy it to the topicName field of the Mqtt PUBLISH message
case MqttsMessage.SHORT_TOPIC_NAME:
publish.setTopicName(receivedMsg.getShortTopicName());
break;
//if the TopicIdType is a predifinedTopiId
case MqttsMessage.PREDIFINED_TOPIC_ID:
if(receivedMsg.getTopicId() > GWParameters.getPredfTopicIdSize()){
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Predefined topicId (\"" + receivedMsg.getTopicId() + "\") of the received Mqtts PUBLISH message is out of the range of predefined topic Ids [1,"+GWParameters.getPredfTopicIdSize()+"]. The message cannot be processed.");
return;
}
//get the predefined topic name that corresponds to the received predefined topicId
String topicName = topicIdMappingTable.getTopicName(receivedMsg.getTopicId());
//this should not happen as predefined topic ids are already stored
if(topicName == null){
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Predefined topicId (\"" + receivedMsg.getTopicId() + "\") of the received Mqtts PUBLISH message does not exist. The message cannot be processed.");
return;
}
publish.setTopicName(topicName);
break;
default:
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Unknown topicIdType (\"" + receivedMsg.getTopicIdType()+"\"). The received Mqtts PUBLISH message cannot be processed.");
return;
}
//populate the Mqtt PUBLISH message
publish.setDup(false);
//set QoS = 0
publish.setQos(0);
publish.setRetain(false);
//there is no msg id in QoS = 0 publish messages
publish.setPayload(receivedMsg.getData());
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Sending Mqtt PUBLISH message with \"QoS\" = \""+publish.getQos()+"\" and \"TopicName\" = \""+publish.getTopicName()+ "\" to the broker.");
//send the Mqtt PUBLISH message to the broker
try {
brokerInterface.sendMsg(publish);
} catch (MqttsException e) {
e.printStackTrace();
GatewayLogger.log(GatewayLogger.ERROR, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed sending Mqtt PUBLISH message to the broker.");
connectionLost();
}
}
/******************************************************************************************/
/** HANDLING OF MQTT MESSAGES FROM THE BROKER **/
/****************************************************************************************/
/**
* @param receivedMsg
*/
public void handleMqttMessage(MqttMessage receivedMsg){
//get the type of the Mqtt message and handle the message according to that type
switch(receivedMsg.getMsgType()){
case MqttMessage.CONNECT:
//we will never receive such a message from the broker
break;
case MqttMessage.CONNACK:
handleMqttConnack((MqttConnack) receivedMsg);
break;
case MqttMessage.PUBLISH:
handleMqttPublish((MqttPublish) receivedMsg);
break;
case MqttMessage.PUBACK:
handleMqttPuback((MqttPuback)receivedMsg);
break;
case MqttMessage.PUBREC:
handleMqttPubRec((MqttPubRec) receivedMsg);
break;
case MqttMessage.PUBREL:
handleMqttPubRel((MqttPubRel)receivedMsg);
break;
case MqttMessage.PUBCOMP:
handleMqttPubComp((MqttPubComp) receivedMsg);
break;
case MqttMessage.SUBSCRIBE:
//we will never receive such a message from the broker
break;
case MqttMessage.SUBACK:
handleMqttSuback((MqttSuback) receivedMsg);
break;
case MqttMessage.UNSUBSCRIBE:
//we will never receive such a message from the broker
break;
case MqttMessage.UNSUBACK:
handleMqttUnsuback((MqttUnsuback) receivedMsg);
break;
case MqttMessage.PINGREQ:
handleMqttPingReq((MqttPingReq) receivedMsg);
break;
case MqttMessage.PINGRESP:
handleMqttPingResp((MqttPingResp) receivedMsg);
break;
case MqttMessage.DISCONNECT:
//we will never receive such a message from the broker
break;
default:
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt message of unknown type \"" + receivedMsg.getMsgType()+"\" received.");
break;
}
}
/**
* @param receivedMsg
*/
private void handleMqttConnack(MqttConnack receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Mqtt CONNACK message received.");
//if the return code of the Mqtt CONNACK message is not "Connection Accepted"
if (receivedMsg.getReturnCode() != MqttMessage.RETURN_CODE_CONNECTION_ACCEPTED){
GatewayLogger.log(GatewayLogger.ERROR, "GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Return Code of Mqtt CONNACK message it is not \"Connection Accepted\".");
GatewayLogger.log(GatewayLogger.ERROR, "GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Mqtt connection with the broker cannot be established. Gateway cannot start.");
System.exit(1);
}
//the connection was accepted by the broker
GatewayLogger.info("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Mqtt connection established.");
this.connected = true;
//initialize all available client interfaces for communication with clients
GatewayLogger.info("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Initializing all available Client interfaces...");
clientInterfacesVector = new Vector<ClientInterface>();
StringTokenizer st = new StringTokenizer(GWParameters.getClientIntString(),",");
boolean init = false;
while (st.hasMoreTokens()) {
String token = st.nextToken();
String clInte = token.substring(1, token.length()-1);
ClientInterface inter = null;
try {
Class<?> cl = Class.forName(clInte);
inter = (ClientInterface)cl.newInstance();
inter.initialize();
GatewayLogger.info("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - "+inter.getClass().getName()+ " initialized.");
clientInterfacesVector.add(inter);
init = true;
}catch (IllegalAccessException e) {
GatewayLogger.warn("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to instantiate "+clInte+".");
e.printStackTrace();
}catch (InstantiationException e) {
GatewayLogger.warn("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to instantiate "+clInte+".");
e.printStackTrace();
}catch (ClassNotFoundException e) {
GatewayLogger.warn("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to instantiate "+clInte+".");
e.printStackTrace();
}catch (MqttsException e) {
GatewayLogger.warn("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to initialize "+ inter.getClass().getName());
e.printStackTrace();
}
}
if(!init){
GatewayLogger.error("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to initialize at least one Client interface.Gateway cannot start.");
System.exit(1);
}
GWParameters.setClientInterfacesVector(clientInterfacesVector);
//broadcast the Mqtts ADVERTISE message to the clients (whole network)
GatewayLogger.info("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Broadcasting initial Mqtts ADVERTISE message...");
// sendMqttsAdvertise();
GatewayLogger.info("-------- Mqtts Gateway started --------");
//Send a initial Mqtts PINGREQ message to the broker
sendMqttPingReq();
//set a keep alive timer for sending subsequent Mqtt PINGREQ messages to the broker
timer.register(gatewayAddress, ControlMessage.SEND_KEEP_ALIVE_MSG, GWParameters.getKeepAlivePeriod());
}
/**
* @param receivedMsg
*/
private void handleMqttPingResp(MqttPingResp receivedMsg) {
// GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PINGRESP message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPingReq(MqttPingReq receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PINGREQ message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttUnsuback(MqttUnsuback receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt UNSUBACK message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttSuback(MqttSuback receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt SUBACK message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPubComp(MqttPubComp receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PUBCOMP message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPubRel(MqttPubRel receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PUBREL message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPubRec(MqttPubRec receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PUBREC message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPuback(MqttPuback receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PUBACK message received.");
// TODO Auto-generated method stub
}
/**
* @param receivedMsg
*/
private void handleMqttPublish(MqttPublish receivedMsg) {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtt PUBLISH message received.");
// TODO Auto-generated method stub
}
/******************************************************************************************/
/** HANDLING OF CONTROL MESSAGES AND TIMEOUTS **/
/****************************************************************************************/
/**
* @param receivedMsg
*/
public void handleControlMessage(ControlMessage receivedMsg){
//get the type of the Control message and handle the message according to that type
switch(receivedMsg.getMsgType()){
case ControlMessage.CONNECTION_LOST:
connectionLost();
break;
case ControlMessage.WAITING_WILLTOPIC_TIMEOUT:
//we will never receive such a message
break;
case ControlMessage.WAITING_WILLMSG_TIMEOUT:
//we will never receive such a message
break;
case ControlMessage.WAITING_REGACK_TIMEOUT:
//we will never receive such a message
break;
case ControlMessage.CHECK_INACTIVITY:
//ignore it
break;
case ControlMessage.SEND_KEEP_ALIVE_MSG:
handleControlKeepAlive();
break;
case ControlMessage.SHUT_DOWN:
shutDown();
break;
default:
GatewayLogger.log(GatewayLogger.WARN, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Control message of unknown type \"" + receivedMsg.getMsgType()+"\" received.");
break;
}
}
/**
*
*/
private void connectionLost(){
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Control CONNECTION_LOST message received.");
GatewayLogger.error("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - TCP/IP connection with the broker was lost.");
if (this.connected){
//close the connection with the broker (if any)
brokerInterface.disconnect();
this.connected = false;
//generate a control message
ControlMessage controlMsg = new ControlMessage();
controlMsg.setMsgType(ControlMessage.SHUT_DOWN);
//construct an "internal" message and put it to dispatcher's queue
//@see org.eclipse.paho.mqttsn.gateway.core.Message
Message msg = new Message(null);
msg.setType(Message.CONTROL_MSG);
msg.setControlMessage(controlMsg);
this.dispatcher.putMessage(msg);
}else{
GatewayLogger.error("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed to establish Mqtt connection with the broker.Gateway cannot start.");
System.exit(1);
}
}
/**
*
*/
private void handleControlKeepAlive() {
// GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Control SEND_KEEP_ALIVE_MSG message received.");
//send a Mqtts PINGREQ to the broker
sendMqttPingReq();
//update the advertising period counter
advPeriodCounter = advPeriodCounter + GWParameters.getKeepAlivePeriod();
if (advPeriodCounter >= GWParameters.getAdvPeriod()){
//broadcast the Mqtts ADVERTISE message to the network
// sendMqttsAdvertise();
advPeriodCounter = 0;
}
//update the clean up period counter
checkingCounter = checkingCounter + GWParameters.getKeepAlivePeriod();
if(checkingCounter >= GWParameters.getCkeckingPeriod ()){
//send a check timeout message to all ClientMsgHandlers
sendCheckInactivity();
checkingCounter = 0;
}
}
/**
*
*/
private void shutDown() {
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Control SHUT_DOWN message received.");
//stop the reading thread of the BrokerInterface (if any)
//(this does not have any effect to the input and output streams which remain active)
brokerInterface.setRunning(false);
//construct a Mqtt DISCONNECT message
MqttDisconnect mqttDisconnect = new MqttDisconnect();
//send the Mqtt DISCONNECT message to the broker
//(don't bother if the sending of Mqtt DISCONNECT message to the broker was successful or not)
GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Sending Mqtt DISCONNECT message to the broker.");
try {
brokerInterface.sendMsg(mqttDisconnect);
} catch (MqttsException e) {
// do nothing
}
//close the connection with the broker
brokerInterface.disconnect();
}
/******************************************************************************************/
/** OTHER METHODS **/
/****************************************************************************************/
/**
* @throws MqttsException
*
*/
public void connect() {
//construct a new Mqtt CONNECT message
MqttConnect mqttConnect = new MqttConnect();
mqttConnect.setProtocolName(GWParameters.getProtocolName());
mqttConnect.setProtocolVersion (GWParameters.getProtocolVersion());
mqttConnect.setWillRetain (GWParameters.isRetain());
mqttConnect.setWillQoS (GWParameters.getWillQoS());
mqttConnect.setWill (GWParameters.isWillFlag());
mqttConnect.setCleanStart (GWParameters.isCleanSession());
mqttConnect.setKeepAlive(GWParameters.getKeepAlivePeriod());
mqttConnect.setClientId (clientId);
mqttConnect.setWillTopic (GWParameters.getWillTopic());
mqttConnect.setWillMessage (GWParameters.getWillMessage());
// GatewayLogger.info("** will= " + mqttConnect.isWill() +
// " willTopic= " + mqttConnect.getWillTopic() +
// ", willMessage= " + mqttConnect.getWillMessage());
GatewayLogger.info("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Establishing MQTT connection with the broker...");
// GatewayLogger.log(GatewayLogger.INFO, "GatewayMsgHandler ["+Utils.hexString(this.gatewayAddress.getAddress())+"]/["+clientId+"] - Sending Mqtt CONNECT message to the broker.");
//send the Mqtt CONNECT message to the broker
try {
brokerInterface.sendMsg(mqttConnect);
} catch (MqttsException e) {
e.printStackTrace();
GatewayLogger.error("GatewayMsgHandler ["+
Utils.hexString(GWParameters.getGatewayAddress().getAddress())+
"]/["+clientId+"] - Failed to establish Mqtt connection with the broker. Gateway cannot start.");
System.exit(1);
}
}
/**
*
*/
private void sendMqttPingReq() {
//construct a Mqtt PINGREQ message
MqttPingReq pingreq = new MqttPingReq();
// GatewayLogger.info("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Sending Mqtt PINGREQ message to the broker.");
//send the Mqtt PINGREQ message to the broker
try {
brokerInterface.sendMsg(pingreq);
} catch (MqttsException e) {
e.printStackTrace();
GatewayLogger.error("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Failed sending Mqtts PINGREQ message to the broker.");
connectionLost();
}
}
/**
*
*/
// private void sendMqttsAdvertise() {
// MqttsAdvertise adv = new MqttsAdvertise();
// adv.setGwId(GWParameters.getGwId());
// adv.setDuration(GWParameters.getAdvPeriod());
//
// //broadcast the message to all available interfaces
// Vector interfaces = GWParameters.getClientInterfaces();
// for(int i = 0; i < interfaces.size(); i++){
// ClientInterface inter = (ClientInterface) interfaces.get(i);
// inter.broadcastMsg(adv);
// }
//
// GatewayLogger.info("GatewayMsgHandler ["+Utils.hexString(GWParameters.getGatewayAddress().getAddress())+"]/["+clientId+"] - Mqtts ADVERTISE message was broadcasted to the network.");
// }
/**
*
*/
private void sendCheckInactivity() {
//generate a control message
ControlMessage controlMsg = new ControlMessage();
controlMsg.setMsgType(ControlMessage.CHECK_INACTIVITY);
//generate an "internal" message
//@see org.eclipse.paho.mqttsn.gateway.core.Message
//addressed to all ClientMsgHandlers
Message msg = new Message(null);
msg.setType(Message.CONTROL_MSG);
msg.setControlMessage(controlMsg);
this.dispatcher.putMessage(msg);
}
/**
* @param topicName
*/
// private void sendMqttSubscribe(String topicName){
// }
}