| /******************************************************************************* |
| * Copyright (c) 2007, 2014 Wind River Systems, Inc. and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * You may elect to redistribute this code under either of these licenses. |
| * |
| * Contributors: |
| * Wind River Systems - initial API and implementation |
| *******************************************************************************/ |
| |
| /* |
| * This module implements tunneling of TCF messages to another target on behalf of a client. |
| * This service is intended to be used when a client has no direct access to a target. |
| */ |
| |
| #include <tcf/config.h> |
| #include <assert.h> |
| #include <string.h> |
| #include <tcf/framework/json.h> |
| #include <tcf/framework/proxy.h> |
| #include <tcf/framework/protocol.h> |
| #include <tcf/framework/trace.h> |
| #include <tcf/framework/errors.h> |
| #include <tcf/framework/exceptions.h> |
| #include <tcf/framework/myalloc.h> |
| |
| typedef struct Proxy { |
| Channel * c; |
| Protocol * proto; |
| int other; |
| int instance; |
| } Proxy; |
| |
| typedef struct RedirectInfo { |
| Channel * host; |
| char token[256]; |
| } RedirectInfo; |
| |
| static ChannelRedirectionListener redirection_listeners[16]; |
| static int redirection_listeners_cnt = 0; |
| |
| static ProxyLogFilterListener proxy_log_filter_listener; |
| static ProxyLogFilterListener2 proxy_log_filter_listener2; |
| |
| static const char * channel_lock_msg = "Proxy lock"; |
| |
| static void proxy_update(Channel * c1, Channel * c2); |
| |
| static void proxy_connecting(Channel * c) { |
| int i; |
| Proxy * target = (Proxy *)c->client_data; |
| Proxy * host = target + target->other; |
| |
| assert(c == target->c); |
| assert(target->other == -1); |
| assert(c->state == ChannelStateStarted); |
| assert(host->c->state == ChannelStateHelloReceived); |
| |
| for (i = 0; i < redirection_listeners_cnt; i++) { |
| redirection_listeners[i](host->c, target->c); |
| } |
| |
| target->c->disable_zero_copy = !host->c->out.supports_zero_copy; |
| send_hello_message(target->c); |
| |
| trace(LOG_PROXY, "Proxy waiting Hello from target"); |
| } |
| |
| static void command_redirect_done(Channel * c, void * client_data, int error) { |
| RedirectInfo * info = (RedirectInfo *)client_data; |
| |
| if (!is_channel_closed(info->host)) { |
| int err = error; |
| |
| if (err == 0) proxy_update (info->host, c); |
| |
| write_stringz(&info->host->out, "R"); |
| write_stringz(&info->host->out, info->token); |
| write_errno(&info->host->out, err); |
| write_stream(&info->host->out, MARKER_EOM); |
| |
| #if ENABLE_Trace |
| if (log_mode & LOG_TCFLOG) { |
| Proxy * proxy = (Proxy *)info->host->client_data; |
| trace(LOG_TCFLOG, "%d: R %s %s", proxy->instance, info->token, errno_to_str(err)); |
| } |
| #endif |
| } |
| |
| channel_unlock_with_msg(info->host, channel_lock_msg); |
| loc_free(info); |
| } |
| |
| static void read_peer_attr(InputStream * inp, const char * name, void * x) { |
| peer_server_addprop((PeerServer *)x, loc_strdup(name), json_read_alloc_string(inp)); |
| } |
| |
| static void command_locator_redirect(char * token, Channel * c, void * args) { |
| char id[256]; |
| PeerServer * ps = NULL; |
| Channel * target = (Channel *)args; |
| RedirectInfo * info = (RedirectInfo *)loc_alloc_zero(sizeof(RedirectInfo)); |
| |
| if (peek_stream(&c->inp) == '{') { |
| ps = peer_server_alloc(); |
| json_read_struct(&c->inp, read_peer_attr, ps); |
| } |
| else { |
| json_read_string(&c->inp, id, sizeof(id)); |
| } |
| |
| json_test_char(&c->inp, MARKER_EOA); |
| json_test_char(&c->inp, MARKER_EOM); |
| |
| #if ENABLE_Trace |
| if (log_mode & LOG_TCFLOG) { |
| Proxy * proxy = (Proxy *)c->client_data; |
| if (ps != NULL) { |
| char * server_properties = channel_peer_to_json(ps); |
| trace(LOG_TCFLOG, "%d: C %s Locator redirect %s", proxy->instance, token, server_properties); |
| loc_free(server_properties); |
| } else { |
| trace(LOG_TCFLOG, "%d: C %s Locator redirect %s", proxy->instance, token, id); |
| } |
| } |
| #endif |
| |
| channel_lock_with_msg(c, channel_lock_msg); |
| info->host = c; |
| strlcpy(info->token, token, sizeof(info->token)); |
| |
| /* Send the redirect command to the next TCF entity */ |
| |
| if (ps != NULL) { |
| send_redirect_command_by_props(target, ps, command_redirect_done, info); |
| } |
| else { |
| send_redirect_command_by_id(target, id, command_redirect_done, info); |
| } |
| |
| if (ps != NULL) peer_server_free(ps); |
| } |
| |
| static void proxy_connected(Channel * c) { |
| int i; |
| Proxy * target = (Proxy *)c->client_data; |
| Proxy * host = target + target->other; |
| |
| assert(target->c == c); |
| if (target->other == 1) { |
| /* We get here after sending hello to host */ |
| return; |
| } |
| assert(c->state == ChannelStateConnected); |
| assert(host->c->state == ChannelStateHelloReceived); |
| |
| host->c->disable_zero_copy = !target->c->out.supports_zero_copy; |
| |
| trace(LOG_PROXY, "Proxy connected, target services:"); |
| for (i = 0; i < target->c->peer_service_cnt; i++) { |
| char * nm = target->c->peer_service_list[i]; |
| trace(LOG_PROXY, " %s", nm); |
| if (strcmp(nm, "ZeroCopy") == 0) continue; |
| protocol_get_service(host->proto, nm); |
| } |
| |
| /* |
| * Intercept the Locator.redirect command to update the local list of |
| * services with the ones from the next TCF entity (agent), and send a |
| * consolidate list to the previous TCF entity (client). This is |
| * required in the case of more than one server between the client and |
| * the agent. |
| */ |
| |
| add_command_handler2(host->c->protocol, "Locator", "redirect", |
| command_locator_redirect, target->c); |
| |
| for (i = 0; i < redirection_listeners_cnt; i++) { |
| redirection_listeners[i](host->c, target->c); |
| } |
| |
| send_hello_message(host->c); |
| } |
| |
| static void proxy_disconnected(Channel * c) { |
| Proxy * proxy = (Proxy *)c->client_data; |
| |
| assert(c == proxy->c); |
| if (proxy[proxy->other].c->state == ChannelStateDisconnected) { |
| trace(LOG_PROXY, "Proxy disconnected"); |
| if (proxy->other == -1) proxy--; |
| broadcast_group_free(c->bcg); |
| assert(proxy[0].c->bcg == NULL); |
| assert(proxy[1].c->bcg == NULL); |
| proxy[0].c->client_data = NULL; |
| proxy[1].c->client_data = NULL; |
| protocol_release(proxy[0].proto); |
| protocol_release(proxy[1].proto); |
| channel_unlock_with_msg(proxy[0].c, channel_lock_msg); |
| channel_unlock_with_msg(proxy[1].c, channel_lock_msg); |
| loc_free(proxy); |
| } |
| else { |
| channel_close(proxy[proxy->other].c); |
| } |
| } |
| |
| #if ENABLE_Trace |
| |
/* Buffer in which one trace line is assembled, and the number of bytes used so far */
static char log_buf[1024];
static size_t log_pos = 0;
| |
| static void log_chr(int c) { |
| if (log_pos + 2 < sizeof log_buf) log_buf[log_pos++] = (char)c; |
| } |
| |
| static void log_str(const char * s) { |
| char c; |
| while ((c = *s++) != '\0') { |
| if (log_pos + 2 < sizeof log_buf) log_buf[log_pos++] = c; |
| } |
| } |
| |
| static void log_byte_func(int i) { |
| if (i >= ' ' && i < 127) { |
| /* Printable ASCII */ |
| log_chr(i); |
| } |
| else if (i == 0) { |
| log_chr(' '); |
| } |
| else if (i > 0) { |
| char buf[16]; |
| snprintf(buf, sizeof buf, "\\x%02x", i); |
| log_str(buf); |
| } |
| else if (i == MARKER_EOM) { |
| log_str("<eom>"); |
| } |
| else if (i == MARKER_EOS) { |
| log_str("<eom>"); |
| } |
| else { |
| log_str("<?>"); |
| } |
| } |
| |
/*
 * Log one stream item when TCF logging is enabled.
 * Wrapped in do/while(0), like the non-trace stub, so that "log_byte(x);"
 * is a single statement and stays valid inside an unbraced if/else.
 */
