blob: 759afb4b34a3d59e691e88d399dc813616b93a66 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation
*******************************************************************************/
package smidge
import (
P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets"
)
// receive reads packets from c.conn with P.ReadPacket and forwards each one
// to c.incoming. It runs until ReadPacket returns an error, then logs why it
// stopped and returns.
//
// When the loop ends, c.stop is polled without blocking: if it is ready the
// exit is logged as a normal stop at DEBUG level, otherwise it is logged at
// ERROR level together with the read error that ended the loop.
func (c *SNClient) receive() {
	DEBUG.Println(NET, "started receive()")
	for {
		m, err := P.ReadPacket(c.conn)
		if err != nil {
			select {
			case <-c.stop:
				DEBUG.Println(NET, "stopped receive()")
			default:
				// Include the underlying error so the failure can be diagnosed.
				ERROR.Println(NET, "stopped receive() due to error", err)
			}
			return
		}
		DEBUG.Println(NET, "Received", P.MessageNames[m.MessageType()])
		c.incoming <- m
	}
}
// send takes queued entries off c.outgoing one at a time and writes each
// entry's message to c.conn.
//
// REGISTER, PUBLISH, SUBSCRIBE and UNSUBSCRIBE messages first have their
// MessageId set to the value returned by c.MessageIds.getId for the entry's
// token. Once a DISCONNECT message has been written the connection is closed
// and the method returns; otherwise it loops forever.
func (c *SNClient) send() {
	DEBUG.Println(NET, "started send()")
	for {
		next := <-c.outgoing
		msg := next.m
		DEBUG.Println(NET, "sending message", P.MessageNames[msg.MessageType()])

		// Stamp a message id on the message types that carry one.
		switch msg.MessageType() {
		case P.REGISTER:
			reg := msg.(*P.RegisterMessage)
			reg.MessageId = c.MessageIds.getId(next.t)
		case P.PUBLISH:
			pub := msg.(*P.PublishMessage)
			pub.MessageId = c.MessageIds.getId(next.t)
		case P.SUBSCRIBE:
			sub := msg.(*P.SubscribeMessage)
			sub.MessageId = c.MessageIds.getId(next.t)
		case P.UNSUBSCRIBE:
			unsub := msg.(*P.UnsubscribeMessage)
			unsub.MessageId = c.MessageIds.getId(next.t)
		}

		msg.Write(c.conn)

		if msg.MessageType() != P.DISCONNECT {
			continue
		}
		// DISCONNECT is the last message sent on this connection.
		DEBUG.Println(NET, "Sent DISCONNECT, closing connection")
		c.conn.Close()
		return
	}
}
// handle takes messages off c.incoming one at a time and acts on each
// according to its message type. It loops forever.
//
//   - CONNACK: marks the client CONNECTED when the return code is ACCEPTED,
//     then records the return code on the pending connect token and completes
//     it.
//   - REGISTER: stores the topic name/id pair in c.RegisteredTopics and queues
//     an ACCEPTED REGACK on c.outgoing.
//   - REGACK, SUBACK, PUBACK: look up the token for the acknowledged message
//     id, copy the result fields onto it and complete it. An ack for which no
//     token of the expected type is found is logged and dropped.
//   - PUBLISH: runs the matching handler in a new goroutine. Topic id type
//     0x00 uses c.MessageHandlers (no match: the message is dropped). Topic id
//     type 0x01 uses c.PredefinedMessageHandlers and falls back to
//     c.DefaultMessageHandler. Every other type goes to
//     c.DefaultMessageHandler when it is set.
//   - DISCONNECT: marks the client UNCONNECTED and closes the connection.
//   - WILLTOPICREQ, WILLMSGREQ: queue the will topic / will message taken from
//     c.will on c.outgoing.
//
// Messages of any other type are only logged.
func (c *SNClient) handle() {
	DEBUG.Println(NET, "started handle()")
	for {
		m := <-c.incoming
		DEBUG.Println(NET, "got message off <-incoming", P.MessageNames[m.MessageType()])
		switch m.MessageType() {
		case P.CONNACK:
			ca := m.(*P.ConnackMessage)
			// Only an ACCEPTED return code means the connection was accepted;
			// a rejected CONNACK must not mark the client as connected.
			if ca.ReturnCode == P.ACCEPTED {
				c.setState(CONNECTED)
			} else {
				ERROR.Println(NET, ca.ReturnCode, "for CONNECT")
			}
			ct, ok := c.suTokens[P.CONNECT].(*ConnectToken)
			if !ok {
				ERROR.Println(NET, "received CONNACK without a pending connect token")
				continue
			}
			ct.ReturnCode = ca.ReturnCode
			ct.flowComplete()
		case P.REGISTER:
			r := m.(*P.RegisterMessage)
			c.RegisteredTopics[string(r.TopicName)] = r.TopicId
			ra := P.NewMessage(P.REGACK).(*P.RegackMessage)
			ra.MessageId = r.MessageId
			ra.TopicId = r.TopicId
			ra.ReturnCode = P.ACCEPTED
			c.outgoing <- &MessageAndToken{m: ra, t: nil}
		case P.REGACK:
			ra := m.(*P.RegackMessage)
			// The message id comes off the network, so do not assume a
			// matching token of the right type exists.
			t, ok := c.MessageIds.getToken(ra.MessageId).(*RegisterToken)
			if !ok {
				ERROR.Println(NET, "received REGACK with no matching register token for message id", ra.MessageId)
				continue
			}
			t.ReturnCode = ra.ReturnCode
			switch ra.ReturnCode {
			case P.ACCEPTED:
				DEBUG.Println(NET, t.TopicName, "registered as", ra.TopicId)
				c.RegisteredTopics[t.TopicName] = ra.TopicId
				t.TopicId = ra.TopicId
			default:
				ERROR.Println(NET, ra.ReturnCode, "for REGISTER for", string(t.TopicName))
			}
			t.flowComplete()
		case P.SUBACK:
			sa := m.(*P.SubackMessage)
			t, ok := c.MessageIds.getToken(sa.MessageId).(*SubscribeToken)
			if !ok {
				ERROR.Println(NET, "received SUBACK with no matching subscribe token for message id", sa.MessageId)
				continue
			}
			t.ReturnCode = sa.ReturnCode
			switch sa.ReturnCode {
			case P.ACCEPTED:
				t.Qos = sa.Qos
				// A zero topic id carries nothing to register a handler under.
				if sa.TopicId > 0x00 {
					t.TopicId = sa.TopicId
					switch t.topicType {
					case 0x01:
						c.PredefinedMessageHandlers[sa.TopicId] = t.handler
					case 0x00, 0x02:
						c.RegisteredTopics[t.TopicName] = sa.TopicId
						c.MessageHandlers[sa.TopicId] = t.handler
					}
				}
			default:
				ERROR.Println(NET, sa.ReturnCode, "for SUBSCRIBE to", t.TopicName)
			}
			t.flowComplete()
		case P.PUBLISH:
			p := m.(*P.PublishMessage)
			switch p.TopicIdType {
			case 0x00:
				if handler, ok := c.MessageHandlers[p.TopicId]; ok {
					go handler(c, p)
				}
			case 0x01:
				// Handlers for topic type 0x01 are stored in
				// PredefinedMessageHandlers by the SUBACK branch, so look them
				// up here; otherwise keep the previous default-handler path.
				if handler, ok := c.PredefinedMessageHandlers[p.TopicId]; ok {
					go handler(c, p)
				} else if c.DefaultMessageHandler != nil {
					go c.DefaultMessageHandler(c, p)
				}
			default:
				if c.DefaultMessageHandler != nil {
					go c.DefaultMessageHandler(c, p)
				}
			}
		case P.PUBACK:
			pa := m.(*P.PubackMessage)
			t, ok := c.MessageIds.getToken(pa.MessageId).(*PublishToken)
			if !ok {
				ERROR.Println(NET, "received PUBACK with no matching publish token for message id", pa.MessageId)
				continue
			}
			t.ReturnCode = pa.ReturnCode
			t.TopicId = pa.TopicId
			t.flowComplete()
		case P.DISCONNECT:
			c.setState(UNCONNECTED)
			DEBUG.Println(NET, "Received DISCONNECT, closing socket")
			c.conn.Close()
		case P.WILLTOPICREQ:
			wm := P.NewMessage(P.WILLTOPIC).(*P.WillTopicMessage)
			wm.WillTopic = []byte(c.will.Topic)
			wm.Qos = c.will.Qos
			wm.Retain = c.will.Retain
			c.outgoing <- &MessageAndToken{m: wm, t: nil}
		case P.WILLMSGREQ:
			wm := P.NewMessage(P.WILLMSG).(*P.WillMsgMessage)
			wm.WillMsg = c.will.Data
			c.outgoing <- &MessageAndToken{m: wm, t: nil}
		}
	}
}