blob: 44480b6341a53a95df981f390365ad384d1aac02 [file] [log] [blame]
# *****************************************************************************
# * Copyright (c) 2011, 2016 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
# * http://www.eclipse.org/legal/epl-v10.html
# *
# * Contributors:
# * Wind River Systems - initial API and implementation
# *****************************************************************************
"""
Locator service uses transport layer to search
for peers and to collect and maintain up-to-date
data about peer's attributes.
"""
import platform
import threading
import time
import socket
import cStringIO
from .. import locator
from ...util import logging
from ...channel import fromJSONSequence, toJSONSequence
from ...channel.ChannelProxy import ChannelProxy
from ... import protocol, services, channel, peer, errors
# Flag indicating whether tracing of the the discovery activity is enabled.
__TRACE_DISCOVERY__ = False
class SubNet(object):
def __init__(self, prefix_length, address, broadcast):
self.prefix_length = prefix_length
self.address = address
self.broadcast = broadcast
self.last_slaves_req_time = 0
def contains(self, addr):
if addr is None or self.address is None:
return False
a1 = addr.getAddress()
a2 = self.address.getAddress()
if len(a1) != len(a2):
return False
i = 0
if self.prefix_length <= len(a1) * 8:
l = self.prefix_length
else:
l = len(a1) * 8
while i + 8 <= l:
n = i / 8
if a1[n] != a2[n]:
return False
i += 8
while i < l:
n = i / 8
m = 1 << (7 - i % 8)
if (a1[n] & m) != (a2[n] & m):
return False
i += 1
return True
def __eq__(self, o):
if not isinstance(o, SubNet):
return False
return self.prefix_length == o.prefix_length and \
self.broadcast == o.broadcast and \
self.address == o.address
def __hash__(self):
return hash(self.address)
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.prefix_length)
class Slave(object):
# Time of last packet receiver from self slave
last_packet_time = 0
# Time of last REQ_SLAVES packet received from self slave
last_req_slaves_time = 0
def __init__(self, address, port):
self.address = address
self.port = port
def __str__(self):
return "%s/%d" % (self.address.getHostAddress(), self.port)
class AddressCacheItem(object):
address = None
time_stamp = 0
used = False
def __init__(self, host):
self.host = host
class InetAddress(object):
"Mimicking Java InetAddress class"
def __init__(self, host, addr):
self.host = host
self.addr = addr
def getAddress(self):
return socket.inet_aton(self.addr)
def getHostAddress(self):
return self.addr
def __eq__(self, other):
if not isinstance(other, InetAddress):
return False
return self.addr == other.addr
def __str__(self):
return "%s/%s" % (self.host or "", self.addr)
class InputPacket(object):
"Wrapper for UDP packet data."
def __init__(self, data, addr, port):
self.data = data
self.addr = addr
self.port = port
def getLength(self):
return len(self.data)
def getData(self):
return self.data
def getPort(self):
return self.port
def getAddress(self):
return self.addr
def __str__(self):
return "[address=%s,port=%d,data=\"%s\"]" % \
(self.getAddress(), self.getPort(), self.data)
DISCOVEY_PORT = 1534
MAX_PACKET_SIZE = 9000 - 40 - 8
PREF_PACKET_SIZE = 1500 - 40 - 8
# TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers
class LocatorService(locator.LocatorService):
locator = None
peers = {} # str->Peer
listeners = [] # list of LocatorListener
error_log = set() # set of str
addr_list = []
addr_cache = {} # str->AddressCacheItem
addr_request = False
local_peer = None
last_master_packet_time = 0
@classmethod
def getLocalPeer(cls):
return cls.local_peer
@classmethod
def getListeners(cls):
return cls.listeners[:]
@classmethod
def startup(cls):
if cls.locator:
cls.locator._startup()
@classmethod
def shutdown(cls):
if cls.locator:
cls.locator._shutdown()
def __init__(self):
self._error_log_lock = threading.RLock()
self._alive = False
LocatorService.locator = self
LocatorService.local_peer = peer.LocalPeer()
def _startup(self):
if self._alive:
return
self._alive = True
self._addr_cache_lock = threading.Condition()
self.subnets = set()
self.slaves = []
self.inp_buf = bytearray(MAX_PACKET_SIZE)
self.out_buf = bytearray(MAX_PACKET_SIZE)
service = self
class TimerThread(threading.Thread):
def __init__(self, _callable):
self._callable = _callable
super(TimerThread, self).__init__()
def run(self):
while service._alive:
try:
time.sleep(locator.DATA_RETENTION_PERIOD / 4 / 1000.)
protocol.invokeAndWait(self._callable)
except RuntimeError:
# TCF event dispatch is shut down
return
except Exception as x:
service._log("Unhandled exception in TCF discovery " +
"timer thread", x)
self.timer_thread = TimerThread(self.__refresh_timer)
class DNSLookupThread(threading.Thread):
def run(self):
while service._alive:
try:
itemSet = None
with service._addr_cache_lock:
period = locator.DATA_RETENTION_PERIOD
if not LocatorService.addr_request:
service._addr_cache_lock.wait(period)
msec = int(time.time() * 1000)
for host, a in LocatorService.addr_cache.items():
if a.time_stamp + period * 10 < msec:
if a.used:
if itemSet is None:
itemSet = set()
itemSet.add(a)
else:
del LocatorService.addr_cache[host]
LocatorService.addr_request = False
if itemSet is not None:
for a in itemSet:
addr = None
try:
addr = socket.gethostbyname(a.host)
except socket.gaierror:
pass
with service._addr_cache_lock:
if addr is None:
a.address = None
else:
a.address = InetAddress(a.host, addr)
a.time_stamp = msec
a.used = False
except Exception as x:
service._log("Unhandled exception in TCF discovery " +
"DNS lookup thread", x)
self.dns_lookup_thread = DNSLookupThread()
class InputThread(threading.Thread):
def __init__(self, _callable):
self._callable = _callable
super(InputThread, self).__init__()
def run(self):
try:
while service._alive:
sock = service.socket
try:
data, addr = sock.recvfrom(MAX_PACKET_SIZE)
p = InputPacket(data, InetAddress(None, addr[0]),
addr[1])
protocol.invokeAndWait(self._callable, p)
except RuntimeError:
# TCF event dispatch is shutdown
return
except socket.error as x:
if sock != service.socket:
continue
# frequent error on windows, unknown reason
if x.errno == 10054:
continue
port = sock.getsockname()[1]
service._log("Cannot read from datagram socket " +
"at port %d" % port, x)
time.sleep(2)
except Exception as x:
service._log("Unhandled exception in socket reading " +
"thread", x)
self.input_thread = InputThread(self.__handleDatagramPacket)
try:
self.loopback_addr = InetAddress(None, "127.0.0.1")
self.out_buf[0:8] = 'TCF%s\0\0\0\0' % locator.CONF_VERSION
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
self.socket.bind(('', DISCOVEY_PORT))
if __TRACE_DISCOVERY__:
logging.trace("Became the master agent (bound to port " +
"%d)" % self.socket.getsockname()[1])
except socket.error as x:
self.socket.bind(('', 0))
if __TRACE_DISCOVERY__:
logging.trace("Became a slave agent (bound to port " +
"%d)" % self.socket.getsockname()[1])
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.input_thread.setName("TCF Locator Receiver")
self.timer_thread.setName("TCF Locator Timer")
self.dns_lookup_thread.setName("TCF Locator DNS Lookup")
self.input_thread.setDaemon(True)
self.timer_thread.setDaemon(True)
self.dns_lookup_thread.setDaemon(True)
self.input_thread.start()
self.timer_thread.start()
self.dns_lookup_thread.start()
class LocatorListener(locator.LocatorListener):
def peerAdded(self, peer):
service._sendPeerInfo(peer, None, 0)
def peerChanged(self, peer):
service._sendPeerInfo(peer, None, 0)
self.listeners.append(LocatorListener())
self.__refreshSubNetList()
self.__sendPeersRequest(None, 0)
self.__sendAll(None, 0, None, int(time.time() * 1000))
except Exception as x:
self._log("Cannot open UDP socket for TCF discovery protocol", x)
def _shutdown(self):
if self._alive:
self._alive = False
def __makeErrorReport(self, code, msg):
err = {}
err[errors.ERROR_TIME] = int(time.time() * 1000)
err[errors.ERROR_CODE] = code
err[errors.ERROR_FORMAT] = msg
return err
def _command(self, channel, token, name, data):
try:
if name == "redirect":
peer_id = fromJSONSequence(data)[0]
_peer = self.peers.get(peer_id)
if _peer is None:
errNum = errors.TCF_ERROR_UNKNOWN_PEER
error = self.__makeErrorReport(errNum, "Unknown peer ID")
channel.sendResult(token, toJSONSequence((error,)))
return
channel.sendResult(token, toJSONSequence((None,)))
if isinstance(_peer, peer.LocalPeer):
seq = (channel.getLocalServices(),)
channel.sendEvent(protocol.getLocator(), "Hello",
toJSONSequence(seq))
return
ChannelProxy(channel, _peer.openChannel())
elif name == "sync":
channel.sendResult(token, None)
elif name == "getPeers":
arr = []
for p in self.peers.values():
arr.append(p.getAttributes())
channel.sendResult(token, toJSONSequence((None, arr)))
else:
channel.rejectCommand(token)
except Exception as x:
channel.terminate(x)
def _log(self, msg, x):
if not self._alive:
return
# Don't report same error multiple times to avoid filling up the log
# file.
with self._error_log_lock:
if msg in self.error_log:
return
self.error_log.add(msg)
protocol.log(msg, x)
def __getInetAddress(self, host):
if not host:
return None
with self._addr_cache_lock:
i = self.addr_cache.get(host)
if i is None:
i = AddressCacheItem(host)
ch = host[0]
if ch == '[' or ch == ':' or ch >= '0' and ch <= '9':
try:
addr = socket.gethostbyname(host)
i.address = InetAddress(host, addr)
except socket.gaierror:
pass
i.time_stamp = int(time.time() * 1000)
else:
# socket.gethostbyname() can cause long delay - delegate to
# background thread
LocatorService.addr_request = True
self._addr_cache_lock.notify()
self.addr_cache[host] = i
i.used = True
return i.address
def __refresh_timer(self):
tm = int(time.time() * 1000)
# Cleanup slave table
if self.slaves:
i = 0
while i < len(self.slaves):
s = self.slaves[i]
if s.last_packet_time + locator.DATA_RETENTION_PERIOD < tm:
del self.slaves[i]
else:
i += 1
# Cleanup peers table
stale_peers = None
for p in self.peers.values():
if isinstance(p, peer.RemotePeer):
if p.getLastUpdateTime() + locator.DATA_RETENTION_PERIOD < tm:
if stale_peers == None:
stale_peers = []
stale_peers.append(p)
if stale_peers is not None:
for p in stale_peers:
p.dispose()
# Try to become a master
port = self.socket.getsockname()[1]
period = locator.DATA_RETENTION_PERIOD / 2
if port != DISCOVEY_PORT and \
self.last_master_packet_time + period <= tm:
s0 = self.socket
s1 = None
try:
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.bind(DISCOVEY_PORT)
s1.setsockopt(socket.SOL_UDP, socket.SO_BROADCAST, 1)
self.socket = s1
s0.close()
except:
pass
self.__refreshSubNetList()
if port != DISCOVEY_PORT:
for subnet in self.subnets:
self.__addSlave(subnet.address, port, tm, tm)
self.__sendAll(None, 0, None, tm)
def __addSlave(self, addr, port, timestamp, time_now):
for s in self.slaves:
if s.port == port and s.address == addr:
if s.last_packet_time < timestamp:
s.last_packet_time = timestamp
return s
s = Slave(addr, port)
s.last_packet_time = timestamp
self.slaves.append(s)
self.__sendPeersRequest(addr, port)
self.__sendAll(addr, port, s, time_now)
self.__sendSlaveInfo(s, time_now)
return s
def __refreshSubNetList(self):
subNetSet = set()
try:
self.__getSubNetList(subNetSet)
except Exception as x:
self._log("Cannot get list of network interfaces", x)
for s in tuple(self.subnets):
if s in subNetSet:
continue
self.subnets.remove(s)
for s in subNetSet:
if s in self.subnets:
continue
self.subnets.add(s)
if __TRACE_DISCOVERY__:
buf = cStringIO.StringIO()
buf.write("Refreshed subnet list:")
for subnet in self.subnets:
buf.write("\n\t* address=%s, broadcast=%s" % \
(subnet.address, subnet.broadcast))
logging.trace(buf.getvalue())
def __getAllIpAddresses(self):
import fcntl # @UnresolvedImport
import struct
import array
nBytes = 8192
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
names = array.array('B', '\0' * nBytes)
ifcfg = struct.unpack('iL',
fcntl.ioctl(s.fileno(), 0x8912,
struct.pack('iL', nBytes,
names.buffer_info()[0])))[0]
namestr = names.tostring()
res = []
# the ipconf structure changed at a time, check if there are more than
# 40 bytes
ifconfSz = 32
sz = 32
altSz = 40
if len(namestr) > sz:
# check for name at 32->32+16
secondName = str(namestr[sz:sz + 16].split('\0', 1)[0])
secondAltName = str(namestr[altSz:altSz + 16].split('\0', 1)[0])
if (not secondName.isalnum()) and (secondAltName.isalnum()):
ifconfSz = 40
for ix in range(0, ifcfg, ifconfSz):
ipStartIx = ix + 20
ipEndIx = ix + 24
ip = namestr[ipStartIx:ipEndIx]
res.append(str(ord(ip[0])) + '.' + str(ord(ip[1])) + '.' + \
str(ord(ip[2])) + '.' + str(ord(ip[3])))
return (res)
def __getSubNetList(self, _set):
hostname = socket.gethostname()
if len(self.addr_list) == 0:
# Create the list of IP address for this host
_, _, self.addr_list = socket.gethostbyname_ex(hostname)
if not "127.0.0.1" in self.addr_list:
self.addr_list.append("127.0.0.1")
# On unix hosts, use sockets to get the other interfaces IPs
if (platform.system() != 'Windows'):
for ip_addr in self.__getAllIpAddresses():
if not ip_addr in self.addr_list:
self.addr_list.append(ip_addr)
for address in self.addr_list:
rawaddr = socket.inet_aton(address)
if len(rawaddr) != 4:
continue
rawaddr = rawaddr[:3] + '\xFF'
broadcast = socket.inet_ntoa(rawaddr)
_set.add(SubNet(24, InetAddress(hostname, address),
InetAddress(None, broadcast)))
def __getUTF8Bytes(self, s):
return s.encode("UTF-8")
# Used for tracing
packetTypes = [
None,
"CONF_REQ_INFO",
"CONF_PEER_INFO",
"CONF_REQ_SLAVES",
"CONF_SLAVES_INFO",
"CONF_PEERS_REMOVED"
]
def __sendDatagramPacket(self, subnet, size, addr, port):
try:
if addr is None:
addr = subnet.broadcast
port = DISCOVEY_PORT
for slave in self.slaves:
self.__sendDatagramPacket(subnet, size, slave.address,
slave.port)
if not subnet.contains(addr):
return False
if port == self.socket.getsockname()[1] and \
addr == subnet.address:
return False
self.socket.sendto(str(self.out_buf[:size]),
(addr.getHostAddress(), port))
if __TRACE_DISCOVERY__:
attrs = None
if self.out_buf[4] == locator.CONF_PEER_INFO:
attrs = self.__parsePeerAttributes(self.out_buf, 8)
elif self.out_buf[4] == locator.CONF_SLAVES_INFO:
attrs = self.__parseIDs(self.out_buf, size)
elif self.out_buf[4] == locator.CONF_PEERS_REMOVED:
attrs = self.__parseIDs(self.out_buf, size)
self.__traceDiscoveryPacket(False,
self.packetTypes[self.out_buf[4]],
attrs, addr, port)
except Exception as x:
self._log("Cannot send datagram packet to %s" % addr, x)
return False
return True
def __parsePeerAttributes(self, data, size):
"""
Parse peer attributes in CONF_PEER_INFO packet data
@param data - the packet data
@param size - the packet size
@return a map containing the attributes
"""
attrs = {}
# Remove packet header
s = data[8:size].decode("UTF-8")
l = len(s)
i = 0
while i < l:
i0 = i
while i < l and s[i] != '=' and s[i] != '\0':
i += 1
i1 = i
if i < l and s[i] == '=':
i += 1
i2 = i
while i < l and s[i] != '\0':
i += 1
i3 = i
if i < l and s[i] == '\0':
i += 1
key = s[i0:i1]
val = s[i2:i3]
attrs[key] = val
return attrs
def __parseIDs(self, data, size):
"""
Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet
data.
@param data - the packet data
@param size - the packet size
@return a map containing the IDs
"""
cnt = 0
attrs = {}
s = data[8:size].decode("UTF-8")
l = len(s)
i = 0
while i < l:
i0 = i
while i < l and s[i] != '\0':
i += 1
if i > i0:
_id = s[i0:i]
attrs[str(cnt)] = _id
cnt += 1
while i < l and s[i] == '\0':
i += 1
return attrs
def __sendPeersRequest(self, addr, port):
self.out_buf[4] = locator.CONF_REQ_INFO
for subnet in self.subnets:
self.__sendDatagramPacket(subnet, 8, addr, port)
def _sendPeerInfo(self, _peer, addr, port):
attrs = _peer.getAttributes()
peer_addr = self.__getInetAddress(attrs.get(peer.ATTR_IP_HOST))
if peer_addr is None:
return
if attrs.get(peer.ATTR_IP_PORT) is None:
return
self.out_buf[4] = locator.CONF_PEER_INFO
i = 8
for subnet in self.subnets:
if isinstance(_peer, peer.RemotePeer):
if self.socket.getsockname()[1] != DISCOVEY_PORT:
return
if not subnet.address == self.loopback_addr and \
not subnet.address == peer_addr:
continue
if not subnet.address == self.loopback_addr:
if not subnet.contains(peer_addr):
continue
if i == 8:
sb = cStringIO.StringIO()
for key in attrs.keys():
sb.write(key)
sb.write('=')
sb.write(attrs.get(key))
sb.write('\0')
bt = self.__getUTF8Bytes(sb.getvalue())
if i + len(bt) > len(self.out_buf):
return
self.out_buf[i:i + len(bt)] = bt
i += len(bt)
if self.__sendDatagramPacket(subnet, i, addr, port):
subnet.send_all_ok = True
def __sendEmptyPacket(self, addr, port):
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
if subnet.send_all_ok:
continue
self.__sendDatagramPacket(subnet, 8, addr, port)
def __sendAll(self, addr, port, sl, tm):
for subnet in self.subnets:
subnet.send_all_ok = False
for peer in self.peers.values():
self._sendPeerInfo(peer, addr, port)
if addr is not None and sl is not None and \
sl.last_req_slaves_time + locator.DATA_RETENTION_PERIOD >= tm:
self.__sendSlavesInfo(addr, port, tm)
self.__sendEmptyPacket(addr, port)
def __sendSlavesRequest(self, subnet, addr, port):
self.out_buf[4] = locator.CONF_REQ_SLAVES
self.__sendDatagramPacket(subnet, 8, addr, port)
def __sendSlaveInfo(self, x, tm):
ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
if ttl <= 0:
return
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
if not subnet.contains(x.address):
continue
i = 8
s = "%d:%s:%s" % (ttl, x.port, x.address.getHostAddress())
bt = self.__getUTF8Bytes(s)
self.out_buf[i:i + len(bt)] = bt
i += len(bt)
self.out_buf[i] = 0
i += 1
for y in self.slaves:
if not subnet.contains(y.address):
continue
if y.last_req_slaves_time + locator.DATA_RETENTION_PERIOD < tm:
continue
self.__sendDatagramPacket(subnet, i, y.address, y.port)
def __sendSlavesInfo(self, addr, port, tm):
self.out_buf[4] = locator.CONF_SLAVES_INFO
for subnet in self.subnets:
if not subnet.contains(addr):
continue
i = 8
for x in self.slaves:
ttl = x.last_packet_time + locator.DATA_RETENTION_PERIOD - tm
if ttl <= 0:
return
if x.port == port and x.address == addr:
continue
if not subnet.address == self.loopback_addr:
if not subnet.contains(x.address):
continue
subnet.send_all_ok = True
s = "%d:%s:%s" % \
(x.last_packet_time, x.port, x.address.getHostAddress())
bt = self.__getUTF8Bytes(s)
if i > 8 and i + len(bt) >= PREF_PACKET_SIZE:
self.__sendDatagramPacket(subnet, i, addr, port)
i = 8
self.out_buf[i:i+len(bt)] = bt
i += len(bt)
self.out_buf[i] = 0
i += 1
if i > 8:
self.__sendDatagramPacket(subnet, i, addr, port)
def __isRemote(self, address, port):
if port != self.socket.getsockname()[1]:
return True
for s in self.subnets:
if s.address == address:
return False
return True
def __handleDatagramPacket(self, p):
try:
tm = int(time.time() * 1000)
buf = p.getData()
length = p.getLength()
if length < 8:
return
if buf[0] != 'T':
return
if buf[1] != 'C':
return
if buf[2] != 'F':
return
if buf[3] != locator.CONF_VERSION:
return
remote_port = p.getPort()
remote_address = p.getAddress()
if self.__isRemote(remote_address, remote_port):
code = ord(buf[4])
if code == locator.CONF_PEERS_REMOVED:
self.__handlePeerRemovedPacket(p)
else:
sl = None
if remote_port != DISCOVEY_PORT:
sl = self.__addSlave(remote_address, remote_port, tm,
tm)
if code == locator.CONF_PEER_INFO:
self.__handlePeerInfoPacket(p)
elif code == locator.CONF_REQ_INFO:
self.__handleReqInfoPacket(p, sl, tm)
elif code == locator.CONF_SLAVES_INFO:
self.__handleSlavesInfoPacket(p, tm)
elif code == locator.CONF_REQ_SLAVES:
self.__handleReqSlavesPacket(p, sl, tm)
for subnet in self.subnets:
if not subnet.contains(remote_address):
continue
delay = locator.DATA_RETENTION_PERIOD / 3
if remote_port != DISCOVEY_PORT:
delay = locator.DATA_RETENTION_PERIOD / 32
elif subnet.address != remote_address:
delay = locator.DATA_RETENTION_PERIOD / 2
if subnet.last_slaves_req_time + delay <= tm:
self.__sendSlavesRequest(subnet, remote_address,
remote_port)
subnet.last_slaves_req_time = tm
if subnet.address == remote_address and \
remote_port == DISCOVEY_PORT:
self.last_master_packet_time = tm
except Exception as x:
self._log("Invalid datagram packet received from %s/%s" % \
(p.getAddress(), p.getPort()), x)
def __handlePeerInfoPacket(self, p):
try:
attrs = self.__parsePeerAttributes(p.getData(), p.getLength())
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", attrs, p)
_id = attrs.get(peer.ATTR_ID)
if _id is None:
raise RuntimeError("Invalid peer info: no ID")
ok = True
host = attrs.get(peer.ATTR_IP_HOST)
if host is not None:
ok = False
peer_addr = self.__getInetAddress(host)
if peer_addr is not None:
for subnet in self.subnets:
if subnet.contains(peer_addr):
ok = True
break
if ok:
_peer = self.peers.get(_id)
if isinstance(_peer, peer.RemotePeer):
_peer.updateAttributes(attrs)
elif _peer is None:
peer.RemotePeer(attrs)
except Exception as x:
self._log("Invalid datagram packet received from %s/%s" % \
(p.getAddress(), p.getPort()), x)
def __handleReqInfoPacket(self, p, sl, tm):
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_REQ_INFO", None, p)
self.__sendAll(p.getAddress(), p.getPort(), sl, tm)
def __handleSlavesInfoPacket(self, p, time_now):
try:
attrs = self.__parseIDs(p.getData(), p.getLength())
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", attrs, p)
for s in attrs.values():
i = 0
l = len(s)
time0 = i
while i < l and s[i] != ':' and s[i] != '\0':
i += 1
time1 = i
if i < l and s[i] == ':':
i += 1
port0 = i
while i < l and s[i] != ':' and s[i] != '\0':
i += 1
port1 = i
if i < l and s[i] == ':':
i += 1
host0 = i
while i < l and s[i] != '\0':
i += 1
host1 = i
port = int(s[port0:port1])
timestamp = s[time0:time1]
host = s[host0:host1]
if port != DISCOVEY_PORT:
addr = self.__getInetAddress(host)
if addr is not None:
delta = 10006030 # 30 minutes
if len(timestamp) > 0:
time_val = int(timestamp)
else:
time_val = time_now
if time_val < 3600000:
# Time stamp is "time to live" in milliseconds
time_val = time_now + time_val / 1000 - \
locator.DATA_RETENTION_PERIOD
elif time_val < time_now / 1000 + 50000000:
# Time stamp is in seconds
time_val = 1000
else:
# Time stamp is in milliseconds
pass
if time_val < time_now - delta or \
time_val > time_now + delta:
msg = "Invalid slave info timestamp: %s -> %s" % (
timestamp,
time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time_val /
1000.)))
self._log("Invalid datagram packet received " +
"from %s/%s" % (
p.getAddress(), p.getPort()),
Exception(msg))
time_val = time_now - \
locator.DATA_RETENTION_PERIOD / 2
self.__addSlave(addr, port, time_val, time_now)
except Exception as x:
self._log("Invalid datagram packet received from " +
"%s/%s" % (p.getAddress(), p.getPort()), x)
def __handleReqSlavesPacket(self, p, sl, tm):
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_REQ_SLAVES", None, p)
if sl is not None:
sl.last_req_slaves_time = tm
self.__sendSlavesInfo(p.getAddress(), p.getPort(), tm)
def __handlePeerRemovedPacket(self, p):
try:
attrs = self.__parseIDs(p.getData(), p.getLength())
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_PEERS_REMOVED", attrs,
p)
for _id in attrs.values():
_peer = self.peers.get(_id)
if isinstance(_peer, peer.RemotePeer):
_peer.dispose()
except Exception as x:
self._log("Invalid datagram packet received from %s/%s" % (
p.getAddress(), p.getPort()), x)
@classmethod
def getLocator(cls):
return cls.locator
def getPeers(self):
assert protocol.isDispatchThread()
return self.peers
def redirect(self, peer, done):
raise RuntimeError("Channel redirect cannot be done on local peer")
def sync(self, done):
raise RuntimeError("Channel sync cannot be done on local peer")
def addListener(self, listener):
assert listener is not None
assert protocol.isDispatchThread()
self.listeners.append(listener)
def removeListener(self, listener):
assert protocol.isDispatchThread()
self.listeners.remove(listener)
@classmethod
def __traceDiscoveryPacket(cls, received, packet_type, attrs, addr,
port=None):
"""
Log that a TCF Discovery packet has be sent or received. The trace is
sent to stdout. This should be called only if the tracing has been
turned on.
@param received
True if the packet was sent, otherwise it was received
@param packet_type
a string specifying the type of packet, e.g.,
"CONF_PEER_INFO"
@param attrs
a set of attributes relevant to the type of packet
(typically a peer's attributes)
@param addr
the network address the packet is being sent to
@param port
the port the packet is being sent to
"""
assert __TRACE_DISCOVERY__
if port is None:
# addr is a InputPacket
port = addr.getPort()
addr = addr.getAddress()
buf = cStringIO.StringIO()
buf.write(packet_type)
buf.write((" sent to ", " received from ")[received])
buf.write("%s/%s" % (addr, port))
if attrs is not None:
for key, value in attrs.items():
buf.write("\n\t%s=%s" % (key, value))
logging.trace(buf.getvalue())
class LocatorServiceProvider(services.ServiceProvider):
def getLocalService(self, _channel):
class CommandServer(channel.CommandServer):
def command(self, token, name, data):
LocatorService.locator._command(_channel, token, name, data)
_channel.addCommandServer(LocatorService.locator, CommandServer())
return (LocatorService.locator,)
services.addServiceProvider(LocatorServiceProvider())