| /******************************************************************************* |
| * Copyright (c) 2007, 2013 Wind River Systems, Inc. and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * You may elect to redistribute this code under either of these licenses. |
| * |
| * Contributors: |
| * Wind River Systems - initial API and implementation |
| *******************************************************************************/ |
| |
| /* |
| * TCF communication protocol. |
| * This module handles registration of command and event handlers. |
| * It is called when new messages are received and will dispatch |
| * messages to the appropriate handler. It has no knowledge of what transport |
| * protocol is used and what services do. |
| */ |
| |
| #include <tcf/config.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <assert.h> |
| #include <assert.h> |
| #include <ctype.h> |
| #include <tcf/framework/protocol.h> |
| #include <tcf/framework/trace.h> |
| #include <tcf/framework/events.h> |
| #include <tcf/framework/events.h> |
| #include <tcf/framework/exceptions.h> |
| #include <tcf/framework/json.h> |
| #include <tcf/framework/myalloc.h> |
| |
| static const char * LOCATOR = "Locator"; |
| |
| struct ServiceInfo { |
| void * owner; |
| char * name; |
| struct ServiceInfo * next; |
| }; |
| |
| struct MessageHandlerInfo { |
| Protocol * p; |
| ServiceInfo * service; |
| const char * name; |
| ProtocolCommandHandler2 handler; |
| void * client_data; |
| struct MessageHandlerInfo * next; |
| }; |
| |
| typedef struct MessageHandlerInfo MessageHandlerInfo; |
| |
| struct EventHandlerInfo { |
| Channel * c; |
| ServiceInfo * service; |
| const char * name; |
| ProtocolEventHandler2 handler; |
| void * client_data; |
| struct EventHandlerInfo * next; |
| }; |
| |
| typedef struct EventHandlerInfo EventHandlerInfo; |
| |
| struct ReplyHandlerInfo { |
| unsigned long tokenid; |
| Channel * c; |
| ReplyHandlerCB handler; |
| ProgressHandlerCB progress; |
| void * client_data; |
| struct ReplyHandlerInfo * next; |
| }; |
| |
| #define MESSAGE_HASH_SIZE (5 * MEM_USAGE_FACTOR - 1) |
| #define EVENT_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1) |
| #define REPLY_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1) |
| |
| static MessageHandlerInfo * message_handlers[MESSAGE_HASH_SIZE]; |
| static EventHandlerInfo * event_handlers[EVENT_HASH_SIZE]; |
| static ReplyHandlerInfo * reply_handlers[REPLY_HASH_SIZE]; |
| static ServiceInfo * services; |
| static int ini_done = 0; |
| static int proto_cnt = 0; |
| static char * agent_id = NULL; |
| |
| struct Protocol { |
| int id; |
| int lock_cnt; /* Lock count, cannot delete when > 0 */ |
| unsigned long tokenid; |
| ProtocolMessageHandler2 default_handler; |
| void * client_data; |
| }; |
| |
| static void read_stringz(InputStream * inp, char * str, size_t size) { |
| unsigned len = 0; |
| for (;;) { |
| int ch = read_stream(inp); |
| if (ch <= 0) { |
| if (ch == 0) break; |
| trace(LOG_ALWAYS, "Unexpected end of message"); |
| exception(ERR_PROTOCOL); |
| } |
| if (len < size - 1) str[len++] = (char)ch; |
| } |
| str[len] = 0; |
| } |
| |
| ServiceInfo * protocol_get_service(void * owner, const char * name) { |
| ServiceInfo * s = services; |
| |
| while (s != NULL && (s->owner != owner || strcmp(s->name, name) != 0)) s = s->next; |
| if (s == NULL) { |
| assert(strcmp(name, "ZeroCopy") != 0); |
| s = (ServiceInfo *)loc_alloc(sizeof(ServiceInfo)); |
| s->owner = owner; |
| s->name = loc_strdup(name); |
| s->next = services; |
| services = s; |
| } |
| return s; |
| } |
| |
| static void free_services(void * owner) { |
| ServiceInfo ** sp = &services; |
| ServiceInfo * s; |
| |
| while ((s = *sp) != NULL) { |
| if (s->owner == owner) { |
| *sp = s->next; |
| loc_free(s->name); |
| loc_free(s); |
| } |
| else { |
| sp = &s->next; |
| } |
| } |
| } |
| |
| static unsigned message_hash(Protocol * p, const char * service, const char * name) { |
| int i; |
| unsigned h = (unsigned)(uintptr_t)p >> 4; |
| for (i = 0; service[i]; i++) h += service[i]; |
| for (i = 0; name[i]; i++) h += name[i]; |
| h = h + h / MESSAGE_HASH_SIZE; |
| return h % MESSAGE_HASH_SIZE; |
| } |
| |
| static MessageHandlerInfo * find_message_handler(Protocol * p, char * service, char * name) { |
| MessageHandlerInfo * mh = message_handlers[message_hash(p, service, name)]; |
| while (mh != NULL) { |
| if (mh->p == p && !strcmp(mh->service->name, service) && !strcmp(mh->name, name)) return mh; |
| mh = mh->next; |
| } |
| return NULL; |
| } |
| |
| static unsigned event_hash(Channel * c, const char * service, const char * name) { |
| int i; |
| unsigned h = (unsigned)(uintptr_t)c >> 4; |
| for (i = 0; service[i]; i++) h += service[i]; |
| for (i = 0; name[i]; i++) h += name[i]; |
| h = h + h / EVENT_HASH_SIZE; |
| return h % EVENT_HASH_SIZE; |
| } |
| |
| static EventHandlerInfo * find_event_handler(Channel * c, char * service, char * name) { |
| EventHandlerInfo * mh = event_handlers[event_hash(c, service, name)]; |
| while (mh != NULL) { |
| if (mh->c == c && !strcmp(mh->service->name, service) && !strcmp(mh->name, name)) return mh; |
| mh = mh->next; |
| } |
| return NULL; |
| } |
| |
| #define reply_hash(c, tokenid) ((((unsigned)(uintptr_t)(c) >> 4) + (unsigned)(tokenid)) % REPLY_HASH_SIZE) |
| |
| static ReplyHandlerInfo * find_reply_handler(Channel * c, unsigned long tokenid, int take) { |
| ReplyHandlerInfo ** rhp = &reply_handlers[reply_hash(c, tokenid)]; |
| ReplyHandlerInfo * rh; |
| while ((rh = *rhp) != NULL) { |
| if (rh->c == c && rh->tokenid == tokenid) { |
| if (take) *rhp = rh->next; |
| return rh; |
| } |
| rhp = &rh->next; |
| } |
| return NULL; |
| } |
| |
| static void skip_until_EOM(Channel * c) { |
| for (;;) { |
| int ch = read_stream(&c->inp); |
| if (ch == MARKER_EOM) return; |
| if (ch == MARKER_EOS) return; |
| } |
| } |
| |
| static void event_locator_hello(Channel * c); |
| |
| void handle_protocol_message(Channel * c) { |
| char type[8]; |
| char token[256]; |
| char service[256]; |
| char name[256]; |
| char * args[4]; |
| int error = 0; |
| Protocol * p = c->protocol; |
| |
| assert(is_dispatch_thread()); |
| |
| read_stringz(&c->inp, type, sizeof(type)); |
| if (strlen(type) != 1) { |
| trace(LOG_ALWAYS, "Invalid TCF message: %s ...", type); |
| error = ERR_PROTOCOL; |
| } |
| else if (type[0] == 'C') { |
| Trap trap; |
| read_stringz(&c->inp, token, sizeof(token)); |
| read_stringz(&c->inp, service, sizeof(service)); |
| read_stringz(&c->inp, name, sizeof(name)); |
| trace(LOG_PROTOCOL, "Peer %s: Command: C %s %s %s ...", c->peer_name, token, service, name); |
| if (c->state != ChannelStateConnected) { |
| trace(LOG_PROTOCOL, "Wrong channel state for commands"); |
| skip_until_EOM(c); |
| write_stringz(&c->out, "N"); |
| write_stringz(&c->out, token); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| else if (set_trap(&trap)) { |
| MessageHandlerInfo * mh = find_message_handler(p, service, name); |
| if (mh != NULL) { |
| mh->handler(token, c, mh->client_data); |
| } |
| else if (p->default_handler != NULL) { |
| args[0] = type; |
| args[1] = token; |
| args[2] = service; |
| args[3] = name; |
| p->default_handler(c, args, 4, p->client_data); |
| } |
| else { |
| trace(LOG_PROTOCOL, "Command is not recognized: %s %s ...", service, name); |
| skip_until_EOM(c); |
| write_stringz(&c->out, "N"); |
| write_stringz(&c->out, token); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling command %s.%s: %d %s", |
| service, name, trap.error, errno_to_str(trap.error)); |
| error = trap.error; |
| } |
| } |
| else if (type[0] == 'R' || type[0] == 'P' || type[0] == 'N') { |
| Trap trap; |
| read_stringz(&c->inp, token, sizeof(token)); |
| trace(LOG_PROTOCOL, "Peer %s: Reply: %c %s ...", c->peer_name, type[0], token); |
| if (set_trap(&trap)) { |
| ReplyHandlerInfo * rh = NULL; |
| char * endptr = NULL; |
| unsigned long tokenid; |
| errno = 0; |
| tokenid = strtoul(token, &endptr, 10); |
| if (errno != 0 || *endptr != '\0' || |
| (rh = find_reply_handler(c, tokenid, type[0] != 'P')) == NULL) { |
| if (p->default_handler != NULL) { |
| args[0] = type; |
| args[1] = token; |
| p->default_handler(c, args, 2, p->client_data); |
| } |
| else { |
| trace(LOG_ALWAYS, "Reply with unexpected token: %s", token); |
| exception(ERR_PROTOCOL); |
| } |
| } |
| else if (type[0] == 'P') { |
| if (rh->progress) { |
| rh->progress(c, rh->client_data); |
| } |
| else { |
| skip_until_EOM(c); |
| } |
| } |
| else { |
| int n = 0; |
| if (type[0] == 'N') { |
| skip_until_EOM(c); |
| n = ERR_INV_COMMAND; |
| } |
| rh->handler(c, rh->client_data, n); |
| loc_free(rh); |
| } |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling reply %s: %d %s", |
| token, trap.error, errno_to_str(trap.error)); |
| error = trap.error; |
| } |
| } |
| else if (type[0] == 'E') { |
| Trap trap; |
| read_stringz(&c->inp, service, sizeof(service)); |
| read_stringz(&c->inp, name, sizeof(name)); |
| trace(LOG_PROTOCOL, "Peer %s: Event: E %s %s ...", c->peer_name, service, name); |
| if (set_trap(&trap)) { |
| if ((c->state == ChannelStateStarted || c->state == ChannelStateHelloSent) && |
| strcmp(service, LOCATOR) == 0 && strcmp(name, "Hello") == 0) { |
| event_locator_hello(c); |
| } |
| else { |
| EventHandlerInfo * eh = find_event_handler(c, service, name); |
| if (eh != NULL) { |
| eh->handler(c, eh->client_data); |
| } |
| else if (p->default_handler != NULL) { |
| args[0] = type; |
| args[1] = service; |
| args[2] = name; |
| p->default_handler(c, args, 3, p->client_data); |
| } |
| else { |
| skip_until_EOM(c); |
| } |
| } |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling event %s.%s: %d %s", |
| service, name, trap.error, errno_to_str(trap.error)); |
| error = trap.error; |
| } |
| } |
| else if (type[0] == 'F') { |
| int n = 0; |
| int s = 0; |
| int ch = read_stream(&c->inp); |
| if (ch == '-') { |
| s = 1; |
| ch = read_stream(&c->inp); |
| } |
| while (ch >= '0' && ch <= '9') { |
| n = n * 10 + (ch - '0'); |
| ch = read_stream(&c->inp); |
| } |
| if (ch == 0) { |
| ch = read_stream(&c->inp); |
| } |
| else { |
| trace(LOG_ALWAYS, "Received F with no zero termination."); |
| } |
| if (ch != MARKER_EOM) error = ERR_PROTOCOL; |
| else c->congestion_level = s ? -n : n; |
| } |
| else if (p->default_handler != NULL) { |
| args[0] = type; |
| p->default_handler(c, args, 1, p->client_data); |
| } |
| else { |
| trace(LOG_ALWAYS, "Invalid TCF message: %s ...", type); |
| error = ERR_PROTOCOL; |
| } |
| if (error != 0) exception(error); |
| } |
| |
| static void message_handler_old(Channel * c, char ** args, int nargs, void * client_data) { |
| ProtocolMessageHandler handler = (ProtocolMessageHandler)client_data; |
| handler(c, args, nargs); |
| } |
| |
| void set_default_message_handler(Protocol * p, ProtocolMessageHandler handler) { |
| set_default_message_handler2(p, (ProtocolMessageHandler2)message_handler_old, (void *)handler); |
| } |
| |
| void set_default_message_handler2(Protocol * p, ProtocolMessageHandler2 handler, void * client_data) { |
| p->default_handler = handler; |
| p->client_data = client_data; |
| } |
| |
| static void command_handler_old(char * token, Channel * c, void * client_data) { |
| ProtocolCommandHandler handler = (ProtocolCommandHandler)client_data; |
| handler(token, c); |
| } |
| |
| void add_command_handler(Protocol * p, const char * service, const char * name, ProtocolCommandHandler handler) { |
| add_command_handler2(p, service, name, (ProtocolCommandHandler2)command_handler_old, (void *)handler); |
| } |
| |
| void add_command_handler2(Protocol * p, const char * service, const char * name, ProtocolCommandHandler2 handler, void * client_data) { |
| unsigned h = message_hash(p, service, name); |
| MessageHandlerInfo * mh = (MessageHandlerInfo *)loc_alloc(sizeof(MessageHandlerInfo)); |
| mh->p = p; |
| mh->service = protocol_get_service(p, service); |
| mh->name = name; |
| mh->handler = handler; |
| mh->client_data = client_data; |
| mh->next = message_handlers[h]; |
| message_handlers[h] = mh; |
| } |
| |
| static void event_handler_old(Channel * c, void * client_data) { |
| ProtocolEventHandler handler = (ProtocolEventHandler)client_data; |
| handler(c); |
| } |
| |
| void add_event_handler(Channel * c, const char * service, const char * name, ProtocolEventHandler handler) { |
| add_event_handler2(c, service, name, (ProtocolEventHandler2)event_handler_old, (void *)handler); |
| } |
| |
| void add_event_handler2(Channel * c, const char * service, const char * name, ProtocolEventHandler2 handler, void * client_data) { |
| unsigned h = event_hash(c, service, name); |
| EventHandlerInfo * eh = (EventHandlerInfo *)loc_alloc(sizeof(EventHandlerInfo)); |
| eh->c = c; |
| eh->service = protocol_get_service(c, service); |
| eh->name = name; |
| eh->handler = handler; |
| eh->client_data = client_data; |
| eh->next = event_handlers[h]; |
| event_handlers[h] = eh; |
| } |
| |
| static void send_command_failed(void * args) { |
| ReplyHandlerInfo * rh = (ReplyHandlerInfo *)args; |
| rh->handler(rh->c, rh->client_data, ERR_CHANNEL_CLOSED); |
| loc_free(rh); |
| } |
| |
| ReplyHandlerInfo * protocol_send_command_with_progress(Channel * c, const char * service, const char * name, ReplyHandlerCB handler, ProgressHandlerCB progress, void * client_data) { |
| Protocol * p = c->protocol; |
| ReplyHandlerInfo * rh = (ReplyHandlerInfo *)loc_alloc(sizeof(ReplyHandlerInfo)); |
| |
| rh->c = c; |
| rh->handler = handler; |
| rh->progress = progress; |
| rh->client_data = client_data; |
| if (is_channel_closed(c) || c->peer_service_list == NULL) { |
| post_event(send_command_failed, rh); |
| } |
| else { |
| unsigned h; |
| unsigned long tokenid; |
| do tokenid = p->tokenid++; |
| while (find_reply_handler(c, tokenid, 0) != NULL); |
| write_stringz(&c->out, "C"); |
| json_write_ulong(&c->out, tokenid); |
| write_stream(&c->out, 0); |
| write_stringz(&c->out, service); |
| write_stringz(&c->out, name); |
| rh->tokenid = tokenid; |
| h = reply_hash(c, tokenid); |
| rh->next = reply_handlers[h]; |
| reply_handlers[h] = rh; |
| } |
| return rh; |
| } |
| |
| ReplyHandlerInfo * protocol_send_command(Channel * c, const char * service, const char * name, ReplyHandlerCB handler, void * client_data) { |
| return protocol_send_command_with_progress(c, service, name, handler, NULL, client_data); |
| } |
| |
| struct sendRedirectInfo { |
| ReplyHandlerCB handler; |
| void * client_data; |
| }; |
| |
| static void redirect_done(Channel * c, void * client_data, int error) { |
| struct sendRedirectInfo * info = (struct sendRedirectInfo *)client_data; |
| |
| if (!error) { |
| assert(c->state == ChannelStateRedirectSent); |
| error = read_errno(&c->inp); |
| json_test_char(&c->inp, MARKER_EOM); |
| if (!error) { |
| c->state = ChannelStateHelloSent; |
| } |
| else { |
| c->state = ChannelStateConnected; |
| } |
| } |
| else if (c->state == ChannelStateRedirectSent) { |
| c->state = ChannelStateConnected; |
| } |
| else { |
| assert(c->state == ChannelStateDisconnected); |
| } |
| info->handler(c, info->client_data, error); |
| } |
| |
| ReplyHandlerInfo * send_redirect_command_by_id(Channel * c, const char * peerId, ReplyHandlerCB handler, void * client_data) { |
| struct sendRedirectInfo * info = (struct sendRedirectInfo *)loc_alloc_zero(sizeof *info); |
| ReplyHandlerInfo * rh; |
| |
| assert(c->state == ChannelStateConnected); |
| c->state = ChannelStateRedirectSent; |
| info->handler = handler; |
| info->client_data = client_data; |
| rh = protocol_send_command(c, LOCATOR, "redirect", redirect_done, info); |
| json_write_string(&c->out, peerId); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| return rh; |
| } |
| |
| ReplyHandlerInfo * send_redirect_command_by_props(Channel * c, const PeerServer * ps, ReplyHandlerCB handler, void * client_data) { |
| struct sendRedirectInfo * info = (struct sendRedirectInfo *)loc_alloc_zero(sizeof *info); |
| ReplyHandlerInfo * rh; |
| unsigned i; |
| |
| assert(c->state == ChannelStateConnected); |
| c->state = ChannelStateRedirectSent; |
| info->handler = handler; |
| info->client_data = client_data; |
| rh = protocol_send_command(c, LOCATOR, "redirect", redirect_done, info); |
| write_stream(&c->out, '{'); |
| for (i = 0; i < ps->ind; i++) { |
| if (i > 0) { |
| write_stream(&c->out, ','); |
| } |
| json_write_string(&c->out, ps->list[i].name); |
| write_stream(&c->out, ':'); |
| json_write_string(&c->out, ps->list[i].value); |
| } |
| write_stream(&c->out, '}'); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| return rh; |
| } |
| |
| static void connect_done(Channel * c) { |
| assert(c->state == ChannelStateConnected); |
| notify_channel_opened(c); |
| if (c->connected) { |
| c->connected(c); |
| } |
| else { |
| int i; |
| trace(LOG_PROTOCOL, "channel server connected, remote services:"); |
| for (i = 0; i < c->peer_service_cnt; i++) { |
| trace(LOG_PROTOCOL, " %s", c->peer_service_list[i]); |
| } |
| } |
| } |
| |
| void send_hello_message(Channel * c) { |
| Protocol * p = c->protocol; |
| ServiceInfo * s = services; |
| int cnt = 0; |
| |
| assert(c->state == ChannelStateStarted || c->state == ChannelStateHelloReceived); |
| write_stringz(&c->out, "E"); |
| write_stringz(&c->out, LOCATOR); |
| write_stringz(&c->out, "Hello"); |
| write_stream(&c->out, '['); |
| #if ENABLE_ZeroCopy |
| if (!c->disable_zero_copy) { |
| json_write_string(&c->out, "ZeroCopy"); |
| cnt++; |
| } |
| #endif |
| while (s) { |
| if (s->owner == p) { |
| if (cnt != 0) write_stream(&c->out, ','); |
| json_write_string(&c->out, s->name); |
| cnt++; |
| } |
| s = s->next; |
| } |
| write_stream(&c->out, ']'); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| if (c->state == ChannelStateStarted) { |
| c->state = ChannelStateHelloSent; |
| } |
| else { |
| c->state = ChannelStateConnected; |
| connect_done(c); |
| } |
| } |
| |
| static void free_string_list(int cnt, char **list) { |
| while (cnt > 0) loc_free(list[--cnt]); |
| loc_free(list); |
| } |
| |
| static void event_locator_hello(Channel * c) { |
| int ch; |
| int cnt = 0; |
| char **list = NULL; |
| |
| c->out.supports_zero_copy = 0; |
| do ch = read_stream(&c->inp); |
| while (ch > 0 && isspace(ch)); |
| if (ch != '[') exception(ERR_PROTOCOL); |
| if (peek_stream(&c->inp) == ']') { |
| read_stream(&c->inp); |
| } |
| else { |
| int max = 4; |
| list = (char **)loc_alloc(max * sizeof *list); |
| for (;;) { |
| char * service = json_read_alloc_string(&c->inp); |
| if (strcmp(service, "ZeroCopy") == 0) c->out.supports_zero_copy = 1; |
| if (cnt == max) { |
| max *= 2; |
| list = (char **)loc_realloc(list, max * sizeof *list); |
| } |
| list[cnt++] = service; |
| do ch = read_stream(&c->inp); |
| while (ch > 0 && isspace(ch)); |
| if (ch == ',') continue; |
| if (ch == ']') break; |
| free_string_list(cnt, list); |
| exception(ERR_JSON_SYNTAX); |
| } |
| } |
| if (read_stream(&c->inp) != 0 || read_stream(&c->inp) != MARKER_EOM) { |
| free_string_list(cnt, list); |
| exception(ERR_JSON_SYNTAX); |
| } |
| if (c->state != ChannelStateStarted && c->state != ChannelStateHelloSent) { |
| free_string_list(cnt, list); |
| /* TODO: should this be a protocol error? */ |
| return; |
| } |
| if (c->peer_service_list != NULL) { |
| free_string_list(c->peer_service_cnt, c->peer_service_list); |
| } |
| c->peer_service_cnt = cnt; |
| c->peer_service_list = list; |
| if (c->state == ChannelStateStarted) { |
| c->state = ChannelStateHelloReceived; |
| } |
| else { |
| c->state = ChannelStateConnected; |
| connect_done(c); |
| } |
| } |
| |
| int protocol_cancel_command(ReplyHandlerInfo * rh) { |
| /* TODO: protocol_cancel_command() */ |
| return 0; |
| } |
| |
| static void channel_closed(Channel * c) { |
| unsigned i; |
| |
| assert(is_dispatch_thread()); |
| for (i = 0; i < EVENT_HASH_SIZE; i++) { |
| EventHandlerInfo ** ehp = &event_handlers[i]; |
| EventHandlerInfo * eh; |
| |
| while ((eh = *ehp) != NULL) { |
| if (eh->c == c) { |
| *ehp = eh->next; |
| loc_free(eh); |
| } |
| else { |
| ehp = &eh->next; |
| } |
| } |
| } |
| free_services(c); |
| |
| for (i = 0; i < REPLY_HASH_SIZE; i++) { |
| ReplyHandlerInfo ** rhp = &reply_handlers[i]; |
| ReplyHandlerInfo * rh; |
| while ((rh = *rhp) != NULL) { |
| if (rh->c == c) { |
| Trap trap; |
| *rhp = rh->next; |
| if (set_trap(&trap)) { |
| rh->handler(c, rh->client_data, ERR_CHANNEL_CLOSED); |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling reply %ul: %d %s", |
| rh->tokenid, trap.error, errno_to_str(trap.error)); |
| } |
| loc_free(rh); |
| } |
| else { |
| rhp = &rh->next; |
| } |
| } |
| } |
| if (c->peer_service_list) { |
| free_string_list(c->peer_service_cnt, c->peer_service_list); |
| c->peer_service_cnt = 0; |
| c->peer_service_list = NULL; |
| } |
| } |
| |
| static void ini_protocol(void) { |
| assert(!ini_done); |
| agent_id = loc_strdup(create_uuid()); |
| add_channel_close_listener(channel_closed); |
| ini_done = 1; |
| } |
| |
| Protocol * protocol_alloc(void) { |
| Protocol * p = (Protocol *)loc_alloc_zero(sizeof *p); |
| |
| assert(is_dispatch_thread()); |
| if (!ini_done) ini_protocol(); |
| p->id = proto_cnt++; |
| p->lock_cnt = 1; |
| p->tokenid = 1; |
| return p; |
| } |
| |
| void protocol_reference(Protocol * p) { |
| assert(is_dispatch_thread()); |
| assert(p->lock_cnt > 0); |
| p->lock_cnt++; |
| } |
| |
| void protocol_release(Protocol * p) { |
| MessageHandlerInfo ** mhp; |
| MessageHandlerInfo * mh; |
| int i; |
| |
| assert(is_dispatch_thread()); |
| assert(p->lock_cnt > 0); |
| if (--p->lock_cnt != 0) return; |
| for (i = 0; i < MESSAGE_HASH_SIZE; i++) { |
| mhp = &message_handlers[i]; |
| while ((mh = *mhp) != NULL) { |
| if (mh->p == p) { |
| *mhp = mh->next; |
| loc_free(mh); |
| } |
| else { |
| mhp = &mh->next; |
| } |
| } |
| } |
| free_services(p); |
| loc_free(p); |
| } |
| |
| const char * get_agent_id(void) { |
| if (!ini_done) ini_protocol(); |
| return agent_id; |
| } |
| |
| const char * get_service_manager_id(Protocol * p) { |
| static char buf[256]; |
| snprintf(buf, sizeof(buf), "%s-%d", get_agent_id(), p->id); |
| return buf; |
| } |