blob: 6041eac853388844c57557bd601e53b328be1a21 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2013, 2014 ACIN
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Martin Melik Merkumians - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTHandler.h"
#include "../../core/cominfra/commfb.h"
#include <criticalregion.h>
#include <string>
#include "MQTTClientConfigParser.h"
// Path of the MQTT client configuration file. When it is non-empty, registerLayer() loads the
// broker credentials from it.
// NOTE(review): nothing in this file assigns it - presumably set by start-up code elsewhere; confirm.
std::string gMqttClientConfigFile;
// Handler boilerplate; the macro is defined outside this file.
DEFINE_HANDLER(MQTTHandler);
// Connection options shared by every connect request; the callbacks, context and credentials are
// filled in by registerLayer() when the client is created.
MQTTAsync_connectOptions MQTTHandler::smClientConnectionOptions =
MQTTAsync_connectOptions_initializer;
// Mutex taken (via CCriticalRegion) around accesses to the connection state and the layer lists.
CSyncObject MQTTHandler::smMQTTMutex = CSyncObject();
// Paho client handle; 0 until registerLayer() has created the client.
MQTTAsync MQTTHandler::smClient = 0;
// Client id and broker address the client was created with; registerLayer() rejects later
// registrations that use different values.
CIEC_STRING MQTTHandler::smClientId;
CIEC_STRING MQTTHandler::smAddress;
// Storage for the credentials; registerLayer() stores pointers to their buffers (c_str()) in the
// connection options.
std::string MQTTHandler::scmUsername;
std::string MQTTHandler::scmPassword;
// Current state of the connection state machine evaluated in run().
MQTTStates MQTTHandler::smMQTTS_STATE = NOT_CONNECTED;
// Semaphore the handler thread waits on in selfSuspend(); incremented by resumeSelfSuspend().
forte::arch::CSemaphore MQTTHandler::mStateSemaphore = forte::arch::CSemaphore();
// True while no wake-up is pending; lets resumeSelfSuspend() increment the semaphore only once.
bool MQTTHandler::mIsSemaphoreEmpty = true;
/** Constructs the handler and calls start() unless the handler already reports being alive.
 *
 * @param paDeviceExecution forwarded unchanged to the CExternalEventHandler base class
 */
MQTTHandler::MQTTHandler(CDeviceExecution& paDeviceExecution) :
    CExternalEventHandler(paDeviceExecution) {
  const bool alreadyRunning = isAlive();
  if(!alreadyRunning){
    start();
  }
}
/** Stops the handler loop and releases the Paho client.
 *
 * The whole destructor runs while holding smMQTTMutex.
 */
MQTTHandler::~MQTTHandler(){
  CCriticalRegion stateGuard(smMQTTMutex);

  if(isAlive()){
    // clear the alive flag first, then wake the thread so that run() can observe it and leave its loop
    setAlive(false);
    resumeSelfSuspend();
  }

  if(smClient != 0){
    // a client was created by registerLayer(): request a disconnect and free the handle
    MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
    options.timeout = 10000;
    MQTTAsync_disconnect(smClient, &options);
    MQTTAsync_destroy(&smClient);
  }
}
/*
* START OF CALLBACKS
*/
/** Callback for handling message reception.
*
* For convenience and performance it would be great to have the paContext param set subscribing topic.
* However Paho only allows one callback per client. Therefore we have to search for the layers attached to this topic.
* For details see discussion in Bug 545111.
*
*/
int MQTTHandler::onMqttMessageArrived(void* paContext, char* paTopicName, int, MQTTAsync_message* paMessage){
//TODO: Check if handler allowed
if(0 != paContext){
CCriticalRegion section(smMQTTMutex);
MQTTHandler *handler = static_cast<MQTTHandler *>(paContext);
void* pPayLoad = paMessage->payload;
unsigned int payLoadSize = static_cast<unsigned int>(paMessage->payloadlen);
for(CSinglyLinkedList<MQTTComLayer*>::Iterator it = handler->mlayers.begin(); it != handler->mlayers.end(); ++it){
if(0 == strcmp((*it)->getTopicName(), paTopicName)){
if(forte::com_infra::e_Nothing != (*it)->recvData(pPayLoad, payLoadSize)){
handler->startNewEventChain((*it)->getCommFB());
}
}
}
//End critical section
}
MQTTAsync_freeMessage(&paMessage);
MQTTAsync_free(paTopicName);
return 1;
}
/** Paho callback invoked when the connection to the broker is lost.
 *
 * Sets the state to NOT_CONNECTED, rebuilds the list of layers to subscribe from all registered subscriber
 * layers and wakes the handler thread, whose run() loop then requests a new connection.
 *
 * @param paContext the MQTTHandler passed to MQTTAsync_setCallbacks; when 0 only the log entry is written
 * @param paCause   reason for the disconnection; may be 0
 */
