// blob: b3f6fd41eaae11583b4d28a3ee6df9e74c8b6e9c [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
* Seth Hoenig
*******************************************************************************/
package gateway
import (
"bytes"
"net"
"os"
"sync"
P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets"
//MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)
// TGateway is the "transparent" gateway: it keeps a registry of connected
// clients and a topic-name index, and forwards client traffic to the MQTT
// broker configured in mqttBroker.
//
// NOTE: NewTGateway builds this struct with a positional composite literal,
// so the order of the fields below must not be changed.
type TGateway struct {
// stopsig is received from in awaitStop; a value on it terminates the process.
stopsig chan os.Signal
// port is the value reported by Port(); it is copied from the gateway config.
port int
// mqttBroker is handed to NewTClient for every accepted CONNECT.
mqttBroker string
// clients holds the clients added on CONNECT and removed on DISCONNECT.
clients Clients
// tIndex maps topic ids to topic names (and back) for REGISTER/PUBLISH/SUBSCRIBE.
tIndex topicNames
}
// NewTGateway builds a transparent gateway from the given configuration.
//
// The port and broker address are copied from gc; stopsig is stored and later
// read by awaitStop. The client registry and the topic index start out with
// empty maps.
func NewTGateway(gc *GatewayConfig, stopsig chan os.Signal) *TGateway {
	return &TGateway{
		stopsig:    stopsig,
		port:       gc.port,
		mqttBroker: gc.mqttbroker,
		// Clients and topicNames are declared elsewhere, so their values are
		// kept positional exactly as before.
		clients: Clients{
			sync.RWMutex{},
			make(map[string]SNClient),
		},
		tIndex: topicNames{
			sync.RWMutex{},
			make(map[uint16]string),
			0,
		},
	}
}
// Port returns the port number this gateway was configured with.
func (t *TGateway) Port() int {
	return t.port
}
// Start launches the stop-signal watcher in its own goroutine, logs that the
// gateway is running and then hands control to listen.
//
// NOTE(review): listen is defined elsewhere; presumably it blocks while
// serving packets, in which case Start does not return — confirm.
func (t *TGateway) Start() {
	go t.awaitStop()
	// Fixed the "Gataway" misspelling in the startup log line.
	INFO.Println("Transparent Gateway is started")
	listen(t)
}
// awaitStop blocks until a value arrives on (or the channel behind) stopsig is
// closed, logs the shutdown and terminates the whole process with exit
// status 0.
func (t *TGateway) awaitStop() {
	// The received value itself is irrelevant; only its arrival matters.
	<-t.stopsig

	INFO.Println("Transparent Gateway is stopped")
	os.Exit(0)
}
// OnPacket decodes one received datagram and dispatches it to the handler
// matching its message type.
//
// nbytes is the number of valid bytes at the start of buffer; only that prefix
// is logged and parsed, so stale bytes left in a reused receive buffer are
// never interpreted as packet content. con and addr identify the local socket
// and the remote peer and are forwarded to the handlers that need them.
//
// A datagram with an out-of-range length, or one that cannot be decoded, is
// logged to ERROR and dropped instead of crashing the gateway.
func (t *TGateway) OnPacket(nbytes int, buffer []byte, con *net.UDPConn, addr *net.UDPAddr) {
	INFO.Println("TG OnPacket!")

	// Guard the slice expression below against a bogus length.
	if nbytes < 0 || nbytes > len(buffer) {
		ERROR.Printf("invalid packet length %d (buffer holds %d bytes) from %v\n", nbytes, len(buffer), addr)
		return
	}
	payload := buffer[:nbytes]
	INFO.Printf("bytes: %s\n", string(payload))

	rawmsg, err := P.ReadPacket(bytes.NewBuffer(payload))
	if err != nil {
		ERROR.Println("could not decode packet from", addr, ":", err)
		return
	}
	if rawmsg == nil {
		// Calling MessageType on a nil message would panic.
		ERROR.Println("decoder returned no message for packet from", addr)
		return
	}
	INFO.Printf("rawmsg.MessageType(): %s\n", P.MessageNames[rawmsg.MessageType()])

	switch msg := rawmsg.(type) {
	case *P.AdvertiseMessage:
		t.handle_ADVERTISE(msg, addr)
	case *P.SearchGwMessage:
		t.handle_SEARCHGW(msg, addr)
	case *P.GwInfoMessage:
		t.handle_GWINFO(msg, addr)
	case *P.ConnectMessage:
		t.handle_CONNECT(msg, con, addr)
	case *P.ConnackMessage:
		t.handle_CONNACK(msg, addr)
	case *P.WillTopicReqMessage:
		t.handle_WILLTOPICREQ(msg, addr)
	case *P.WillTopicMessage:
		t.handle_WILLTOPIC(msg, addr)
	case *P.WillMsgReqMessage:
		t.handle_WILLMSGREQ(msg, addr)
	case *P.WillMsgMessage:
		t.handle_WILLMSG(msg, addr)
	case *P.RegisterMessage:
		t.handle_REGISTER(msg, con, addr)
	case *P.RegackMessage:
		t.handle_REGACK(msg, addr)
	case *P.PublishMessage:
		t.handle_PUBLISH(msg, addr)
	case *P.PubackMessage:
		t.handle_PUBACK(msg, addr)
	case *P.PubcompMessage:
		t.handle_PUBCOMP(msg, addr)
	case *P.PubrecMessage:
		t.handle_PUBREC(msg, addr)
	case *P.PubrelMessage:
		t.handle_PUBREL(msg, addr)
	case *P.SubscribeMessage:
		t.handle_SUBSCRIBE(msg, addr)
	case *P.SubackMessage:
		t.handle_SUBACK(msg, addr)
	case *P.UnsubscribeMessage:
		// Previously fell through to "Unknown Message Type" although a
		// handler exists.
		t.handle_UNSUBSCRIBE(msg, addr)
	case *P.UnsubackMessage:
		t.handle_UNSUBACK(msg, addr)
	case *P.PingreqMessage:
		t.handle_PINGREQ(msg, con, addr)
	case *P.PingrespMessage:
		// Previously fell through to "Unknown Message Type" although a
		// handler exists.
		t.handle_PINGRESP(msg, addr)
	case *P.DisconnectMessage:
		t.handle_DISCONNECT(msg, addr)
	case *P.WillTopicUpdateMessage:
		t.handle_WILLTOPICUPD(msg, addr)
	case *P.WillTopicRespMessage:
		t.handle_WILLTOPICRESP(msg, addr)
	case *P.WillMsgUpdateMessage:
		t.handle_WILLMSGUPD(msg, addr)
	case *P.WillMsgRespMessage:
		t.handle_WILLMSGRESP(msg, addr)
	default:
		ERROR.Printf("Unknown Message Type %T\n", msg)
	}
}
// handle_ADVERTISE logs the message-type name and the sender address of a
// received AdvertiseMessage. No further processing is done.
func (t *TGateway) handle_ADVERTISE(m *P.AdvertiseMessage, a *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, a)
}
// handle_SEARCHGW logs the message-type name and the sender address of a
// received SearchGwMessage. No further processing is done.
func (t *TGateway) handle_SEARCHGW(m *P.SearchGwMessage, a *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, a)
}
// handle_GWINFO logs the message-type name and the sender address of a
// received GwInfoMessage. No further processing is done.
func (t *TGateway) handle_GWINFO(m *P.GwInfoMessage, a *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, a)
}
// handle_CONNECT processes a ConnectMessage received on c from a.
//
// The client id is validated, a TClient bound to the configured broker
// address is created and added to the client registry, and a CONNACK with
// return code 0 is written back through that client. A failure at any step
// is logged to ERROR and ends the handling; nothing is returned to the caller.
func (t *TGateway) handle_CONNECT(m *P.ConnectMessage, c *net.UDPConn, a *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a)
	INFO.Println(m.ProtocolId, m.Duration, m.ClientId)

	clientid, idErr := validateClientId(m.ClientId)
	if idErr != nil {
		ERROR.Println(idErr)
		return
	}

	INFO.Printf("clientid: %s\n", clientid)
	INFO.Printf("remoteaddr: %s\n", a)
	INFO.Printf("will: %v\n", m.Will)
	// TODO: will message handling for connects that have m.Will set.

	tClient, err := NewTClient(string(clientid), t.mqttBroker, c, a)
	if err != nil {
		ERROR.Println(err)
		return
	}
	t.clients.AddClient(tClient)

	// Acknowledge the connect; return code 0 is sent unconditionally.
	ack := P.NewMessage(P.CONNACK).(*P.ConnackMessage)
	ack.ReturnCode = 0
	if err = tClient.Write(ack); err != nil {
		ERROR.Println(err)
		return
	}
	INFO.Println("CONNACK was sent")
}
// handle_CONNACK logs the message-type name and the sender address of a
// received ConnackMessage. No further processing is done.
func (t *TGateway) handle_CONNACK(m *P.ConnackMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLTOPICREQ logs the message-type name and the sender address of a
// received WillTopicReqMessage. No further processing is done.
func (t *TGateway) handle_WILLTOPICREQ(m *P.WillTopicReqMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLTOPIC logs the message-type name and the sender address of a
// received WillTopicMessage. No further processing is done.
func (t *TGateway) handle_WILLTOPIC(m *P.WillTopicMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLMSGREQ logs the message-type name and the sender address of a
// received WillMsgReqMessage. No further processing is done.
func (t *TGateway) handle_WILLMSGREQ(m *P.WillMsgReqMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLMSG logs the message-type name and the sender address of a
// received WillMsgMessage. No further processing is done.
func (t *TGateway) handle_WILLMSG(m *P.WillMsgMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_REGISTER processes a RegisterMessage from r.
//
// The sender must already have a client entry (created by CONNECT). The topic
// name is looked up in the gateway-wide topic index, being added if it is not
// there yet, the resulting id is registered on the client, and a REGACK
// carrying that id, the request's message id and return code 0 is written
// back.
//
// A REGISTER from an address without a client entry is logged to ERROR and
// ignored; previously the unchecked type assertion panicked in that case.
func (t *TGateway) handle_REGISTER(m *P.RegisterMessage, c *net.UDPConn, r *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r)

	// Comma-ok form: a missing entry yields ok == false instead of a panic.
	tclient, ok := t.clients.GetClient(r).(*TClient)
	if !ok || tclient == nil {
		ERROR.Printf("REGISTER from unknown client %v ignored\n", r)
		return
	}

	topic := string(m.TopicName)
	var topicid uint16
	if !t.tIndex.containsTopic(topic) {
		topicid = t.tIndex.putTopic(topic)
	} else {
		topicid = t.tIndex.getId(topic)
	}
	INFO.Printf("t topicid: %d\n", topicid)

	tclient.Register(topicid, topic)

	ra := P.NewRegackMessage(topicid, m.MessageId, 0)
	INFO.Printf("ra.Msgid: %d\n", ra.MessageId)
	if err := tclient.Write(ra); err != nil {
		ERROR.Println(err)
		return
	}
	INFO.Println("REGACK sent")
}
// handle_REGACK logs the message-type name and the sender address of a
// received RegackMessage. No further processing is done.
func (t *TGateway) handle_REGACK(m *P.RegackMessage, a *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, a)
}
// handle_PUBLISH forwards a PublishMessage from a to the MQTT broker.
//
// The topic id carried by the message is translated to a topic name through
// the gateway's topic index, and the payload is published via the sending
// client's MQTT connection with the message's QoS and retain flag.
//
// A PUBLISH from an address without a client entry is logged to ERROR and
// ignored; previously the unchecked type assertion panicked in that case.
func (t *TGateway) handle_PUBLISH(m *P.PublishMessage, a *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a)

	// Comma-ok form: a missing entry yields ok == false instead of a panic.
	tclient, ok := t.clients.GetClient(a).(*TClient)
	if !ok || tclient == nil {
		ERROR.Printf("PUBLISH from unknown client %v ignored\n", a)
		return
	}

	topic := t.tIndex.getTopic(m.TopicId)
	INFO.Println(topic, m.Qos, m.Retain, m.Data)

	// NOTE(review): if WaitTimeout takes a time.Duration (as in the Paho Go
	// client), 2000 means 2000ns rather than 2s — confirm the intended unit.
	// Also, when WaitTimeout reports false the error check is short-circuited
	// and the success line below is still logged.
	if token := tclient.mqttClient.Publish(topic, m.Qos, m.Retain, m.Data); token.WaitTimeout(2000) && token.Error() != nil {
		ERROR.Println("Error publishing message", token.Error())
		return
	}
	INFO.Println("PUBLISH published")
}
// handle_PUBACK logs the message-type name and the sender address of a
// received PubackMessage. No further processing is done.
func (t *TGateway) handle_PUBACK(m *P.PubackMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_PUBCOMP logs the message-type name and the sender address of a
// received PubcompMessage. No further processing is done.
func (t *TGateway) handle_PUBCOMP(m *P.PubcompMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_PUBREC logs the message-type name and the sender address of a
// received PubrecMessage. No further processing is done.
func (t *TGateway) handle_PUBREC(m *P.PubrecMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_PUBREL logs the message-type name and the sender address of a
// received PubrelMessage. No further processing is done.
func (t *TGateway) handle_PUBREL(m *P.PubrelMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_SUBSCRIBE processes a SubscribeMessage from r.
//
// For topic id type 0 the topic name is taken from the message; any other
// topic id type is not supported yet, is logged to ERROR and falls back to the
// placeholder topic "not_implemented". The sending client then subscribes to
// the topic on its MQTT connection and a SUBACK echoing the request's message
// id and QoS is written back.
//
// A SUBSCRIBE from an address without a client entry is logged to ERROR and
// ignored; previously the unchecked type assertion panicked in that case.
func (t *TGateway) handle_SUBSCRIBE(m *P.SubscribeMessage, r *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r)

	// Comma-ok form: a missing entry yields ok == false instead of a panic.
	tclient, ok := t.clients.GetClient(r).(*TClient)
	if !ok || tclient == nil {
		ERROR.Printf("SUBSCRIBE from unknown client %v ignored\n", r)
		return
	}

	var topic string
	if m.TopicIdType == 0 { // todo: other topic id types, also use enum
		topic = string(m.TopicName)
	} else {
		ERROR.Println("other topic id types not supported yet")
		topic = "not_implemented"
	}

	INFO.Printf("subscribe, qos: %d, topic: %s\n", m.Qos, topic)
	tclient.subscribeMQTT(m.Qos, topic, &t.tIndex)

	// NOTE(review): the SUBACK is sent with zero first and last arguments even
	// for the unsupported-topic-type fallback above — confirm that is intended.
	suba := P.NewSubackMessage(0, m.MessageId, m.Qos, 0)
	if err := tclient.Write(suba); err != nil {
		ERROR.Println(err)
		return
	}
	INFO.Println("SUBACK sent")
}
// handle_SUBACK logs the message-type name and the sender address of a
// received SubackMessage. No further processing is done.
func (t *TGateway) handle_SUBACK(m *P.SubackMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_UNSUBSCRIBE logs the message-type name and the sender address of a
// received UnsubscribeMessage. No further processing is done.
func (t *TGateway) handle_UNSUBSCRIBE(m *P.UnsubscribeMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_UNSUBACK logs the message-type name and the sender address of a
// received UnsubackMessage. No further processing is done.
func (t *TGateway) handle_UNSUBACK(m *P.UnsubackMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_PINGREQ answers a PingreqMessage from a with a PINGRESP written
// through the sender's client entry. The connection argument c is not used.
//
// A PINGREQ from an address without a client entry is logged to ERROR and
// ignored; previously the unchecked type assertion panicked in that case.
func (t *TGateway) handle_PINGREQ(m *P.PingreqMessage, c *net.UDPConn, a *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a)

	// Comma-ok form: a missing entry yields ok == false instead of a panic.
	tclient, ok := t.clients.GetClient(a).(*TClient)
	if !ok || tclient == nil {
		ERROR.Printf("PINGREQ from unknown client %v ignored\n", a)
		return
	}

	resp := P.NewMessage(P.PINGRESP)
	if err := tclient.Write(resp); err != nil {
		ERROR.Println(err)
		return
	}
	INFO.Println("PINGRESP sent")
}
// handle_PINGRESP logs the message-type name and the sender address of a
// received PingrespMessage. No further processing is done.
func (t *TGateway) handle_PINGRESP(m *P.PingrespMessage, a *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, a)
}
// handle_DISCONNECT processes a DisconnectMessage from r: the sender's MQTT
// connection is closed via disconnectMQTT and its entry is removed from the
// client registry.
//
// A DISCONNECT from an address without a client entry is logged to ERROR and
// ignored; previously the unchecked type assertion panicked in that case.
func (t *TGateway) handle_DISCONNECT(m *P.DisconnectMessage, r *net.UDPAddr) {
	INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r)

	// Comma-ok form: a missing entry yields ok == false instead of a panic.
	tclient, ok := t.clients.GetClient(r).(*TClient)
	if !ok || tclient == nil {
		ERROR.Printf("DISCONNECT from unknown client %v ignored\n", r)
		return
	}

	tclient.disconnectMQTT()
	t.clients.RemoveClient(tclient.ClientId)
}
// handle_WILLTOPICUPD logs the message-type name and the sender address of a
// received WillTopicUpdateMessage. No further processing is done.
func (t *TGateway) handle_WILLTOPICUPD(m *P.WillTopicUpdateMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLTOPICRESP logs the message-type name and the sender address of a
// received WillTopicRespMessage. No further processing is done.
func (t *TGateway) handle_WILLTOPICRESP(m *P.WillTopicRespMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLMSGUPD logs the message-type name and the sender address of a
// received WillMsgUpdateMessage. No further processing is done.
func (t *TGateway) handle_WILLMSGUPD(m *P.WillMsgUpdateMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}
// handle_WILLMSGRESP logs the message-type name and the sender address of a
// received WillMsgRespMessage. No further processing is done.
func (t *TGateway) handle_WILLMSGRESP(m *P.WillMsgRespMessage, r *net.UDPAddr) {
	msgName := P.MessageNames[m.MessageType()]
	INFO.Printf("handle_%s from %v\n", msgName, r)
}