blob: 79e3c7143e3ad38990872bfdeef0fcb550e62b07 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2014-2015 IBM Corp.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
* Seth Hoenig
*******************************************************************************/
package gateway
import (
"sync"
)
type TopicTree struct {
sync.RWMutex
root *node
}
type node struct {
clients []*Client
children map[string]*node
}
func newNode() *node {
return &node{
make([]*Client, 0),
make(map[string]*node),
}
}
// return true if level needed to be created, false otherwise
func (n *node) goTo(level string) (*node, bool) {
created := false
if n.children[level] == nil {
n.children[level] = newNode()
created = true
}
return n.children[level], created
}
// return true if this is the first client to be added
// to this node (representing a subscription)
func (n *node) addClient(client *Client) bool {
isFirst := len(n.clients) == 0
n.clients = append(n.clients, client)
return isFirst
}
func NewTopicTree() *TopicTree {
var t TopicTree
t.root = newNode()
return &t
}
// return false if there is already a subscriber of this topic,
// true if this is the first subscriber
func (tt *TopicTree) AddSubscription(client *Client, topic string) (bool, error) {
defer tt.Unlock()
tt.Lock()
INFO.Printf("AddSubscription(\"%s\", \"%s\")\n", client.ClientId, topic)
if levels, e := ValidateTopicFilter(topic); e != nil {
return false, e
} else {
n := tt.root
// walk the tree following the path of topic, creating new
// nodes as necessary
for _, level := range levels {
n, _ = n.goTo(level)
}
return n.addClient(client), nil
}
}
// topic could contain wild cards, however we do only consider the literal
// topic string - (wilds are not evaluated for this)
func (tt *TopicTree) RemoveSubscription(s *Client, topic string) error {
defer tt.Unlock()
tt.Lock()
if levels, e := ValidateTopicFilter(topic); e != nil {
return e
} else {
n := tt.root
for _, level := range levels {
if n = n.children[level]; n == nil {
ERROR.Printf("no subscription exists \"%s\"\n", topic)
return ErrNoSuchSubscriptionExists
}
}
if len(n.clients) < 1 {
ERROR.Printf("no clients of subscription \"%s\"\n", topic)
return ErrNoSubscribers
}
for i := 0; i < len(n.clients); i++ {
if n.clients[i].ClientId == s.ClientId {
// inexpensive way of removing from a slice
n.clients[i] = n.clients[len(n.clients)-1]
n.clients = n.clients[0 : len(n.clients)-1]
INFO.Printf("deleted subscription of client \"%s\"\n", s.ClientId)
return nil
}
}
ERROR.Printf("client \"%s\" was not subscribed to \"%s\"\n", s.ClientId, topic)
return ErrClientNotSubscribed
}
}
// topic MUST be valid (ie no wild cards, no empty level, no ending slash)
func (tt *TopicTree) SubscribersOf(topic string) ([]*Client, error) {
defer tt.RUnlock()
tt.RLock()
clients := make([]*Client, 0)
n := tt.root
if levels, e := ValidateTopicName(topic); e != nil {
return nil, e
} else {
subscribers(n, levels, &clients)
return clients, nil
}
}
func subscribers(n *node, levels []string, clients *[]*Client) {
if len(levels) == 0 {
return
}
if hash := n.children["#"]; hash != nil {
*clients = append(*clients, hash.clients...)
}
if plus := n.children["+"]; plus != nil {
if len(levels) == 1 {
*clients = append(*clients, plus.clients...)
} else {
subscribers(plus, levels[1:], clients)
}
}
if match := n.children[levels[0]]; match != nil {
if len(levels) == 1 {
*clients = append(*clients, match.clients...)
} else {
subscribers(match, levels[1:], clients)
}
}
}