#define log_byte(b) do { if (log_mode & LOG_TCFLOG) log_byte_func(b); } while (0)
| |
| static int log_start(Proxy * proxy, char ** argv, int argc, int * limit) { |
| int i; |
| int res = PROXY_FILTER_NOT_FILTERED; |
| log_pos = 0; |
| *limit = 0; |
| |
| if (log_mode & LOG_TCFLOG) { |
| if (proxy_log_filter_listener) { |
| res = proxy_log_filter_listener(proxy->c, proxy[proxy->other].c, argc, argv); |
| if (res) return PROXY_FILTER_FILTERED; |
| } |
| else if (proxy_log_filter_listener2) { |
| res = proxy_log_filter_listener2(proxy->c, proxy[proxy->other].c, argc, argv, limit); |
| /* If we have PROXY_FILTER_LIMIT, we want to see --> or <-- */ |
| if (res == PROXY_FILTER_FILTERED) return res; |
| } |
| log_str(proxy->other > 0 ? "---> " : "<--- "); |
| for (i = 0; i < argc; i++) { |
| log_str(argv[i]); |
| log_chr(' '); |
| } |
| } |
| return res; |
| } |
| |
| static void log_flush(Proxy * proxy) { |
| if (log_mode & LOG_TCFLOG) { |
| log_chr(0); |
| trace(LOG_TCFLOG, "%d: %s", proxy->instance, log_buf); |
| } |
| } |
| |
| #else |
| |
/*
 * Stubs used when tracing is compiled out.
 * log_start() evaluates to the same "not filtered" verdict that the real
 * function returns by default, instead of a bare literal.
 */