void MQTTHandler::onMqttConnectionLost(void* paContext, char* paCause){
  // Paho may deliver a null cause; a null pointer must never be handed to a %s conversion
  DEVLOG_ERROR("MQTT: Disconnected from broker. Cause: %s\n", (0 != paCause) ? paCause : "unknown");
  if(0 != paContext){
    CCriticalRegion section(smMQTTMutex);
    MQTTHandler *handler = static_cast<MQTTHandler *>(paContext);
    smMQTTS_STATE = NOT_CONNECTED;
    // start from a clean list so that no layer is queued twice
    handler->mToResubscribe.clearAll();
    for(CSinglyLinkedList<MQTTComLayer*>::Iterator it = handler->mlayers.begin(); it != handler->mlayers.end(); ++it){
      if(e_Subscriber == (*it)->getCommFB()->getComServiceType()){
        handler->mToResubscribe.pushBack((*it));
      }
    }
    handler->resumeSelfSuspend();
  }
}
void MQTTHandler::onMqttConnectionSucceed(void *paContext, MQTTAsync_successData *){
DEVLOG_INFO("MQTT: successfully connected\n");
{
MQTTHandler *handler = static_cast<MQTTHandler *>(paContext);
CCriticalRegion sectionState(smMQTTMutex);
smMQTTS_STATE = SUBSCRIBING;
handler->resumeSelfSuspend();
}
}
void MQTTHandler::onMqttConnectionFailed(void *paContext, MQTTAsync_failureData *){
DEVLOG_ERROR("MQTT connection failed.\n");
{
MQTTHandler *handler = static_cast<MQTTHandler *>(paContext);
CCriticalRegion sectionState(smMQTTMutex);
smMQTTS_STATE = NOT_CONNECTED;
handler->resumeSelfSuspend();
}
}
void MQTTHandler::onSubscribeSucceed(void* paContext, MQTTAsync_successData* ){
if(0 != paContext){
CCriticalRegion sectionState(smMQTTMutex);
MQTTComLayer* layer = static_cast<MQTTComLayer*>(paContext);
MQTTHandler& handler = ::getExtEvHandler<MQTTHandler>(*layer->getCommFB());
DEVLOG_INFO("MQTT: Subscription succeed. Topic: -%s-\n", layer->getTopicName());
handler.mToResubscribe.erase(layer);
if(handler.mToResubscribe.isEmpty()){
smMQTTS_STATE = ALL_SUBSCRIBED;
}
else{
handler.resumeSelfSuspend();
}
}
}
void MQTTHandler::onSubscribeFailed(void* paContext, MQTTAsync_failureData*){
if(0 != paContext){
CCriticalRegion sectionState(smMQTTMutex);
MQTTComLayer* layer = static_cast<MQTTComLayer*>(paContext);
DEVLOG_ERROR("MQTT: Subscription failed. Topic: -%s-\n", layer->getTopicName());
::getExtEvHandler<MQTTHandler>(*layer->getCommFB()).resumeSelfSuspend();
}
}
/*
* END OF CALLBACKS AND START OF HELPER FUNCTIONS
*/
/** Asks the Paho library to connect the shared client using the shared connection options.
 *
 * @return the value returned by MQTTAsync_connect; MQTTASYNC_SUCCESS when the request was accepted
 */
