blob: 9020bc0d986e5e44a208f3fd038a73698690351e [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
* Seth Hoenig
*******************************************************************************/
package gateway
import (
"bytes"
"log"
"net"
"os"
"sync"
"time"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
. "smidge/packets"
)
type AGateway struct {
mqttclient *MQTT.Client
stopsig chan os.Signal
port int
tIndex topicNames
tTree *TopicTree
clients Clients
handler MQTT.MessageHandler
}
func NewAGateway(gc *GatewayConfig, stopsig chan os.Signal) *AGateway {
MQTT.WARN = log.New(os.Stdout, "", 0)
MQTT.DEBUG = log.New(os.Stdout, "", 0)
MQTT.CRITICAL = log.New(os.Stdout, "", 0)
MQTT.ERROR = log.New(os.Stdout, "", 0)
opts := MQTT.NewClientOptions()
opts.AddBroker(gc.mqttbroker)
if gc.mqttuser != "" {
opts.SetUsername(gc.mqttuser)
}
if gc.mqttpassword != "" {
opts.SetPassword(gc.mqttpassword)
}
if gc.mqttclientid != "" {
opts.SetClientID(gc.mqttclientid)
}
if gc.mqtttimeout > 0 {
opts.SetKeepAlive(time.Duration(gc.mqtttimeout))
}
client := MQTT.NewClient(opts)
ag := &AGateway{
client,
stopsig,
gc.port,
topicNames{
sync.RWMutex{},
make(map[uint16]string),
0,
},
NewTopicTree(),
Clients{
sync.RWMutex{},
make(map[string]SNClient),
},
nil,
}
ag.handler = func(client *MQTT.Client, msg MQTT.Message) {
ag.distribute(msg)
}
return ag
}
func (ag *AGateway) Port() int {
return ag.port
}
func (ag *AGateway) Start() {
go ag.awaitStop()
INFO.Println("Aggregating Gateway is starting")
if token := ag.mqttclient.Connect(); token.Wait() && token.Error() != nil {
ERROR.Println(token.Error())
return
}
INFO.Println("Aggregating Gateway is started")
listen(ag)
}
// This does NOT WORK on Windows using Cygwin, however
// it does work using cmd.exe
func (ag *AGateway) awaitStop() {
<-ag.stopsig
INFO.Println("Aggregating Gateway is stopping")
ag.mqttclient.Disconnect(500)
time.Sleep(500) //give broker some time to process DISCONNECT
INFO.Println("Aggregating Gateway is stopped")
// TODO: cleanly close down other goroutines
os.Exit(0)
}
func (ag *AGateway) distribute(msg MQTT.Message) {
topic := msg.Topic()
INFO.Printf("AG distributing a msg for topic \"%s\"\n", topic)
// collect a list of clients to which msg should be
// published
// then publish msg to those clients (async)
if clients, e := ag.tTree.SubscribersOf(topic); e != nil {
ERROR.Println(e)
} else {
for _, client := range clients {
go ag.publish(msg, client)
}
}
}
func (ag *AGateway) publish(msg MQTT.Message, client *Client) {
INFO.Printf("publish to client \"%s\"... ", client.ClientId)
topicid := ag.tIndex.getId(msg.Topic())
// topicidtype := byte(0x00) // todo: pre-defined (1) and shortname (2)
// msgid := uint16(0x00) // todo: what should this be??
pm := NewPublishMessage(topicid, 0x00, msg.Payload(), msg.Qos(), 0x00, msg.Retained(), msg.Duplicate())
if client.Registered(topicid) {
INFO.Printf("client \"%s\" already registered to %d, publish ahoy!\n", client, topicid)
if err := client.Write(pm); err != nil {
ERROR.Println(err)
} else {
INFO.Printf("published a message to \"%s\"\n", client)
}
} else {
INFO.Printf("client \"%s\" is not registered to %d, must REGISTER first\n", client, topicid)
rm := NewRegisterMessage(topicid, 0x00, []byte(msg.Topic()))
client.AddPendingMessage(pm)
if err := client.Write(rm); err != nil {
ERROR.Printf("error writing REGISTER to \"%s\"\n", client)
} else {
INFO.Printf("sent REGISTER to \"%s\" for %d (%d bytes)\n", client, topicid, rm.Length)
}
}
}
func (ag *AGateway) OnPacket(nbytes int, buffer []byte, con *net.UDPConn, addr *net.UDPAddr) {
INFO.Printf("OnPacket! - bytes: %s\n", string(buffer[0:nbytes]))
buf := bytes.NewBuffer(buffer)
rawmsg, _ := ReadPacket(buf)
INFO.Printf("rawmsg.MessageType(): %s\n", rawmsg.MessageType())
switch msg := rawmsg.(type) {
case *AdvertiseMessage:
ag.handle_ADVERTISE(msg, addr)
case *SearchGwMessage:
ag.handle_SEARCHGW(msg, addr)
case *GwInfoMessage:
ag.handle_GWINFO(msg, addr)
case *ConnectMessage:
ag.handle_CONNECT(msg, con, addr)
case *ConnackMessage:
ag.handle_CONNACK(msg, addr)
case *WillTopicReqMessage:
ag.handle_WILLTOPICREQ(msg, addr)
case *WillTopicMessage:
ag.handle_WILLTOPIC(msg, addr)
case *WillMsgReqMessage:
ag.handle_WILLMSGREQ(msg, addr)
case *WillMsgMessage:
ag.handle_WILLMSG(msg, addr)
case *RegisterMessage:
ag.handle_REGISTER(msg, con, addr)
case *RegackMessage:
ag.handle_REGACK(msg, addr)
case *PublishMessage:
ag.handle_PUBLISH(msg, addr)
case *PubackMessage:
ag.handle_PUBACK(msg, addr)
case *PubcompMessage:
ag.handle_PUBCOMP(msg, addr)
case *PubrecMessage:
ag.handle_PUBREC(msg, addr)
case *PubrelMessage:
ag.handle_PUBREL(msg, addr)
case *SubscribeMessage:
ag.handle_SUBSCRIBE(msg, con, addr)
case *SubackMessage:
ag.handle_SUBACK(msg, addr)
case *UnsubscribeMessage:
ag.handle_UNSUBSCRIBE(msg, addr)
case *UnsubackMessage:
ag.handle_UNSUBACK(msg, addr)
case *PingreqMessage:
ag.handle_PINGREQ(msg, con, addr)
case *PingrespMessage:
ag.handle_PINGRESP(msg, addr)
case *DisconnectMessage:
ag.handle_DISCONNECT(msg, addr)
case *WillTopicUpdateMessage:
ag.handle_WILLTOPICUPD(msg, addr)
case *WillTopicRespMessage:
ag.handle_WILLTOPICRESP(msg, addr)
case *WillMsgUpdateMessage:
ag.handle_WILLMSGUPD(msg, addr)
case *WillMsgRespMessage:
ag.handle_WILLMSGRESP(msg, addr)
default:
ERROR.Printf("Unknown Message Type %T\n", msg)
}
}
func (ag *AGateway) handle_ADVERTISE(m *AdvertiseMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_SEARCHGW(m *SearchGwMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_GWINFO(m *GwInfoMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_CONNECT(m *ConnectMessage, c *net.UDPConn, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
if clientid, e := validateClientId(m.ClientId); e != nil {
ERROR.Println(e)
} else {
INFO.Printf("clientid: %s\n", clientid)
INFO.Printf("remoteaddr: %s\n", r)
INFO.Printf("will: %v\n", m.Will)
if m.Will {
// todo: do something about that
}
client := NewClient(clientid, c, r)
ag.clients.AddClient(client)
ca := NewMessage(CONNACK).(*ConnackMessage) // todo: 0 ?
ca.ReturnCode = 0
if ioerr := client.Write(ca); ioerr != nil {
ERROR.Println(ioerr)
} else {
INFO.Println("CONNACK was sent")
}
}
}
func (ag *AGateway) handle_CONNACK(m *ConnackMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLTOPICREQ(m *WillTopicReqMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLTOPIC(m *WillTopicMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLMSGREQ(m *WillMsgReqMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLMSG(m *WillMsgMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_REGISTER(m *RegisterMessage, c *net.UDPConn, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
topic := string(m.TopicName)
INFO.Printf("msg id: %d\n", m.MessageId)
INFO.Printf("topic name: %s\n", topic)
var topicid uint16
if !ag.tIndex.containsTopic(topic) {
topicid = ag.tIndex.putTopic(topic)
} else {
topicid = ag.tIndex.getId(topic)
}
client := ag.clients.GetClient(r).(*Client)
client.Register(topicid, topic)
INFO.Printf("ag topicid: %d\n", topicid)
ra := NewRegackMessage(topicid, m.MessageId, 0)
INFO.Printf("ra.MsgId: %d\n", ra.MessageId)
if err := client.Write(ra); err != nil {
ERROR.Println(err)
} else {
INFO.Println("REGACK sent")
}
}
func (ag *AGateway) handle_REGACK(m *RegackMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
// the gateway sends a register when there is a message
// that needs to be published, so we do that now
topicid := m.TopicId
client := ag.clients.GetClient(r).(*Client)
pm := client.FetchPendingMessage(topicid)
if pm == nil {
ERROR.Printf("no pending message for %s id %d\n", client, topicid)
} else {
if err := client.Write(pm); err != nil {
ERROR.Println(err)
} else {
INFO.Printf("published a pending message to \"%s\"\n", client)
}
}
}
func (ag *AGateway) handle_PUBLISH(m *PublishMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
INFO.Printf("m.TopicId: %d\n", m.TopicId)
INFO.Printf("m.Data: %s\n", string(m.Data))
topic := ag.tIndex.getTopic(m.TopicId)
// TODO: what should the MQTT-QoS be set as? In case of MQTTSN-QoS -1 ?
if token := ag.mqttclient.Publish(topic, m.Qos, m.Retain, m.Data); token.WaitTimeout(2000) && token.Error() != nil {
ERROR.Println("Error publishing message", token.Error())
}
INFO.Println("Message Published")
}
func (ag *AGateway) handle_PUBACK(m *PubackMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_PUBCOMP(m *PubcompMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_PUBREC(m *PubrecMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_PUBREL(m *PubrelMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_SUBSCRIBE(m *SubscribeMessage, c *net.UDPConn, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
INFO.Printf("m.TopicIdType: %d\n", m.TopicIdType)
topic := string(m.TopicName)
var topicid uint16
if m.TopicIdType == 0 {
INFO.Printf("m.TopicName: %s\n", topic)
if !ContainsWildcard(topic) {
topicid = ag.tIndex.getId(topic)
if topicid == 0 {
topicid = ag.tIndex.putTopic(topic)
}
} else {
// todo: if topic contains wildcard, something about REGISTER
// at a later time, but send topic id 0x0000 for now
}
} // todo: other topic id types
client := ag.clients.GetClient(r).(*Client)
if first, err := ag.tTree.AddSubscription(client, topic); err != nil {
INFO.Println("error adding subscription: %v\n", err)
// todo: suback an error message?
} else {
if first {
INFO.Println("first subscriber of subscription, subscribbing via MQTT")
if token := ag.mqttclient.Subscribe(topic, 2, ag.handler); token.WaitTimeout(2000) && token.Error() != nil {
ERROR.Println("Error subscribing,", token.Error())
}
}
// AG is subscribed at this point
client.Register(topicid, topic)
suba := NewSubackMessage(topicid, m.MessageId, m.Qos, 0)
var buf bytes.Buffer
suba.Write(&buf)
if nbytes, err := c.WriteToUDP(buf.Bytes(), r); err != nil {
ERROR.Println(err)
} else {
INFO.Printf("SUBACK sent %d bytes\n", nbytes)
}
}
}
func (ag *AGateway) handle_SUBACK(m *SubackMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_UNSUBSCRIBE(m *UnsubscribeMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_UNSUBACK(m *UnsubackMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_PINGREQ(m *PingreqMessage, c *net.UDPConn, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
resp := NewMessage(PINGRESP)
var buf bytes.Buffer
resp.Write(&buf)
if nbytes, err := c.WriteToUDP(buf.Bytes(), r); err != nil {
ERROR.Println(err)
} else {
INFO.Printf("PINGRESP sent %d bytes\n", nbytes)
}
}
func (ag *AGateway) handle_PINGRESP(m *PingrespMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_DISCONNECT(m *DisconnectMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
INFO.Printf("duration: %d\n", m.Duration)
// todo: cleanup the client
}
func (ag *AGateway) handle_WILLTOPICUPD(m *WillTopicUpdateMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLTOPICRESP(m *WillTopicRespMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLMSGUPD(m *WillMsgUpdateMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}
func (ag *AGateway) handle_WILLMSGRESP(m *WillMsgRespMessage, r *net.UDPAddr) {
INFO.Printf("handle_%s from %v\n", m.MessageType(), r)
}