blob: a9c131c8493d6fc526eebb374ca89cb6d9ec5bea [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2007, 2013 Wind River Systems, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
* You may elect to redistribute this code under either of these licenses.
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
/*
* TCF communication protocol.
* This module handles registration of command and event handlers.
* It is called when new messages are received and will dispatch
* messages to the appropriate handler. It has no knowledge of what transport
* protocol is used and what services do.
*/
#include <tcf/config.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <assert.h>
#include <ctype.h>
#include <tcf/framework/protocol.h>
#include <tcf/framework/trace.h>
#include <tcf/framework/events.h>
#include <tcf/framework/events.h>
#include <tcf/framework/exceptions.h>
#include <tcf/framework/json.h>
#include <tcf/framework/myalloc.h>
static const char * LOCATOR = "Locator";
struct ServiceInfo {
void * owner;
char * name;
struct ServiceInfo * next;
};
struct MessageHandlerInfo {
Protocol * p;
ServiceInfo * service;
const char * name;
ProtocolCommandHandler2 handler;
void * client_data;
struct MessageHandlerInfo * next;
};
typedef struct MessageHandlerInfo MessageHandlerInfo;
struct EventHandlerInfo {
Channel * c;
ServiceInfo * service;
const char * name;
ProtocolEventHandler2 handler;
void * client_data;
struct EventHandlerInfo * next;
};
typedef struct EventHandlerInfo EventHandlerInfo;
struct ReplyHandlerInfo {
unsigned long tokenid;
Channel * c;
ReplyHandlerCB handler;
ProgressHandlerCB progress;
void * client_data;
struct ReplyHandlerInfo * next;
};
#define MESSAGE_HASH_SIZE (5 * MEM_USAGE_FACTOR - 1)
#define EVENT_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1)
#define REPLY_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1)
static MessageHandlerInfo * message_handlers[MESSAGE_HASH_SIZE];
static EventHandlerInfo * event_handlers[EVENT_HASH_SIZE];
static ReplyHandlerInfo * reply_handlers[REPLY_HASH_SIZE];
static ServiceInfo * services;
static int ini_done = 0;
static int proto_cnt = 0;
static char * agent_id = NULL;
struct Protocol {
int id;
int lock_cnt; /* Lock count, cannot delete when > 0 */
unsigned long tokenid;
ProtocolMessageHandler2 default_handler;
void * client_data;
};
static void read_stringz(InputStream * inp, char * str, size_t size) {
unsigned len = 0;
for (;;) {
int ch = read_stream(inp);
if (ch <= 0) {
if (ch == 0) break;
trace(LOG_ALWAYS, "Unexpected end of message");
exception(ERR_PROTOCOL);
}
if (len < size - 1) str[len++] = (char)ch;
}
str[len] = 0;
}
ServiceInfo * protocol_get_service(void * owner, const char * name) {
ServiceInfo * s = services;
while (s != NULL && (s->owner != owner || strcmp(s->name, name) != 0)) s = s->next;
if (s == NULL) {
assert(strcmp(name, "ZeroCopy") != 0);
s = (ServiceInfo *)loc_alloc(sizeof(ServiceInfo));
s->owner = owner;
s->name = loc_strdup(name);
s->next = services;
services = s;
}
return s;
}
static void free_services(void * owner) {
ServiceInfo ** sp = &services;
ServiceInfo * s;
while ((s = *sp) != NULL) {
if (s->owner == owner) {
*sp = s->next;
loc_free(s->name);
loc_free(s);
}
else {
sp = &s->next;
}
}
}
static unsigned message_hash(Protocol * p, const char * service, const char * name) {
int i;
unsigned h = (unsigned)(uintptr_t)p >> 4;
for (i = 0; service[i]; i++) h += service[i];
for (i = 0; name[i]; i++) h += name[i];
h = h + h / MESSAGE_HASH_SIZE;
return h % MESSAGE_HASH_SIZE;
}
static MessageHandlerInfo * find_message_handler(Protocol * p, char * service, char * name) {
MessageHandlerInfo * mh = message_handlers[message_hash(p, service, name)];
while (mh != NULL) {
if (mh->p == p && !strcmp(mh->service->name, service) && !strcmp(mh->name, name)) return mh;
mh = mh->next;
}
return NULL;
}
static unsigned event_hash(Channel * c, const char * service, const char * name) {
int i;
unsigned h = (unsigned)(uintptr_t)c >> 4;
for (i = 0; service[i]; i++) h += service[i];
for (i = 0; name[i]; i++) h += name[i];
h = h + h / EVENT_HASH_SIZE;
return h % EVENT_HASH_SIZE;
}
static EventHandlerInfo * find_event_handler(Channel * c, char * service, char * name) {
EventHandlerInfo * mh = event_handlers[event_hash(c, service, name)];
while (mh != NULL) {
if (mh->c == c && !strcmp(mh->service->name, service) && !strcmp(mh->name, name)) return mh;
mh = mh->next;
}
return NULL;
}
#define reply_hash(c, tokenid) ((((unsigned)(uintptr_t)(c) >> 4) + (unsigned)(tokenid)) % REPLY_HASH_SIZE)
static ReplyHandlerInfo * find_reply_handler(Channel * c, unsigned long tokenid, int take) {
ReplyHandlerInfo ** rhp = &reply_handlers[reply_hash(c, tokenid)];
ReplyHandlerInfo * rh;
while ((rh = *rhp) != NULL) {
if (rh->c == c && rh->tokenid == tokenid) {
if (take) *rhp = rh->next;
return rh;
}
rhp = &rh->next;
}
return NULL;
}
static void skip_until_EOM(Channel * c) {
for (;;) {
int ch = read_stream(&c->inp);
if (ch == MARKER_EOM) return;
if (ch == MARKER_EOS) return;
}
}
static void event_locator_hello(Channel * c);
void handle_protocol_message(Channel * c) {
char type[8];
char token[256];
char service[256];
char name[256];
char * args[4];
int error = 0;
Protocol * p = c->protocol;
assert(is_dispatch_thread());
read_stringz(&c->inp, type, sizeof(type));
if (strlen(type) != 1) {
trace(LOG_ALWAYS, "Invalid TCF message: %s ...", type);
error = ERR_PROTOCOL;
}
else if (type[0] == 'C') {
Trap trap;
read_stringz(&c->inp, token, sizeof(token));
read_stringz(&c->inp, service, sizeof(service));
read_stringz(&c->inp, name, sizeof(name));
trace(LOG_PROTOCOL, "Peer %s: Command: C %s %s %s ...", c->peer_name, token, service, name);
if (c->state != ChannelStateConnected) {
trace(LOG_PROTOCOL, "Wrong channel state for commands");
skip_until_EOM(c);
write_stringz(&c->out, "N");
write_stringz(&c->out, token);
write_stream(&c->out, MARKER_EOM);
}
else if (set_trap(&trap)) {
MessageHandlerInfo * mh = find_message_handler(p, service, name);
if (mh != NULL) {
mh->handler(token, c, mh->client_data);
}
else if (p->default_handler != NULL) {
args[0] = type;
args[1] = token;
args[2] = service;
args[3] = name;
p->default_handler(c, args, 4, p->client_data);
}
else {
trace(LOG_PROTOCOL, "Command is not recognized: %s %s ...", service, name);
skip_until_EOM(c);
write_stringz(&c->out, "N");
write_stringz(&c->out, token);
write_stream(&c->out, MARKER_EOM);
}
clear_trap(&trap);
}
else {
trace(LOG_ALWAYS, "Exception handling command %s.%s: %d %s",
service, name, trap.error, errno_to_str(trap.error));
error = trap.error;
}
}
else if (type[0] == 'R' || type[0] == 'P' || type[0] == 'N') {
Trap trap;
read_stringz(&c->inp, token, sizeof(token));
trace(LOG_PROTOCOL, "Peer %s: Reply: %c %s ...", c->peer_name, type[0], token);
if (set_trap(&trap)) {
ReplyHandlerInfo * rh = NULL;
char * endptr = NULL;
unsigned long tokenid;
errno = 0;
tokenid = strtoul(token, &endptr, 10);
if (errno != 0 || *endptr != '\0' ||
(rh = find_reply_handler(c, tokenid, type[0] != 'P')) == NULL) {
if (p->default_handler != NULL) {
args[0] = type;
args[1] = token;
p->default_handler(c, args, 2, p->client_data);
}
else {
trace(LOG_ALWAYS, "Reply with unexpected token: %s", token);
exception(ERR_PROTOCOL);
}
}
else if (type[0] == 'P') {
if (rh->progress) {
rh->progress(c, rh->client_data);
}
else {
skip_until_EOM(c);
}
}
else {
int n = 0;
if (type[0] == 'N') {
skip_until_EOM(c);
n = ERR_INV_COMMAND;
}
rh->handler(c, rh->client_data, n);
loc_free(rh);
}
clear_trap(&trap);
}
else {
trace(LOG_ALWAYS, "Exception handling reply %s: %d %s",
token, trap.error, errno_to_str(trap.error));
error = trap.error;
}
}
else if (type[0] == 'E') {
Trap trap;
read_stringz(&c->inp, service, sizeof(service));
read_stringz(&c->inp, name, sizeof(name));
trace(LOG_PROTOCOL, "Peer %s: Event: E %s %s ...", c->peer_name, service, name);
if (set_trap(&trap)) {
if ((c->state == ChannelStateStarted || c->state == ChannelStateHelloSent) &&
strcmp(service, LOCATOR) == 0 && strcmp(name, "Hello") == 0) {
event_locator_hello(c);
}
else {
EventHandlerInfo * eh = find_event_handler(c, service, name);
if (eh != NULL) {
eh->handler(c, eh->client_data);
}
else if (p->default_handler != NULL) {
args[0] = type;
args[1] = service;
args[2] = name;
p->default_handler(c, args, 3, p->client_data);
}
else {
skip_until_EOM(c);
}
}
clear_trap(&trap);
}
else {
trace(LOG_ALWAYS, "Exception handling event %s.%s: %d %s",
service, name, trap.error, errno_to_str(trap.error));
error = trap.error;
}
}
else if (type[0] == 'F') {
int n = 0;
int s = 0;
int ch = read_stream(&c->inp);
if (ch == '-') {
s = 1;
ch = read_stream(&c->inp);
}
while (ch >= '0' && ch <= '9') {
n = n * 10 + (ch - '0');
ch = read_stream(&c->inp);
}
if (ch == 0) {
ch = read_stream(&c->inp);
}
else {
trace(LOG_ALWAYS, "Received F with no zero termination.");
}
if (ch != MARKER_EOM) error = ERR_PROTOCOL;
else c->congestion_level = s ? -n : n;
}
else if (p->default_handler != NULL) {
args[0] = type;
p->default_handler(c, args, 1, p->client_data);
}
else {
trace(LOG_ALWAYS, "Invalid TCF message: %s ...", type);
error = ERR_PROTOCOL;
}
if (error != 0) exception(error);
}
static void message_handler_old(Channel * c, char ** args, int nargs, void * client_data) {
ProtocolMessageHandler handler = (ProtocolMessageHandler)client_data;
handler(c, args, nargs);
}
void set_default_message_handler(Protocol * p, ProtocolMessageHandler handler) {
set_default_message_handler2(p, (ProtocolMessageHandler2)message_handler_old, (void *)handler);
}
void set_default_message_handler2(Protocol * p, ProtocolMessageHandler2 handler, void * client_data) {
p->default_handler = handler;
p->client_data = client_data;
}
static void command_handler_old(char * token, Channel * c, void * client_data) {
ProtocolCommandHandler handler = (ProtocolCommandHandler)client_data;
handler(token, c);
}
void add_command_handler(Protocol * p, const char * service, const char * name, ProtocolCommandHandler handler) {
add_command_handler2(p, service, name, (ProtocolCommandHandler2)command_handler_old, (void *)handler);
}
void add_command_handler2(Protocol * p, const char * service, const char * name, ProtocolCommandHandler2 handler, void * client_data) {
unsigned h = message_hash(p, service, name);
MessageHandlerInfo * mh = (MessageHandlerInfo *)loc_alloc(sizeof(MessageHandlerInfo));
mh->p = p;
mh->service = protocol_get_service(p, service);
mh->name = name;
mh->handler = handler;
mh->client_data = client_data;
mh->next = message_handlers[h];
message_handlers[h] = mh;
}
static void event_handler_old(Channel * c, void * client_data) {
ProtocolEventHandler handler = (ProtocolEventHandler)client_data;
handler(c);
}
void add_event_handler(Channel * c, const char * service, const char * name, ProtocolEventHandler handler) {
add_event_handler2(c, service, name, (ProtocolEventHandler2)event_handler_old, (void *)handler);
}
void add_event_handler2(Channel * c, const char * service, const char * name, ProtocolEventHandler2 handler, void * client_data) {
unsigned h = event_hash(c, service, name);
EventHandlerInfo * eh = (EventHandlerInfo *)loc_alloc(sizeof(EventHandlerInfo));
eh->c = c;
eh->service = protocol_get_service(c, service);
eh->name = name;
eh->handler = handler;
eh->client_data = client_data;
eh->next = event_handlers[h];
event_handlers[h] = eh;
}
static void send_command_failed(void * args) {
ReplyHandlerInfo * rh = (ReplyHandlerInfo *)args;
rh->handler(rh->c, rh->client_data, ERR_CHANNEL_CLOSED);
loc_free(rh);
}
ReplyHandlerInfo * protocol_send_command_with_progress(Channel * c, const char * service, const char * name, ReplyHandlerCB handler, ProgressHandlerCB progress, void * client_data) {
Protocol * p = c->protocol;
ReplyHandlerInfo * rh = (ReplyHandlerInfo *)loc_alloc(sizeof(ReplyHandlerInfo));
rh->c = c;
rh->handler = handler;
rh->progress = progress;
rh->client_data = client_data;
if (is_channel_closed(c) || c->peer_service_list == NULL) {
post_event(send_command_failed, rh);
}
else {
unsigned h;
unsigned long tokenid;
do tokenid = p->tokenid++;
while (find_reply_handler(c, tokenid, 0) != NULL);
write_stringz(&c->out, "C");
json_write_ulong(&c->out, tokenid);
write_stream(&c->out, 0);
write_stringz(&c->out, service);
write_stringz(&c->out, name);
rh->tokenid = tokenid;
h = reply_hash(c, tokenid);
rh->next = reply_handlers[h];
reply_handlers[h] = rh;
}
return rh;
}
ReplyHandlerInfo * protocol_send_command(Channel * c, const char * service, const char * name, ReplyHandlerCB handler, void * client_data) {
return protocol_send_command_with_progress(c, service, name, handler, NULL, client_data);
}
struct sendRedirectInfo {
ReplyHandlerCB handler;
void * client_data;
};
static void redirect_done(Channel * c, void * client_data, int error) {
struct sendRedirectInfo * info = (struct sendRedirectInfo *)client_data;
if (!error) {
assert(c->state == ChannelStateRedirectSent);
error = read_errno(&c->inp);
json_test_char(&c->inp, MARKER_EOM);
if (!error) {
c->state = ChannelStateHelloSent;
}
else {
c->state = ChannelStateConnected;
}
}
else if (c->state == ChannelStateRedirectSent) {
c->state = ChannelStateConnected;
}
else {
assert(c->state == ChannelStateDisconnected);
}
info->handler(c, info->client_data, error);
}
ReplyHandlerInfo * send_redirect_command_by_id(Channel * c, const char * peerId, ReplyHandlerCB handler, void * client_data) {
struct sendRedirectInfo * info = (struct sendRedirectInfo *)loc_alloc_zero(sizeof *info);
ReplyHandlerInfo * rh;
assert(c->state == ChannelStateConnected);
c->state = ChannelStateRedirectSent;
info->handler = handler;
info->client_data = client_data;
rh = protocol_send_command(c, LOCATOR, "redirect", redirect_done, info);
json_write_string(&c->out, peerId);
write_stream(&c->out, 0);
write_stream(&c->out, MARKER_EOM);
return rh;
}
ReplyHandlerInfo * send_redirect_command_by_props(Channel * c, const PeerServer * ps, ReplyHandlerCB handler, void * client_data) {
struct sendRedirectInfo * info = (struct sendRedirectInfo *)loc_alloc_zero(sizeof *info);
ReplyHandlerInfo * rh;
unsigned i;
assert(c->state == ChannelStateConnected);
c->state = ChannelStateRedirectSent;
info->handler = handler;
info->client_data = client_data;
rh = protocol_send_command(c, LOCATOR, "redirect", redirect_done, info);
write_stream(&c->out, '{');
for (i = 0; i < ps->ind; i++) {
if (i > 0) {
write_stream(&c->out, ',');
}
json_write_string(&c->out, ps->list[i].name);
write_stream(&c->out, ':');
json_write_string(&c->out, ps->list[i].value);
}
write_stream(&c->out, '}');
write_stream(&c->out, 0);
write_stream(&c->out, MARKER_EOM);
return rh;
}
static void connect_done(Channel * c) {
assert(c->state == ChannelStateConnected);
notify_channel_opened(c);
if (c->connected) {
c->connected(c);
}
else {
int i;
trace(LOG_PROTOCOL, "channel server connected, remote services:");
for (i = 0; i < c->peer_service_cnt; i++) {
trace(LOG_PROTOCOL, " %s", c->peer_service_list[i]);
}
}
}
void send_hello_message(Channel * c) {
Protocol * p = c->protocol;
ServiceInfo * s = services;
int cnt = 0;
assert(c->state == ChannelStateStarted || c->state == ChannelStateHelloReceived);
write_stringz(&c->out, "E");
write_stringz(&c->out, LOCATOR);
write_stringz(&c->out, "Hello");
write_stream(&c->out, '[');
#if ENABLE_ZeroCopy
if (!c->disable_zero_copy) {
json_write_string(&c->out, "ZeroCopy");
cnt++;
}
#endif
while (s) {
if (s->owner == p) {
if (cnt != 0) write_stream(&c->out, ',');
json_write_string(&c->out, s->name);
cnt++;
}
s = s->next;
}
write_stream(&c->out, ']');
write_stream(&c->out, 0);
write_stream(&c->out, MARKER_EOM);
if (c->state == ChannelStateStarted) {
c->state = ChannelStateHelloSent;
}
else {
c->state = ChannelStateConnected;
connect_done(c);
}
}
static void free_string_list(int cnt, char **list) {
while (cnt > 0) loc_free(list[--cnt]);
loc_free(list);
}
static void event_locator_hello(Channel * c) {
int ch;
int cnt = 0;
char **list = NULL;
c->out.supports_zero_copy = 0;
do ch = read_stream(&c->inp);
while (ch > 0 && isspace(ch));
if (ch != '[') exception(ERR_PROTOCOL);
if (peek_stream(&c->inp) == ']') {
read_stream(&c->inp);
}
else {
int max = 4;
list = (char **)loc_alloc(max * sizeof *list);
for (;;) {
char * service = json_read_alloc_string(&c->inp);
if (strcmp(service, "ZeroCopy") == 0) c->out.supports_zero_copy = 1;
if (cnt == max) {
max *= 2;
list = (char **)loc_realloc(list, max * sizeof *list);
}
list[cnt++] = service;
do ch = read_stream(&c->inp);
while (ch > 0 && isspace(ch));
if (ch == ',') continue;
if (ch == ']') break;
free_string_list(cnt, list);
exception(ERR_JSON_SYNTAX);
}
}
if (read_stream(&c->inp) != 0 || read_stream(&c->inp) != MARKER_EOM) {
free_string_list(cnt, list);
exception(ERR_JSON_SYNTAX);
}
if (c->state != ChannelStateStarted && c->state != ChannelStateHelloSent) {
free_string_list(cnt, list);
/* TODO: should this be a protocol error? */
return;
}
if (c->peer_service_list != NULL) {
free_string_list(c->peer_service_cnt, c->peer_service_list);
}
c->peer_service_cnt = cnt;
c->peer_service_list = list;
if (c->state == ChannelStateStarted) {
c->state = ChannelStateHelloReceived;
}
else {
c->state = ChannelStateConnected;
connect_done(c);
}
}
int protocol_cancel_command(ReplyHandlerInfo * rh) {
/* TODO: protocol_cancel_command() */
return 0;
}
static void channel_closed(Channel * c) {
unsigned i;
assert(is_dispatch_thread());
for (i = 0; i < EVENT_HASH_SIZE; i++) {
EventHandlerInfo ** ehp = &event_handlers[i];
EventHandlerInfo * eh;
while ((eh = *ehp) != NULL) {
if (eh->c == c) {
*ehp = eh->next;
loc_free(eh);
}
else {
ehp = &eh->next;
}
}
}
free_services(c);
for (i = 0; i < REPLY_HASH_SIZE; i++) {
ReplyHandlerInfo ** rhp = &reply_handlers[i];
ReplyHandlerInfo * rh;
while ((rh = *rhp) != NULL) {
if (rh->c == c) {
Trap trap;
*rhp = rh->next;
if (set_trap(&trap)) {
rh->handler(c, rh->client_data, ERR_CHANNEL_CLOSED);
clear_trap(&trap);
}
else {
trace(LOG_ALWAYS, "Exception handling reply %ul: %d %s",
rh->tokenid, trap.error, errno_to_str(trap.error));
}
loc_free(rh);
}
else {
rhp = &rh->next;
}
}
}
if (c->peer_service_list) {
free_string_list(c->peer_service_cnt, c->peer_service_list);
c->peer_service_cnt = 0;
c->peer_service_list = NULL;
}
}
static void ini_protocol(void) {
assert(!ini_done);
agent_id = loc_strdup(create_uuid());
add_channel_close_listener(channel_closed);
ini_done = 1;
}
Protocol * protocol_alloc(void) {
Protocol * p = (Protocol *)loc_alloc_zero(sizeof *p);
assert(is_dispatch_thread());
if (!ini_done) ini_protocol();
p->id = proto_cnt++;
p->lock_cnt = 1;
p->tokenid = 1;
return p;
}
void protocol_reference(Protocol * p) {
assert(is_dispatch_thread());
assert(p->lock_cnt > 0);
p->lock_cnt++;
}
void protocol_release(Protocol * p) {
MessageHandlerInfo ** mhp;
MessageHandlerInfo * mh;
int i;
assert(is_dispatch_thread());
assert(p->lock_cnt > 0);
if (--p->lock_cnt != 0) return;
for (i = 0; i < MESSAGE_HASH_SIZE; i++) {
mhp = &message_handlers[i];
while ((mh = *mhp) != NULL) {
if (mh->p == p) {
*mhp = mh->next;
loc_free(mh);
}
else {
mhp = &mh->next;
}
}
}
free_services(p);
loc_free(p);
}
const char * get_agent_id(void) {
if (!ini_done) ini_protocol();
return agent_id;
}
const char * get_service_manager_id(Protocol * p) {
static char buf[256];
snprintf(buf, sizeof(buf), "%s-%d", get_agent_id(), p->id);
return buf;
}