| /******************************************************************************* |
| * Copyright (c) 2014-2015 IBM Corp. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Allan Stockdill-Mander |
| * Seth Hoenig |
| *******************************************************************************/ |
| |
| package gateway |
| |
import (
	"bytes"
	"net"
	"os"
	"sync"
	"time"

	P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets"

	//MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)
| |
| type TGateway struct { |
| stopsig chan os.Signal |
| port int |
| mqttBroker string |
| clients Clients |
| tIndex topicNames |
| } |
| |
| func NewTGateway(gc *GatewayConfig, stopsig chan os.Signal) *TGateway { |
| t := &TGateway{ |
| stopsig, |
| gc.port, |
| gc.mqttbroker, |
| Clients{ |
| sync.RWMutex{}, |
| make(map[string]SNClient), |
| }, |
| topicNames{ |
| sync.RWMutex{}, |
| make(map[uint16]string), |
| 0, |
| }, |
| } |
| return t |
| } |
| |
| func (t *TGateway) Port() int { |
| return t.port |
| } |
| |
| func (t *TGateway) Start() { |
| go t.awaitStop() |
| INFO.Println("Transparent Gataway is started") |
| listen(t) |
| } |
| |
| func (t *TGateway) awaitStop() { |
| <-t.stopsig |
| INFO.Println("Transparent Gateway is stopped") |
| os.Exit(0) |
| } |
| |
| func (t *TGateway) OnPacket(nbytes int, buffer []byte, con *net.UDPConn, addr *net.UDPAddr) { |
| INFO.Println("TG OnPacket!") |
| INFO.Printf("bytes: %s\n", string(buffer[0:nbytes])) |
| |
| buf := bytes.NewBuffer(buffer) |
| rawmsg, _ := P.ReadPacket(buf) |
| |
| INFO.Printf("rawmsg.MessageType(): %s\n", P.MessageNames[rawmsg.MessageType()]) |
| |
| switch msg := rawmsg.(type) { |
| case *P.AdvertiseMessage: |
| t.handle_ADVERTISE(msg, addr) |
| case *P.SearchGwMessage: |
| t.handle_SEARCHGW(msg, addr) |
| case *P.GwInfoMessage: |
| t.handle_GWINFO(msg, addr) |
| case *P.ConnectMessage: |
| t.handle_CONNECT(msg, con, addr) |
| case *P.ConnackMessage: |
| t.handle_CONNACK(msg, addr) |
| case *P.WillTopicReqMessage: |
| t.handle_WILLTOPICREQ(msg, addr) |
| case *P.WillTopicMessage: |
| t.handle_WILLTOPIC(msg, addr) |
| case *P.WillMsgReqMessage: |
| t.handle_WILLMSGREQ(msg, addr) |
| case *P.WillMsgMessage: |
| t.handle_WILLMSG(msg, addr) |
| case *P.RegisterMessage: |
| t.handle_REGISTER(msg, con, addr) |
| case *P.RegackMessage: |
| t.handle_REGACK(msg, addr) |
| case *P.PublishMessage: |
| t.handle_PUBLISH(msg, addr) |
| case *P.PubackMessage: |
| t.handle_PUBACK(msg, addr) |
| case *P.PubcompMessage: |
| t.handle_PUBCOMP(msg, addr) |
| case *P.PubrecMessage: |
| t.handle_PUBREC(msg, addr) |
| case *P.PubrelMessage: |
| t.handle_PUBREL(msg, addr) |
| case *P.SubscribeMessage: |
| t.handle_SUBSCRIBE(msg, addr) |
| case *P.SubackMessage: |
| t.handle_SUBACK(msg, addr) |
| case *P.UnsubackMessage: |
| t.handle_UNSUBACK(msg, addr) |
| case *P.PingreqMessage: |
| t.handle_PINGREQ(msg, con, addr) |
| case *P.DisconnectMessage: |
| t.handle_DISCONNECT(msg, addr) |
| case *P.WillTopicUpdateMessage: |
| t.handle_WILLTOPICUPD(msg, addr) |
| case *P.WillTopicRespMessage: |
| t.handle_WILLTOPICRESP(msg, addr) |
| case *P.WillMsgUpdateMessage: |
| t.handle_WILLMSGUPD(msg, addr) |
| case *P.WillMsgRespMessage: |
| t.handle_WILLMSGRESP(msg, addr) |
| default: |
| ERROR.Printf("Unknown Message Type %T\n", msg) |
| } |
| } |
| |
| func (t *TGateway) handle_ADVERTISE(m *P.AdvertiseMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| } |
| |
| func (t *TGateway) handle_SEARCHGW(m *P.SearchGwMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| } |
| |
| func (t *TGateway) handle_GWINFO(m *P.GwInfoMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| } |
| |
| func (t *TGateway) handle_CONNECT(m *P.ConnectMessage, c *net.UDPConn, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| INFO.Println(m.ProtocolId, m.Duration, m.ClientId) |
| if clientid, err := validateClientId(m.ClientId); err != nil { |
| ERROR.Println(err) |
| } else { |
| INFO.Printf("clientid: %s\n", clientid) |
| INFO.Printf("remoteaddr: %s\n", a) |
| INFO.Printf("will: %v\n", m.Will) |
| if m.Will { |
| // todo: will msg |
| } |
| if tClient, err := NewTClient(string(clientid), t.mqttBroker, c, a); err != nil { |
| ERROR.Println(err) |
| } else { |
| t.clients.AddClient(tClient) |
| |
| // establish connection to mqtt broker |
| |
| ca := P.NewMessage(P.CONNACK).(*P.ConnackMessage) |
| ca.ReturnCode = 0 |
| if err = tClient.Write(ca); err != nil { |
| ERROR.Println(err) |
| } else { |
| INFO.Println("CONNACK was sent") |
| } |
| } |
| } |
| } |
| |
| func (t *TGateway) handle_CONNACK(m *P.ConnackMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLTOPICREQ(m *P.WillTopicReqMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLTOPIC(m *P.WillTopicMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLMSGREQ(m *P.WillMsgReqMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLMSG(m *P.WillMsgMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_REGISTER(m *P.RegisterMessage, c *net.UDPConn, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| topic := string(m.TopicName) |
| var topicid uint16 |
| if !t.tIndex.containsTopic(topic) { |
| topicid = t.tIndex.putTopic(topic) |
| } else { |
| topicid = t.tIndex.getId(topic) |
| } |
| |
| INFO.Printf("t topicid: %d\n", topicid) |
| |
| tclient := t.clients.GetClient(r).(*TClient) |
| tclient.Register(topicid, topic) |
| |
| ra := P.NewRegackMessage(topicid, m.MessageId, 0) |
| INFO.Printf("ra.Msgid: %d\n", ra.MessageId) |
| |
| if err := tclient.Write(ra); err != nil { |
| ERROR.Println(err) |
| } else { |
| INFO.Println("REGACK sent") |
| } |
| } |
| |
| func (t *TGateway) handle_REGACK(m *P.RegackMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| } |
| |
| func (t *TGateway) handle_PUBLISH(m *P.PublishMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| tclient := t.clients.GetClient(a).(*TClient) |
| |
| topic := t.tIndex.getTopic(m.TopicId) |
| |
| INFO.Println(topic, m.Qos, m.Retain, m.Data) |
| if token := tclient.mqttClient.Publish(topic, m.Qos, m.Retain, m.Data); token.WaitTimeout(2000) && token.Error() != nil { |
| ERROR.Println("Error publishing message", token.Error()) |
| return |
| } |
| INFO.Println("PUBLISH published") |
| } |
| |
| func (t *TGateway) handle_PUBACK(m *P.PubackMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_PUBCOMP(m *P.PubcompMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_PUBREC(m *P.PubrecMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_PUBREL(m *P.PubrelMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_SUBSCRIBE(m *P.SubscribeMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| topic := "" |
| if m.TopicIdType == 0 { // todo: other topic id types, also use enum |
| topic = string(m.TopicName) |
| } else { |
| ERROR.Println("other topic id types not supported yet") |
| topic = "not_implemented" |
| } |
| tclient := t.clients.GetClient(r).(*TClient) |
| INFO.Printf("subscribe, qos: %d, topic: %s\n", m.Qos, topic) |
| tclient.subscribeMQTT(m.Qos, topic, &t.tIndex) |
| |
| suba := P.NewSubackMessage(0, m.MessageId, m.Qos, 0) |
| |
| if err := tclient.Write(suba); err != nil { |
| ERROR.Println(err) |
| } else { |
| INFO.Println("SUBACK sent") |
| } |
| } |
| |
| func (t *TGateway) handle_SUBACK(m *P.SubackMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_UNSUBSCRIBE(m *P.UnsubscribeMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_UNSUBACK(m *P.UnsubackMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_PINGREQ(m *P.PingreqMessage, c *net.UDPConn, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| tclient := t.clients.GetClient(a).(*TClient) |
| |
| resp := P.NewMessage(P.PINGRESP) |
| |
| if err := tclient.Write(resp); err != nil { |
| ERROR.Println(err) |
| } else { |
| INFO.Println("PINGRESP sent") |
| } |
| } |
| |
| func (t *TGateway) handle_PINGRESP(m *P.PingrespMessage, a *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], a) |
| } |
| |
| func (t *TGateway) handle_DISCONNECT(m *P.DisconnectMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| tclient := t.clients.GetClient(r).(*TClient) |
| tclient.disconnectMQTT() |
| t.clients.RemoveClient(tclient.ClientId) |
| } |
| |
| func (t *TGateway) handle_WILLTOPICUPD(m *P.WillTopicUpdateMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLTOPICRESP(m *P.WillTopicRespMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLMSGUPD(m *P.WillMsgUpdateMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |
| |
| func (t *TGateway) handle_WILLMSGRESP(m *P.WillMsgRespMessage, r *net.UDPAddr) { |
| INFO.Printf("handle_%s from %v\n", P.MessageNames[m.MessageType()], r) |
| } |