blob: 230ec2255d92d71bcdbb0228ad73add71e41f57d [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
* Seth Hoenig
*******************************************************************************/
package gateway
import (
"net"
"sync"
P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)
type TClient struct {
Client
mqttClient *MQTT.Client
mqttBroker string
username string
password string
}
// Do not allow the creation of an MQTT-SN client if
// a connection to the MQTT broker cannot be established
func NewTClient(ClientId, Broker string, Connection *net.UDPConn, Address *net.UDPAddr) (*TClient, error) {
INFO.Println("NewTClient, id: %s", ClientId)
t := &TClient{
Client{
sync.RWMutex{},
ClientId,
Connection,
Address,
make(map[uint16]string),
make(map[uint16]*P.PublishMessage),
},
nil,
Broker,
"",
"",
}
if err := t.connectMQTT(ClientId, Broker); err != nil {
return nil, err
}
return t, nil
}
func (t *TClient) connectMQTT(ClientId, Broker string) error {
opts := MQTT.NewClientOptions()
opts.AddBroker(Broker)
opts.SetClientID(ClientId)
if t.username != "" {
opts.SetUsername(t.username)
opts.SetPassword(t.password)
}
t.mqttClient = MQTT.NewClient(opts)
if token := t.mqttClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
INFO.Println("TClient connected to mqtt broker")
return nil
}
func (t *TClient) disconnectMQTT() {
t.mqttClient.Disconnect(100)
}
func (t *TClient) subscribeMQTT(qos byte, topic string, tIndex *topicNames) {
var handler MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) {
INFO.Println("publish handler")
tid := tIndex.getId(msg.Topic())
// is topicid type always 0 coming out of tIndex ?
// todo: msgid is not always 0
pm := P.NewPublishMessage(tid, 0x00, msg.Payload(), msg.Qos(), 0x00, msg.Retained(), msg.Duplicate())
if err := t.Write(pm); err != nil {
ERROR.Println(err)
} else {
INFO.Println("incoming mqtt published to mqtt-sn")
}
}
if token := t.mqttClient.Subscribe(topic, qos, handler); token.WaitTimeout(2000) && token.Error() != nil {
ERROR.Println("Error subscribing,", token.Error())
}
INFO.Println(t.ClientId, "subscribed to", topic)
}