int MQTTHandler::mqttConnect(){
  DEVLOG_INFO("MQTT: Requesting connection to broker\n");

  const int requestResult = MQTTAsync_connect(smClient, &smClientConnectionOptions);
  if(MQTTASYNC_SUCCESS == requestResult){
    DEVLOG_INFO("MQTT: Connection to broker requested\n");
  }else{
    DEVLOG_ERROR("MQTT: Request to mqtt library failed\n");
  }
  return requestResult;
}
int MQTTHandler::mqttSubscribe(const MQTTComLayer* paLayer){
DEVLOG_INFO("MQTT: subscribing to topic -%s-\n", paLayer->getTopicName());
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = onSubscribeSucceed;
opts.onFailure = onSubscribeFailed;
opts.context = (void*)paLayer;
int rc = MQTTAsync_subscribe(smClient, paLayer->getTopicName(), QOS, &opts);
if(MQTTASYNC_SUCCESS != rc){ //call failed
CCriticalRegion sectionState(smMQTTMutex);
DEVLOG_INFO("MQTT: subscribe request failed with val = %d\n", rc);
resumeSelfSuspend();
}else{
DEVLOG_INFO("MQTT: subscribe Requested\n");
}
return rc;
}
int MQTTHandler::registerLayer(const char* paAddress, const char* paClientId, MQTTComLayer* paLayer){
if(smClient == 0){
smClientId = paClientId;
smAddress = paAddress;
MQTTAsync_create(&smClient, smAddress.getValue(), smClientId.getValue(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
smClientConnectionOptions.keepAliveInterval = 20;
smClientConnectionOptions.cleansession = 1;
smClientConnectionOptions.onSuccess = onMqttConnectionSucceed;
smClientConnectionOptions.onFailure = onMqttConnectionFailed;
smClientConnectionOptions.context = this;
if("" != gMqttClientConfigFile) { //file was provided
CMQTTClientConfigFileParser::MQTTConfigFromFile result = CMQTTClientConfigFileParser::MQTTConfigFromFile(scmUsername, scmPassword);
std::string endpoint = smAddress.getValue();
if(CMQTTClientConfigFileParser::loadConfig(gMqttClientConfigFile, endpoint, result)) {
smClientConnectionOptions.username = scmUsername.c_str();
smClientConnectionOptions.password = scmPassword.c_str();
} else {
return eWrongClientID;
}
}
if(MQTTASYNC_SUCCESS != MQTTAsync_setCallbacks(smClient, this, MQTTHandler::onMqttConnectionLost, onMqttMessageArrived, NULL)){
return eConnectionFailed;
}
{
CCriticalRegion sectionState(smMQTTMutex);
if(MQTTASYNC_SUCCESS != mqttConnect()){
return eConnectionFailed;
}
smMQTTS_STATE = CONNECTION_ASKED;
}
}
else if((smClientId != paClientId) || (smAddress != paAddress)){
return eWrongClientID;
}
{
CCriticalRegion section(smMQTTMutex);
mlayers.pushBack(paLayer);
if (e_Subscriber == paLayer->getCommFB()->getComServiceType()){
mToResubscribe.pushBack(paLayer);
if(ALL_SUBSCRIBED == smMQTTS_STATE){
smMQTTS_STATE = SUBSCRIBING;
this->resumeSelfSuspend();
}
}
}
return eRegisterLayerSucceeded;
}
/** Removes a layer from the handler.
 *
 * The layer is removed from the list of registered layers and from the list of pending subscriptions, so that
 * run() never passes a pointer to an unregistered layer to mqttSubscribe().
 *
 * @param paLayer layer to remove
 */
void MQTTHandler::unregisterLayer(MQTTComLayer* paLayer){
  CCriticalRegion section(smMQTTMutex);
  mlayers.erase(paLayer);
  // a subscriber layer may still be waiting for its subscription; drop that entry as well
  mToResubscribe.erase(paLayer);
}
/** Calls start() unless the handler already reports being alive. */
void MQTTHandler::enableHandler(void){
  if(isAlive()){
    return; // already running
  }
  start();
}
/** Stops the handler if it is alive: clears the alive flag, wakes the thread waiting in selfSuspend() and calls end(). */
void MQTTHandler::disableHandler(void){
  if(!isAlive()){
    return; // nothing to stop
  }
  setAlive(false);
  // wake the thread so that run() sees the cleared alive flag
  resumeSelfSuspend();
  end();
}
/** Intentionally does nothing; the requested priority is ignored. */
void MQTTHandler::setPriority(int /* unused priority */){
  //TODO: Should also work empty
}
/** @return always 0, as setPriority() does not store a priority */
int MQTTHandler::getPriority(void) const{
  const int fixedPriority = 0;
  return fixedPriority;
}
/** Thread body of the handler: drives the connection state machine.
*
* Each iteration blocks in selfSuspend() until resumeSelfSuspend() is called, then evaluates smMQTTS_STATE under smMQTTMutex:
* - NOT_CONNECTED: issues a connect request; if the request is accepted the state becomes CONNECTION_ASKED.
* - SUBSCRIBING: requests the subscription of the first pending layer, or switches to ALL_SUBSCRIBED if none is pending.
* - any other state: nothing to do.
* The loop ends as soon as isAlive() returns false.
*/
void MQTTHandler::run(){
while(isAlive()){
bool needSleep = false;
selfSuspend();
// the wake-up may have been issued only to terminate the thread
if(!isAlive()){
break;
}
{
CCriticalRegion sectionState(smMQTTMutex);
switch(smMQTTS_STATE){
case NOT_CONNECTED:
if (MQTTASYNC_SUCCESS == mqttConnect()){
smMQTTS_STATE = CONNECTION_ASKED;
}
break;
case SUBSCRIBING:{
if(!mToResubscribe.isEmpty()){
//only try subscribe one at a time. There were some problems in some cases when subscribing one after the other, because the MQTTAsync_subscribe hangs.
const MQTTComLayer *layer = *mToResubscribe.begin();
// The mutex is released by hand around the subscribe request: mqttSubscribe() takes smMQTTMutex itself
// when the request fails. It is locked again afterwards because sectionState is still in scope.
// NOTE(review): presumably CCriticalRegion unlocks the mutex when it goes out of scope - confirm.
smMQTTMutex.unlock();
if(MQTTASYNC_SUCCESS != mqttSubscribe(layer)){
needSleep = true;
}
smMQTTMutex.lock();
}
else{
// nothing left to subscribe
smMQTTS_STATE = ALL_SUBSCRIBED;
}
break;
}
default:
break;
}
}
if(needSleep){
//sleep in case a subscribe fails while connected
// resetting the flag here is redundant: it is re-initialised at the top of every iteration
needSleep = false;
// NOTE(review): the argument is presumably in milliseconds (5 s back-off) - confirm against CThread
CThread::sleepThread(5000);
}
}
}
/** Wakes the thread waiting in selfSuspend().
 *
 * The semaphore is incremented only when no wake-up is pending, so repeated calls before the thread runs
 * result in a single wake-up.
 */
void MQTTHandler::resumeSelfSuspend(){
  if(!mIsSemaphoreEmpty){
    return; // a wake-up is already pending; avoid incrementing many times
  }
  mStateSemaphore.inc();
  mIsSemaphoreEmpty = false;
}
/** Blocks the calling thread until resumeSelfSuspend() has incremented the semaphore.
 *
 * Afterwards the semaphore is marked as empty again (under smMQTTMutex), so that the next call to
 * resumeSelfSuspend() increments it once more.
 */
void MQTTHandler::selfSuspend(){
  mStateSemaphore.waitIndefinitely();

  CCriticalRegion flagGuard(smMQTTMutex);
  mIsSemaphoreEmpty = true;
}