| /******************************************************************************* |
| * Copyright (c) 2014-2015 IBM Corp. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * |
| * Contributors: |
| * Allan Stockdill-Mander - initial API and implementation |
| *******************************************************************************/ |
| |
| package smidge |
| |
| import ( |
| P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets" |
| ) |
| |
| func (c *SNClient) receive() { |
| var err error |
| var m P.Message |
| |
| DEBUG.Println(NET, "started receive()") |
| for { |
| if m, err = P.ReadPacket(c.conn); err != nil { |
| break |
| } |
| DEBUG.Println(NET, "Received", P.MessageNames[m.MessageType()]) |
| c.incoming <- m |
| } |
| |
| select { |
| case <-c.stop: |
| DEBUG.Println(NET, "stopped receive()") |
| default: |
| ERROR.Println(NET, "stopped receive() due to error") |
| } |
| return |
| } |
| |
| func (c *SNClient) send() { |
| DEBUG.Println(NET, "started send()") |
| for { |
| mt := <-c.outgoing |
| DEBUG.Println(NET, "sending message", P.MessageNames[mt.m.MessageType()]) |
| switch mt.m.MessageType() { |
| case P.REGISTER: |
| mt.m.(*P.RegisterMessage).MessageId = c.MessageIds.getId(mt.t) |
| case P.PUBLISH: |
| mt.m.(*P.PublishMessage).MessageId = c.MessageIds.getId(mt.t) |
| case P.SUBSCRIBE: |
| mt.m.(*P.SubscribeMessage).MessageId = c.MessageIds.getId(mt.t) |
| case P.UNSUBSCRIBE: |
| mt.m.(*P.UnsubscribeMessage).MessageId = c.MessageIds.getId(mt.t) |
| } |
| mt.m.Write(c.conn) |
| if mt.m.MessageType() == P.DISCONNECT { |
| DEBUG.Println(NET, "Sent DISCONNECT, closing connection") |
| c.conn.Close() |
| return |
| } |
| } |
| } |
| |
| func (c *SNClient) handle() { |
| DEBUG.Println(NET, "started handle()") |
| for { |
| select { |
| case m := <-c.incoming: |
| DEBUG.Println(NET, "got message off <-incoming", P.MessageNames[m.MessageType()]) |
| switch m.MessageType() { |
| case P.CONNACK: |
| ca := m.(*P.ConnackMessage) |
| c.setState(CONNECTED) |
| ct := c.suTokens[P.CONNECT].(*ConnectToken) |
| ct.ReturnCode = ca.ReturnCode |
| ct.flowComplete() |
| case P.REGISTER: |
| r := m.(*P.RegisterMessage) |
| c.RegisteredTopics[string(r.TopicName)] = r.TopicId |
| ra := P.NewMessage(P.REGACK).(*P.RegackMessage) |
| ra.MessageId = r.MessageId |
| ra.TopicId = r.TopicId |
| ra.ReturnCode = P.ACCEPTED |
| c.outgoing <- &MessageAndToken{m: ra, t: nil} |
| case P.REGACK: |
| ra := m.(*P.RegackMessage) |
| t := c.MessageIds.getToken(ra.MessageId).(*RegisterToken) |
| t.ReturnCode = ra.ReturnCode |
| switch ra.ReturnCode { |
| case P.ACCEPTED: |
| DEBUG.Println(NET, t.TopicName, "registered as", ra.TopicId) |
| c.RegisteredTopics[t.TopicName] = ra.TopicId |
| t.TopicId = ra.TopicId |
| default: |
| ERROR.Println(NET, ra.ReturnCode, "for REGISTER for", string(t.TopicName)) |
| } |
| t.flowComplete() |
| case P.SUBACK: |
| sa := m.(*P.SubackMessage) |
| t := c.MessageIds.getToken(sa.MessageId).(*SubscribeToken) |
| t.ReturnCode = sa.ReturnCode |
| switch sa.ReturnCode { |
| case P.ACCEPTED: |
| t.Qos = sa.Qos |
| if sa.TopicId > 0x00 { |
| t.TopicId = sa.TopicId |
| switch t.topicType { |
| case 0x01: |
| c.PredefinedMessageHandlers[sa.TopicId] = t.handler |
| case 0x00, 0x02: |
| c.RegisteredTopics[t.TopicName] = sa.TopicId |
| c.MessageHandlers[sa.TopicId] = t.handler |
| } |
| } |
| default: |
| ERROR.Println(NET, sa.ReturnCode, "for SUBSCRIBE to", t.TopicName) |
| } |
| t.flowComplete() |
| case P.PUBLISH: |
| p := m.(*P.PublishMessage) |
| switch p.TopicIdType { |
| case 0x00: |
| if handler, ok := c.MessageHandlers[p.TopicId]; ok { |
| go handler(c, p) |
| } |
| default: |
| if c.DefaultMessageHandler != nil { |
| go c.DefaultMessageHandler(c, p) |
| } |
| } |
| case P.PUBACK: |
| pa := m.(*P.PubackMessage) |
| t := c.MessageIds.getToken(pa.MessageId).(*PublishToken) |
| t.ReturnCode = pa.ReturnCode |
| t.TopicId = pa.TopicId |
| t.flowComplete() |
| case P.DISCONNECT: |
| c.setState(UNCONNECTED) |
| DEBUG.Println(NET, "Received DISCONNECT, closing socket") |
| c.conn.Close() |
| case P.WILLTOPICREQ: |
| wm := P.NewMessage(P.WILLTOPIC).(*P.WillTopicMessage) |
| wm.WillTopic = []byte(c.will.Topic) |
| wm.Qos = c.will.Qos |
| wm.Retain = c.will.Retain |
| c.outgoing <- &MessageAndToken{m: wm, t: nil} |
| case P.WILLMSGREQ: |
| wm := P.NewMessage(P.WILLMSG).(*P.WillMsgMessage) |
| wm.WillMsg = c.will.Data |
| c.outgoing <- &MessageAndToken{m: wm, t: nil} |
| } |
| } |
| } |
| } |