#define log_start(a, b, c, d) PROXY_FILTER_NOT_FILTERED
#define log_byte(a) do {} while(0)
#define log_flush(a) do {} while(0)
| |
| #endif |
| |
| static void proxy_default_message_handler(Channel * c, char ** argv, int argc) { |
| /* TODO: if proxy is connected to itself, it can deadlock when retransmitting a long message */ |
| Proxy * proxy = (Proxy *)c->client_data; |
| Channel * otherc = proxy[proxy->other].c; |
| InputStream * inp = &c->inp; |
| OutputStream * out = &otherc->out; |
| int i = 0; |
| int filtered = 0; |
| int filter_cnt = 0; |
| int limit = 0; |
| |
| assert(c == proxy->c); |
| assert(argc > 0 && strlen(argv[0]) == 1); |
| if (proxy[proxy->other].c->state == ChannelStateDisconnected) return; |
| |
| if (argv[0][0] == 'C') { |
| write_stringz(out, argv[0]); |
| /* Prefix token with 'R'emote to distinguish from locally generated commands */ |
| write_stream(out, 'R'); |
| i = 1; |
| } |
| else if (argv[0][0] == 'R' || argv[0][0] == 'P' || argv[0][0] == 'N') { |
| if (argv[1][0] != 'R') { |
| trace(LOG_ALWAYS, "Reply with unexpected token: %s", argv[1]); |
| exception(ERR_PROTOCOL); |
| } |
| argv[1]++; |
| } |
| |
| while (i < argc) write_stringz(out, argv[i++]); |
| |
| filtered = log_start(proxy, argv, argc, &limit); |
| /* Copy body of message */ |
| do { |
| if (out->supports_zero_copy && |
| #if ENABLE_Trace |
| (log_mode & LOG_TCFLOG) == 0 && |
| #endif |
| inp->end - inp->cur >= 0x100) { |
| write_block_stream(out, (char *)inp->cur, inp->end - inp->cur); |
| inp->cur = inp->end; |
| } |
| else { |
| i = read_stream(inp); |
| if ((filtered == PROXY_FILTER_NOT_FILTERED) || |
| (filtered == PROXY_FILTER_LIMIT && filter_cnt < limit)) { |
| log_byte(i); |
| filter_cnt++; |
| #if ENABLE_Trace |
| if (filtered == PROXY_FILTER_LIMIT && filter_cnt == limit) { |
| log_str("..."); |
| /* Don't quit the loop, we need to write the entire message */ |
| } |
| #endif |
| } |
| write_stream(out, i); |
| } |
| } |
| while (i != MARKER_EOM && i != MARKER_EOS); |
| if (filtered == PROXY_FILTER_NOT_FILTERED || |
| filtered == PROXY_FILTER_LIMIT) log_flush(proxy); |
| } |
| |
| static void proxy_update(Channel * c1, Channel * c2) { |
| Proxy * proxy; |
| |
| /* c1 is host */ |
| assert (c1->state == ChannelStateConnected); |
| /* c2 is target */ |
| assert (c2->state == ChannelStateHelloSent); |
| |
| /* Check that both channels form a proxy */ |
| assert(proxy_get_host_channel(c1) == c1); |
| assert(proxy_get_target_channel(c1) == c2); |
| assert(proxy_get_host_channel(c2) == c1); |
| assert(proxy_get_target_channel(c2) == c2); |
| |
| proxy = (Proxy *)c1->client_data; |
| if (proxy->other == -1) proxy--; |
| |
| /* Create new protocol object for the host channel. Do this |
| * before call to notify_channel_closed() below to be consistent |
| * with proxy_create(). */ |
| proxy[0].proto = protocol_alloc(); |
| |
| /* Update the state of the host channel to react correctly on the |
| * hello message from the redirected target and notify listeners |
| * of the new state to give them a chance to cleanup and be ready |
| * for the upcoming channel redirection listener callback in |
| * proxy_connected() when target hello message arrives. */ |
| c1->state = ChannelStateHelloReceived; |
| notify_channel_closed(c1); |
| |
| /* Replace protocol object for the host channel to make sure it |
| * does not contain any services or command handlers from before |
| * the redirect. */ |
| protocol_release(c1->protocol); |
| c1->protocol = proxy[0].proto; |
| set_default_message_handler(proxy[0].proto, proxy_default_message_handler); |
| } |
| |
| void proxy_create(Channel * c1, Channel * c2) { |
| TCFBroadcastGroup * bcg = broadcast_group_alloc(); |
| Proxy * proxy = (Proxy *)loc_alloc_zero(2 * sizeof *proxy); |
| int i; |
| |
| static int instance; |
| |
| assert(c1->state == ChannelStateRedirectReceived); |
| assert(c2->state == ChannelStateStartWait); |
| |
| /* Host */ |
| channel_lock_with_msg(c1, channel_lock_msg); |
| proxy[0].c = c1; |
| proxy[0].proto = protocol_alloc(); |
| proxy[0].other = 1; |
| proxy[0].instance = instance; |
| |
| /* Target */ |
| channel_lock_with_msg(c2, channel_lock_msg); |
| proxy[1].c = c2; |
| proxy[1].proto = protocol_alloc(); |
| proxy[1].other = -1; |
| proxy[1].instance = instance++; |
| |
| trace(LOG_PROXY, "Proxy created, host services:"); |
| for (i = 0; i < c1->peer_service_cnt; i++) { |
| char * nm = c1->peer_service_list[i]; |
| trace(LOG_PROXY, " %s", nm); |
| if (strcmp(nm, "ZeroCopy") == 0) continue; |
| protocol_get_service(proxy[1].proto, nm); |
| } |
| c1->state = ChannelStateHelloReceived; |
| notify_channel_closed(c1); |
| protocol_release(c1->protocol); |
| c1->client_data = NULL; |
| assert(c2->protocol == NULL); |
| |
| c1->connecting = proxy_connecting; |
| c1->connected = proxy_connected; |
| c1->disconnected = proxy_disconnected; |
| c1->client_data = proxy; |
| c1->protocol = proxy[0].proto; |
| set_default_message_handler(proxy[0].proto, proxy_default_message_handler); |
| |
| c2->connecting = proxy_connecting; |
| c2->connected = proxy_connected; |
| c2->disconnected = proxy_disconnected; |
| c2->client_data = proxy + 1; |
| c2->protocol = proxy[1].proto; |
| set_default_message_handler(proxy[1].proto, proxy_default_message_handler); |
| |
| channel_set_broadcast_group(c1, bcg); |
| channel_set_broadcast_group(c2, bcg); |
| channel_start(c2); |
| } |
| |
| Channel * proxy_get_host_channel(Channel * c) { |
| Proxy * proxy = (Proxy *)c->client_data; |
| |
| if (c->connecting != proxy_connecting || proxy == NULL || c != proxy->c) return NULL; |
| if (proxy->other == -1) proxy--; |
| return proxy[0].c; |
| } |
| |
| Channel * proxy_get_target_channel(Channel * c) { |
| Proxy * proxy = (Proxy *)c->client_data; |
| |
| if (c->connecting != proxy_connecting || proxy == NULL || c != proxy->c) return NULL; |
| if (proxy->other == -1) proxy--; |
| return proxy[1].c; |
| } |
| |
| void add_channel_redirection_listener(ChannelRedirectionListener listener) { |
| assert(redirection_listeners_cnt < (int)(sizeof(redirection_listeners) / sizeof(ChannelRedirectionListener))); |
| redirection_listeners[redirection_listeners_cnt++] = listener; |
| } |
| |
| ProxyLogFilterListener set_proxy_log_filter_listener(ProxyLogFilterListener listener) { |
| ProxyLogFilterListener old = proxy_log_filter_listener; |
| proxy_log_filter_listener = listener; |
| return old; |
| } |
| |
| ProxyLogFilterListener2 set_proxy_log_filter_listener2(ProxyLogFilterListener2 listener) { |
| ProxyLogFilterListener2 old = proxy_log_filter_listener2; |
| proxy_log_filter_listener2 = listener; |
| return old; |
| } |