blob: 04d86fd6a356e16e0580185d38eff28f24714cb9 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2007, 2010 Wind River Systems, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
/*
* Implements UDP based service discovery.
*
* The discovery protocol uses unicast and multicast UDP packets to propagate information
* about available TCF peers. The protocol is truly distributed - all participants have
* same functionality and no central authority is defined.
*
* TCF discovery scope is one subnet. Access across subnets is supported by TCF proxy.
*
* TCF discovery participants use a dedicated UDP port - 1534, however discovery will
* work fine if the port is not available for some participants, but at least one
* participant on a subnet must be able to bind itself to the default port, otherwise the protocol
* will not function properly. An agent that owns a default port is called "master",
* an agent that owns non-default port is called "slave".
*
* Every slave will check periodically availability of default port, and can become a master if
* the port becomes available.
*
* Since slaves cannot receive multicast packets, each agent maintains a list of slaves,
* and uses unicast packets to sent info to agents from the list.
*/
#include <config.h>
#if ENABLE_Discovery
#include <stddef.h>
#include <errno.h>
#include <assert.h>
#include <string.h>
#include <framework/tcf.h>
#include <framework/myalloc.h>
#include <framework/events.h>
#include <framework/errors.h>
#include <framework/trace.h>
#include <framework/peer.h>
#include <framework/ip_ifc.h>
#include <framework/asyncreq.h>
#include <services/discovery.h>
#include <services/discovery_udp.h>
#define MAX_IFC 10
#define MAX_RECV_ERRORS 8
static int ifc_cnt;
static ip_ifc_info ifc_list[MAX_IFC];
static time_t last_req_slaves_time[MAX_IFC];
static int send_all_ok[MAX_IFC];
static int udp_server_port = 0;
static int udp_server_socket = -1;
static int udp_server_generation = 0;
static AsyncReqInfo recvreq;
static int recvreq_pending = 0;
static int recvreq_error_cnt = 0;
static int recvreq_generation = 0;
static struct sockaddr_in recvreq_addr;
static char recv_buf[MAX_PACKET_SIZE];
static char send_buf[MAX_PACKET_SIZE];
static int send_size;
static time_t last_master_packet_time = 0;
typedef struct SlaveInfo {
struct sockaddr_in addr;
time_t last_packet_time; /* Time of last packet from this slave */
time_t last_req_slaves_time; /* Time of last UDP_REQ_SLAVES packet from this slave */
} SlaveInfo;
static SlaveInfo * slave_info = NULL;
static int slave_cnt = 0;
static int slave_max = 0;
static void app_char(char ch) {
if (send_size < (int)sizeof(send_buf)) {
send_buf[send_size++] = ch;
}
}
static void app_str(const char * str) {
while (*str && send_size < (int)sizeof(send_buf)) {
send_buf[send_size++] = *str++;
}
}
static void app_strz(const char * str) {
app_str(str);
app_char(0);
}
static int get_slave_addr(char * buf, ssize_t * pos, struct sockaddr_in * addr, uint64_t * timestamp) {
char * port = buf + *pos;
char * stmp = buf + *pos;
char * host = buf + *pos;
size_t len = strlen(buf + *pos);
uint64_t ts = 0;
int n = 0;
while (*port && *port != ':') port++;
if (*port == ':') *port++ = 0;
host = port;
while (*host && *host != ':') host++;
if (*host == ':') *host++ = 0;
*pos += len + 1;
memset(addr, 0, sizeof(*addr));
addr->sin_family = AF_INET;
if (inet_pton(AF_INET, host, &addr->sin_addr) <= 0) return 0;
n = atoi(port);
if (n == DISCOVERY_TCF_PORT) return 0;
addr->sin_port = htons((unsigned short)n);
while (*stmp >= '0' && *stmp <= '9') {
ts = (ts * 10) + (*stmp++ - '0');
}
*timestamp = ts;
return 1;
}
static void trigger_recv(void);
static void udp_server_recv(void * x);
static void delayed_server_recv(void * x) {
assert(recvreq_pending);
if (recvreq_generation != udp_server_generation) {
/* Cancel and restart */
recvreq_pending = 0;
trigger_recv();
}
else {
async_req_post(&recvreq);
}
}
static void trigger_recv(void) {
if (recvreq_pending || udp_server_socket < 0) return;
recvreq_pending = 1;
recvreq_generation = udp_server_generation;
recvreq.done = udp_server_recv;
recvreq.client_data = NULL;
recvreq.type = AsyncReqRecvFrom;
recvreq.u.sio.sock = udp_server_socket;
recvreq.u.sio.flags = 0;
recvreq.u.sio.bufp = recv_buf;
recvreq.u.sio.bufsz = sizeof recv_buf;
recvreq.u.sio.addr = (struct sockaddr *)&recvreq_addr;
recvreq.u.sio.addrlen = sizeof recvreq_addr;
memset(&recvreq_addr, 0, sizeof recvreq_addr);
if (recvreq_error_cnt >= MAX_RECV_ERRORS) {
/* Delay the request to avoid flooding with error reports */
trace(LOG_ALWAYS, "delayed_server_recv error occured: %d", recvreq_error_cnt);
post_event_with_delay(delayed_server_recv, NULL, 1000000);
}
else {
async_req_post(&recvreq);
}
}
static int create_server_socket(void) {
int sock = -1;
int error = 0;
const char * reason = NULL;
const int i = 1;
struct addrinfo hints;
struct addrinfo * reslist = NULL;
struct addrinfo * res = NULL;
struct sockaddr_in local_addr;
#if defined(_WRS_KERNEL)
int local_addr_size = sizeof(local_addr);
#else
socklen_t local_addr_size = sizeof(local_addr);
#endif
char port_str[16];
sprintf(port_str, "%d", DISCOVERY_TCF_PORT);
memset(&local_addr, 0, sizeof(local_addr));
memset(&hints, 0, sizeof hints);
hints.ai_family = PF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
hints.ai_flags = AI_PASSIVE;
error = loc_getaddrinfo(NULL, port_str, &hints, &reslist);
if (error) {
trace(LOG_ALWAYS, "getaddrinfo error: %s", loc_gai_strerror(error));
return set_gai_errno(error);
}
for (res = reslist; res != NULL; res = res->ai_next) {
sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sock < 0) {
error = errno;
reason = "create";
continue;
}
if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&i, sizeof(i)) < 0) {
error = errno;
reason = "setsockopt(SO_BROADCAST)";
closesocket(sock);
sock = -1;
continue;
}
if (bind(sock, res->ai_addr, res->ai_addrlen)) {
error = errno;
if (res->ai_addr->sa_family == AF_INET) {
struct sockaddr_in addr;
trace(LOG_DISCOVERY, "Cannot bind to default UDP port %d: %s",
DISCOVERY_TCF_PORT, errno_to_str(error));
assert(sizeof(addr) >= res->ai_addrlen);
memset(&addr, 0, sizeof(addr));
memcpy(&addr, res->ai_addr, res->ai_addrlen);
addr.sin_port = 0;
error = 0;
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr))) {
error = errno;
if (udp_server_socket >= 0 &&
recvreq_error_cnt < MAX_RECV_ERRORS) {
loc_freeaddrinfo(reslist);
closesocket(sock);
return 0;
}
}
}
if (error) {
reason = "bind";
closesocket(sock);
sock = -1;
continue;
}
}
if (getsockname(sock, (struct sockaddr *)&local_addr, &local_addr_size)) {
error = errno;
reason = "getsockname";
closesocket(sock);
sock = -1;
continue;
}
/* Only bind once - don't see how getaddrinfo with the given
* arguments could return more then one anyway */
break;
}
if (sock < 0) {
assert(error);
loc_freeaddrinfo(reslist);
if (udp_server_socket >= 0 && recvreq_error_cnt < MAX_RECV_ERRORS) {
return 0;
}
trace(LOG_ALWAYS, "Discovery service socket %s error: %s",
reason, errno_to_str(error));
return error;
}
if (udp_server_socket >= 0) closesocket(udp_server_socket);
udp_server_port = ntohs(local_addr.sin_port);
udp_server_socket = sock;
udp_server_generation++;
loc_freeaddrinfo(reslist);
trace(LOG_DISCOVERY, "UDP discovery server created at port %d", udp_server_port);
trigger_recv();
return 0;
}
static int send_packet(ip_ifc_info * ifc, struct sockaddr_in * addr) {
if (addr == NULL) {
/* Broadcast */
int n = 0;
static struct sockaddr_in buf;
/* Send to all slaves */
while (n < slave_cnt) {
SlaveInfo * s = slave_info + n++;
send_packet(ifc, &s->addr);
}
/* Send to all masters by using interface broadcast address */
memset(&buf, 0, sizeof(buf));
addr = &buf;
addr->sin_family = AF_INET;
addr->sin_port = htons(DISCOVERY_TCF_PORT);
addr->sin_addr.s_addr = ifc->addr;
if (*(uint8_t *)&ifc->addr != 127) addr->sin_addr.s_addr |= ~ifc->mask;
}
/* Don't send if address does not belong to subnet of the interface */
if ((ifc->addr & ifc->mask) != (addr->sin_addr.s_addr & ifc->mask)) return 0;
/* Don't send to ourselves */
if (ifc->addr == addr->sin_addr.s_addr && udp_server_port == ntohs(addr->sin_port)) return 0;
#if ENABLE_Trace
if (log_file != NULL && (log_mode & LOG_DISCOVERY) != 0) {
int i;
char buf[sizeof(send_buf) + 32];
size_t pos = 0;
char ch;
switch (send_buf[4]) {
case UDP_ACK_INFO:
pos = strlcpy(buf, "ACK_INFO", sizeof(buf));
i = 8;
while (i < send_size) {
if (strncmp(send_buf + i, "ID=", 3) == 0) {
if (pos < sizeof(buf) - 1) buf[pos++] = ' ';
while (i < send_size && (ch = send_buf[i++]) != 0) {
if (pos < sizeof(buf) - 1) buf[pos++] = ch;
}
break;
}
else {
while (i < send_size && send_buf[i++]) {}
}
}
break;
case UDP_ACK_SLAVES:
pos = strlcpy(buf, "ACK_SLAVES", sizeof(buf));
i = 8;
while (i < send_size) {
if (pos < sizeof(buf) - 1) buf[pos++] = ' ';
while (i < send_size && (ch = send_buf[i++]) != 0) {
if (pos < sizeof(buf) - 1) buf[pos++] = ch;
}
}
break;
case UDP_REQ_INFO:
pos = strlcpy(buf, "REQ_INFO", sizeof(buf));
break;
case UDP_REQ_SLAVES:
pos = strlcpy(buf, "REQ_SLAVES", sizeof(buf));
break;
default:
pos = strlcpy(buf, "???", sizeof(buf));
break;
}
buf[pos] = 0;
trace(LOG_DISCOVERY, "%s to %s:%d", buf, inet_ntoa(addr->sin_addr), ntohs(addr->sin_port));
}
#endif
if (sendto(udp_server_socket, send_buf, send_size, 0, (struct sockaddr *)addr, sizeof(*addr)) >= 0) return 1;
trace(LOG_ALWAYS, "Can't send UDP discovery packet to %s:%d %s",
inet_ntoa(addr->sin_addr), ntohs(addr->sin_port), errno_to_str(errno));
return 0;
}
static int udp_send_peer_info(PeerServer * ps, void * arg) {
struct sockaddr_in * addr = (struct sockaddr_in *)arg;
const char * host = NULL;
const char * prot = NULL;
struct in_addr peer_addr;
int n;
if ((ps->flags & PS_FLAG_PRIVATE) != 0) return 0;
if ((ps->flags & PS_FLAG_DISCOVERABLE) == 0) return 0;
memset(&peer_addr, 0, sizeof(peer_addr));
prot = peer_server_getprop(ps, "TransportName", NULL);
if (prot != NULL && (strcmp(prot, "TCP") == 0 || strcmp(prot, "SSL") == 0)) {
host = peer_server_getprop(ps, "Host", NULL);
if (host == NULL || inet_pton(AF_INET, host, &peer_addr) <= 0) return 0;
}
send_size = 8;
for (n = 0; n < ifc_cnt; n++) {
ip_ifc_info * ifc = ifc_list + n;
if ((ps->flags & PS_FLAG_LOCAL) == 0) {
/* Info about non-local peers is sent only by master */
if (udp_server_port != DISCOVERY_TCF_PORT) return 0;
if (host == NULL) return 0;
if (ifc->addr != htonl(INADDR_LOOPBACK) && ifc->addr != peer_addr.s_addr) continue;
}
if (ifc->addr != htonl(INADDR_LOOPBACK)) {
if (host == NULL) continue;
assert(peer_addr.s_addr != INADDR_ANY);
if ((ifc->addr & ifc->mask) != (peer_addr.s_addr & ifc->mask)) {
/* Peer address does not belong to subnet of this interface */
continue;
}
}
if (send_size == 8) {
int i;
send_buf[4] = UDP_ACK_INFO;
app_str("ID=");
app_strz(ps->id);
for (i = 0; i < ps->ind; i++) {
const char * name = ps->list[i].name;
assert(strcmp(name, "ID") != 0);
app_str(name);
app_char('=');
app_strz(ps->list[i].value);
}
}
send_all_ok[n] = 1;
send_packet(ifc, addr);
}
return 0;
}
static void udp_send_ack_info(struct sockaddr_in * addr) {
assert(is_dispatch_thread());
peer_server_iter(udp_send_peer_info, addr);
}
static void udp_send_req_info(struct sockaddr_in * addr) {
int n;
for (n = 0; n < ifc_cnt; n++) {
ip_ifc_info * ifc = ifc_list + n;
send_size = 8;
send_buf[4] = UDP_REQ_INFO;
send_packet(ifc, addr);
}
}
static void udp_send_empty_packet(struct sockaddr_in * addr) {
int n;
for (n = 0; n < ifc_cnt; n++) {
ip_ifc_info * ifc = ifc_list + n;
if (send_all_ok[n]) continue;
send_size = 8;
send_buf[4] = UDP_ACK_SLAVES;
send_packet(ifc, addr);
}
}
static void udp_send_req_slaves(ip_ifc_info * ifc, struct sockaddr_in * addr) {
send_size = 8;
send_buf[4] = UDP_REQ_SLAVES;
send_packet(ifc, addr);
}
static void udp_send_ack_slaves_one(SlaveInfo * s) {
ip_ifc_info * ifc;
time_t timenow = time(NULL);
int ttl = (int)(s->last_packet_time + PEER_DATA_RETENTION_PERIOD - timenow) * 1000;
if (ttl <= 0) return;
for (ifc = ifc_list; ifc < &ifc_list[ifc_cnt]; ifc++) {
int n = 0;
char str[256];
if ((ifc->addr & ifc->mask) != (s->addr.sin_addr.s_addr & ifc->mask)) continue;
send_size = 8;
send_buf[4] = UDP_ACK_SLAVES;
snprintf(str, sizeof(str), "%u:%u:%s", ttl, ntohs(s->addr.sin_port), inet_ntoa(s->addr.sin_addr));
app_strz(str);
while (n < slave_cnt) {
SlaveInfo * s = slave_info + n++;
if (s->last_req_slaves_time + PEER_DATA_RETENTION_PERIOD < timenow) continue;
send_packet(ifc, &s->addr);
}
}
}
static void udp_send_ack_slaves_all(struct sockaddr_in * addr, time_t timenow) {
int k;
for (k = 0; k < ifc_cnt; k++) {
int n = 0;
ip_ifc_info * ifc = ifc_list + k;
if ((ifc->addr & ifc->mask) != (addr->sin_addr.s_addr & ifc->mask)) continue;
send_size = 8;
send_buf[4] = UDP_ACK_SLAVES;
while (n < slave_cnt) {
char str[256];
SlaveInfo * s = slave_info + n++;
int ttl = (int)(s->last_packet_time + PEER_DATA_RETENTION_PERIOD - timenow) * 1000;
if (ttl <= 0) continue;
if (addr->sin_addr.s_addr == s->addr.sin_addr.s_addr && addr->sin_port == s->addr.sin_port) continue;
if (ifc->addr != htonl(INADDR_LOOPBACK)) {
if ((ifc->addr & ifc->mask) != (s->addr.sin_addr.s_addr & ifc->mask)) {
/* Slave address does not belong to subnet of this interface */
continue;
}
}
snprintf(str, sizeof(str), "%u:%u:%s", ttl, ntohs(s->addr.sin_port), inet_ntoa(s->addr.sin_addr));
if (send_size + strlen(str) >= PREF_PACKET_SIZE) {
send_packet(ifc, addr);
send_size = 8;
}
app_strz(str);
send_all_ok[k] = 1;
}
if (send_size > 8) send_packet(ifc, addr);
}
}
static void udp_send_all(struct sockaddr_in * addr, SlaveInfo * s) {
memset(send_all_ok, 0, sizeof(send_all_ok));
udp_send_ack_info(addr);
if (addr != NULL && s != NULL) {
time_t timenow = time(NULL);
if (s->last_req_slaves_time + PEER_DATA_RETENTION_PERIOD >= timenow) {
udp_send_ack_slaves_all(addr, timenow);
}
}
udp_send_empty_packet(addr);
}
static SlaveInfo * add_slave(struct sockaddr_in * addr, time_t timestamp) {
int i = 0;
SlaveInfo * s = NULL;
while (i < slave_cnt) {
s = slave_info + i++;
if (memcmp(&s->addr, addr, sizeof(struct sockaddr_in)) == 0) {
if (s->last_packet_time < timestamp) s->last_packet_time = timestamp;
return s;
}
}
if (slave_max == 0) {
assert(slave_cnt == 0);
slave_max = 16;
slave_info = (SlaveInfo *)loc_alloc(sizeof(SlaveInfo) * slave_max);
}
else if (slave_cnt >= slave_max) {
assert(slave_cnt == slave_max);
slave_max *= 2;
slave_info = (SlaveInfo *)loc_realloc(slave_info, sizeof(SlaveInfo) * slave_max);
}
s = slave_info + slave_cnt++;
s->addr = *addr;
s->last_packet_time = timestamp;
s->last_req_slaves_time = 0;
udp_send_req_info(addr);
udp_send_all(addr, s);
udp_send_ack_slaves_one(s);
return s;
}
static void udp_refresh_timer(void * arg) {
time_t timenow = time(NULL);
if (slave_cnt > 0) {
/* Cleanup slave table */
int i = 0;
int j = 0;
while (i < slave_cnt) {
SlaveInfo * s = slave_info + i++;
if (s->last_packet_time + PEER_DATA_RETENTION_PERIOD >= timenow) {
if (j < i) slave_info[j] = *s;
j++;
}
}
slave_cnt = j;
}
if (udp_server_port != DISCOVERY_TCF_PORT && last_master_packet_time + PEER_DATA_RETENTION_PERIOD / 2 <= timenow) {
/* No master reponces, try to become a master */
create_server_socket();
}
/* Refresh list of network interfaces */
ifc_cnt = build_ifclist(udp_server_socket, MAX_IFC, ifc_list);
if (udp_server_port != DISCOVERY_TCF_PORT) {
int i;
for (i = 0; i < ifc_cnt; i++) {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons((short)udp_server_port);
addr.sin_addr.s_addr = ifc_list[i].addr;
add_slave(&addr, timenow);
}
}
/* Broadcast peer info */
udp_send_all(NULL, NULL);
post_event_with_delay(udp_refresh_timer, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
}
static int is_remote(struct sockaddr_in * addr) {
int i;
if (ntohs(addr->sin_port) != udp_server_port) return 1;
for (i = 0; i < ifc_cnt; i++) {
if (addr->sin_addr.s_addr == ifc_list[i].addr) return 0;
}
return 1;
}
static void udp_receive_req_info(SlaveInfo * s) {
trace(LOG_DISCOVERY, "REQ_INFO from %s:%d",
inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port));
udp_send_all(&recvreq_addr, s);
}
static void udp_receive_ack_info(void) {
PeerServer * ps = peer_server_alloc();
char * p = recv_buf + 8;
char * e = recv_buf + recvreq.u.sio.rval;
assert(is_dispatch_thread());
while (p < e) {
char * name = p;
char * value = NULL;
while (p < e && *p != '\0' && *p != '=') p++;
if (p >= e || *p != '=') {
p = NULL;
break;
}
*p++ = '\0';
value = p;
while (p < e && *p != '\0') p++;
if (p >= e) {
p = NULL;
break;
}
peer_server_addprop(ps, loc_strdup(name), loc_strdup(value));
p++;
}
if (p != NULL && ps->id != NULL) {
/* TODO: should ignore peer info if peer host does not belong to one of known subnets */
trace(LOG_DISCOVERY, "ACK_INFO from %s:%d, ID=%s",
inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port), ps->id);
ps->flags = PS_FLAG_DISCOVERABLE;
peer_server_add(ps, PEER_DATA_RETENTION_PERIOD);
}
else {
trace(LOG_ALWAYS, "Received malformed UDP discovery packet from %s:%d",
inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port));
peer_server_free(ps);
}
}
static void udp_receive_req_slaves(SlaveInfo * s, time_t timenow) {
trace(LOG_DISCOVERY, "REQ_SLAVES from %s:%d",
inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port));
if (s != NULL) s->last_req_slaves_time = timenow;
udp_send_ack_slaves_all(&recvreq_addr, timenow);
}
static void udp_receive_ack_slaves(time_t timenow) {
ssize_t pos = 8;
ssize_t len = recvreq.u.sio.rval;
while (pos < len) {
struct sockaddr_in addr;
uint64_t timestamp;
if (get_slave_addr(recv_buf, &pos, &addr, &timestamp)) {
time_t delta = 60 * 10; /* 10 minutes */
time_t timeval = 0;
if (timestamp < 3600000) {
/* Timestamp is "time to live" in milliseconds */
timeval = timenow + (time_t)(timestamp / 1000) - PEER_DATA_RETENTION_PERIOD;
}
else if (timestamp < (uint64_t)timenow + 50000000) {
/* Timestamp is in seconds */
timeval = (time_t)timestamp;
}
else {
/* Timestamp is in milliseconds */
timeval = (time_t)(timestamp / 1000);
}
if (log_mode & LOG_DISCOVERY) {
char buf[64];
snprintf(buf, sizeof(buf), "%s:%d", inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port));
trace(LOG_DISCOVERY, "ACK_SLAVES %"PRId64":%u:%s from %s",
timestamp, ntohs(addr.sin_port), inet_ntoa(addr.sin_addr), buf);
}
if (timeval < timenow - delta || timeval > timenow + delta) {
trace(LOG_DISCOVERY, "Discovery: invalid slave info timestamp %"PRId64" from %s:%d",
timestamp, inet_ntoa(recvreq_addr.sin_addr), ntohs(recvreq_addr.sin_port));
timeval = timenow - PEER_DATA_RETENTION_PERIOD / 2;
}
add_slave(&addr, timeval);
}
}
}
static void udp_server_recv(void * x) {
assert(recvreq_pending != 0);
assert(x == &recvreq);
recvreq_pending = 0;
if (recvreq.error != 0) {
if (recvreq_generation != udp_server_generation) {
recvreq_error_cnt = 0;
}
else {
recvreq_error_cnt++;
trace(LOG_ALWAYS, "UDP socket receive failed: %s", errno_to_str(recvreq.error));
}
}
else {
recvreq_error_cnt = 0;
if (recvreq.u.sio.rval >= 8 &&
recv_buf[0] == 'T' &&
recv_buf[1] == 'C' &&
recv_buf[2] == 'F' &&
recv_buf[3] == UDP_VERSION &&
is_remote(&recvreq_addr)) {
int n = 0;
time_t timenow = time(NULL);
SlaveInfo * s = NULL;
if (ntohs(recvreq_addr.sin_port) != DISCOVERY_TCF_PORT) {
/* Packet from a slave, save its address */
s = add_slave(&recvreq_addr, timenow);
}
switch (recv_buf[4]) {
case UDP_REQ_INFO:
udp_receive_req_info(s);
break;
case UDP_ACK_INFO:
udp_receive_ack_info();
break;
case UDP_REQ_SLAVES:
udp_receive_req_slaves(s, timenow);
break;
case UDP_ACK_SLAVES:
udp_receive_ack_slaves(timenow);
break;
}
for (n = 0; n < ifc_cnt; n++) {
ip_ifc_info * ifc = ifc_list + n;
if ((ifc->addr & ifc->mask) == (recvreq_addr.sin_addr.s_addr & ifc->mask)) {
time_t delay = PEER_DATA_RETENTION_PERIOD / 3;
if (ntohs(recvreq_addr.sin_port) != DISCOVERY_TCF_PORT) delay = PEER_DATA_RETENTION_PERIOD / 3 * 2;
else if (recvreq_addr.sin_addr.s_addr != ifc->addr) delay = PEER_DATA_RETENTION_PERIOD / 2;
if (last_req_slaves_time[n] + delay <= timenow) {
udp_send_req_slaves(ifc, &recvreq_addr);
last_req_slaves_time[n] = timenow;
}
/* Remember time only if local host master */
if (ifc->addr == recvreq_addr.sin_addr.s_addr && ntohs(recvreq_addr.sin_port) == DISCOVERY_TCF_PORT) {
last_master_packet_time = timenow;
}
}
}
}
}
trigger_recv();
}
static void local_peer_changed(PeerServer * ps, int type, void * arg) {
trace(LOG_DISCOVERY, "Peer changed: ps=0x%x, type=%d", ps, type);
switch (type) {
case PS_EVENT_ADDED:
case PS_EVENT_CHANGED:
udp_send_peer_info(ps, NULL);
break;
}
}
int discovery_start_udp(void) {
int error = create_server_socket();
if (error) return error;
peer_server_add_listener(local_peer_changed, NULL);
post_event_with_delay(udp_refresh_timer, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
ifc_cnt = build_ifclist(udp_server_socket, MAX_IFC, ifc_list);
memset(send_buf, 0, sizeof(send_buf));
send_buf[0] = 'T';
send_buf[1] = 'C';
send_buf[2] = 'F';
send_buf[3] = UDP_VERSION;
udp_send_req_info(NULL);
udp_send_all(NULL, NULL);
return 0;
}
#endif /* ENABLE_Discovery */