// blob: 788a8079fdadc379ded4c389c44850f5f3c1edbc [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
* Seth Hoenig
*******************************************************************************/
package gateway
import (
"bytes"
"net"
"sync"
P "git.eclipse.org/gitroot/paho.incubator/smidge.git/packets"
)
// SNClient is the minimal view of a client: anything that can report its
// address as a string. *Client satisfies it through its AddrString method.
type SNClient interface {
// AddrString returns the client's address in string form.
AddrString() string
}
// Client holds the state this package keeps for a single UDP client: its
// identifier, the socket and remote address used when writing to it, and two
// maps keyed by topic id. The embedded RWMutex is taken by the methods that
// read or modify those maps.
type Client struct {
sync.RWMutex
// ClientId is the identifier returned by String and included in log output.
ClientId string
// Conn is the UDP connection Write sends on.
Conn *net.UDPConn
// Address is the destination handed to Conn.WriteToUDP and reported by
// AddrString.
Address *net.UDPAddr
// registeredTopics maps a topic id to the topic string stored by Register.
registeredTopics map[uint16]string
// pendingMessages holds at most one publish message per topic id; entries
// are added by AddPendingMessage and removed by FetchPendingMessage.
pendingMessages map[uint16]*P.PublishMessage
}
// NewClient logs the new client's id through INFO and returns a Client for the
// given id, UDP connection and remote address. Both topic-id maps start out
// empty (but non-nil, so they can be written to immediately).
func NewClient(ClientId string, Conn *net.UDPConn, Address *net.UDPAddr) *Client {
	INFO.Printf("NewClient, id: \"%s\"\n", ClientId)

	client := &Client{
		RWMutex:          sync.RWMutex{},
		ClientId:         ClientId,
		Conn:             Conn,
		Address:          Address,
		registeredTopics: make(map[uint16]string),
		pendingMessages:  make(map[uint16]*P.PublishMessage),
	}
	return client
}
// Write serializes m into an in-memory buffer and sends the resulting bytes to
// c.Address as a single WriteToUDP call on c.Conn. It returns the error from
// that send, if any.
//
// NOTE(review): anything returned by m.Write is discarded here — confirm
// whether P.Message.Write can report a serialization failure.
func (c *Client) Write(m P.Message) error {
	payload := new(bytes.Buffer)
	m.Write(payload)

	_, err := c.Conn.WriteToUDP(payload.Bytes(), c.Address)
	return err
}
// Register records topic under topicId for this client, replacing any topic
// previously stored for that id, and logs the registration through INFO. The
// write lock is held for the duration of the call.
func (c *Client) Register(topicId uint16, topic string) {
	c.Lock()
	defer c.Unlock()

	INFO.Printf("client %s registered topicId %d\n", c.ClientId, topicId)
	c.registeredTopics[topicId] = topic
}
// Registered reports whether topicId has an entry in this client's registered
// topics. It takes the read lock while looking the id up.
func (c *Client) Registered(topicId uint16) bool {
	c.RLock()
	defer c.RUnlock()

	_, known := c.registeredTopics[topicId]
	return known
}
// AddPendingMessage stores p as the pending message for p.TopicId, replacing
// any message already pending for that topic id. The write lock is held while
// the map is updated.
func (c *Client) AddPendingMessage(p *P.PublishMessage) {
	c.Lock()
	defer c.Unlock()

	topicID := p.TopicId
	c.pendingMessages[topicID] = p
}
// FetchPendingMessage removes and returns the message pending for topicId.
// It returns nil when nothing is pending for that id. The lookup and removal
// happen together under the write lock.
func (c *Client) FetchPendingMessage(topicId uint16) *P.PublishMessage {
	c.Lock()
	defer c.Unlock()

	msg, found := c.pendingMessages[topicId]
	if !found {
		return nil
	}
	delete(c.pendingMessages, topicId)
	return msg
}
// AddrString returns the string form of the client's UDP address, as produced
// by (*net.UDPAddr).String. It is the method required by SNClient.
func (c *Client) AddrString() string {
	addr := c.Address
	return addr.String()
}
// String returns the client's id, so a *Client formats as its ClientId
// wherever a fmt.Stringer is used.
func (c *Client) String() string {
	return c.ClientId
}