| /******************************************************************************* |
| * Copyright (c) 2016 Wind River Systems, Inc. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * You may elect to redistribute this code under either of these licenses. |
| * |
| * Contributors: |
| * Wind River Systems - initial API and implementation |
| *******************************************************************************/ |
| |
| /* |
| * Implements input and output stream over WebSocket (both secured and |
| * insecured). |
| * This is implemented on top of libwebsockets library; for more information |
| * about libwebsockets, look at https://libwebsockets.org/. |
| */ |
| #if defined(__GNUC__) && !defined(_GNU_SOURCE) |
| # define _GNU_SOURCE |
| #endif |
| #include <tcf/config.h> |
| |
| #if ENABLE_LibWebSockets |
| #include <fcntl.h> |
| #include <stddef.h> |
| #include <errno.h> |
| #include <assert.h> |
| #include <string.h> |
| #include <sys/stat.h> |
| #include <ctype.h> |
| #if defined(__linux__) |
| #include <sys/epoll.h> |
| #endif |
| #include <tcf/framework/mdep-threads.h> |
| #include <tcf/framework/mdep-fs.h> |
| #include <tcf/framework/mdep-inet.h> |
| #include <tcf/framework/tcf.h> |
| #include <tcf/framework/channel.h> |
| #include <tcf/framework/channel_lws.h> |
| #include <tcf/framework/channel_lws_ext.h> |
| #include <tcf/framework/myalloc.h> |
| #include <tcf/framework/protocol.h> |
| #include <tcf/framework/errors.h> |
| #include <tcf/framework/events.h> |
| #include <tcf/framework/exceptions.h> |
| #include <tcf/framework/trace.h> |
| #include <tcf/framework/json.h> |
| #include <tcf/framework/peer.h> |
| #include <tcf/framework/ip_ifc.h> |
| #include <tcf/framework/asyncreq.h> |
| #include <tcf/framework/inputbuf.h> |
| #include <tcf/framework/outputbuf.h> |
| #include <tcf/services/discovery.h> |
| #include <libwebsockets.h> |
| |
| #ifdef WIN32 |
| #ifndef ECONNREFUSED |
| #define ECONNREFUSED WSAECONNREFUSED |
| #endif |
| #endif |
| |
| #ifndef MSG_MORE |
| #define MSG_MORE 0 |
| #endif |
| |
| #if defined(__linux__) |
| #define ENABLE_Epoll 1 |
| #else |
| #define ENABLE_Epoll 0 |
| #endif |
| |
| /* It seems that epoll and poll events macros are identical. This is |
| * tested in the init routine. Let's make the conversion routine a nop. |
| */ |
| #define POLL_TO_EPOLL_EVENT(events,eevents) eevents = events; |
| #define EPOLL_TO_POLL_EVENT(events,eevents) events = eevents; |
| |
| #define BUF_SIZE OUTPUT_QUEUE_BUF_SIZE |
| #define CHANNEL_MAGIC 0x43253234 |
| #define MAX_IFC 10 |
| #define MAX_CONTEXT_THREADS 8 |
| #define MAX_CONN_REQUESTS 16 |
| |
| typedef struct ChannelConnectInfo { |
| LINK link; |
| ChannelConnectCallBack callback; |
| void * callback_args; |
| int is_ssl; |
| int self_signed; |
| int port; |
| char * host; |
| const char * get_url; |
| int error; |
| struct SessionData * data; |
| const char * ca_filepath; |
| const char * certificate; |
| const char * cipher_list; |
| const char * key; |
| unsigned int options; |
| } ChannelConnectInfo; |
| |
| typedef struct ServerCreateInfo { |
| LINK link; |
| int port; |
| int error; |
| const char * ca_filepath; |
| const char * certificate; |
| const char * key; |
| const char * cipher_list; |
| unsigned int options; |
| struct ServerWS * si; |
| } ServerCreateInfo; |
| |
| #define link2cci(A) ((ChannelConnectInfo *)((char *)(A) - offsetof(ChannelConnectInfo, link))) |
| #define link2sci(A) ((ServerCreateInfo *)((char *)(A) - offsetof(ServerCreateInfo, link))) |
| |
| typedef struct ChannelWS { |
| Channel * chan; /* Public channel information - must be first */ |
| int magic; /* Magic number */ |
| int lock_cnt; /* Stream lock count, when > 0 channel cannot be deleted */ |
| int read_pending; /* Read request is pending */ |
| int read_done_posted; |
| unsigned char * read_buf; |
| ssize_t read_buf_size; |
| int read_done; |
| |
| /* Input stream buffer */ |
| InputBuf ibuf; |
| |
| /* Output stream state */ |
| unsigned char * out_bin_block; |
| OutputBuffer * obuf; |
| int out_errno; |
| int out_flush_cnt; /* Number of posted lazy flush events */ |
| int out_eom_cnt; /* Number of end-of-message markers in the output buffer */ |
| OutputQueue out_queue; |
| int is_ssl; |
| struct { |
| char data [BUF_SIZE + LWS_SEND_BUFFER_PRE_PADDING + |
| LWS_SEND_BUFFER_POST_PADDING]; |
| ssize_t len; |
| ssize_t written; |
| int error; |
| } outbuf; |
| struct ChannelInputBuffer * inbuf; |
| struct SessionData * data; |
| int closing; |
| } ChannelWS; |
| |
| typedef struct ChannelInputBuffer ChannelInputBuffer; |
| struct ChannelInputBuffer { |
| void * in; |
| ssize_t len; |
| struct ChannelInputBuffer * next; |
| int error; |
| struct SessionData * data; |
| }; |
| |
| typedef struct ServerWS { |
| ChannelServer serv; |
| LINK servlink; |
| int is_ssl; |
| int exiting; |
| } ServerWS; |
| |
| typedef struct SessionData { |
| pthread_mutex_t mutex; |
| ChannelWS * c; |
| ServerWS * si; /* server structure (null if client connection) */ |
| struct lws * wsi; |
| struct sockaddr * addr_buf; /* Socket remote address */ |
| socklen_t addr_len; |
| char ** prop_names; |
| char ** prop_values; |
| unsigned prop_cnt; |
| } SessionData; |
| |
| typedef struct WSIUserData { |
| SessionData * data; /* session data */ |
| ChannelConnectInfo * args; /* connection args (client connection only) */ |
| void * cb_arg; |
| } WSIUserData; |
| |
| static size_t channel_lws_extension_offset = 0; |
| |
| #define EXT(ctx) ((ChannelWS **)((char *)(ctx) + channel_lws_extension_offset)) |
| |
| #define channel2ws(A) (*EXT(A)) |
| #define inp2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, inp))) |
| #define out2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, out))) |
| #define server2ws(A) ((ServerWS *)((char *)(A) - offsetof(ServerWS, serv))) |
| #define servlink2np(A) ((ServerWS *)((char *)(A) - offsetof(ServerWS, servlink))) |
| #define ibuf2ws(A) ((ChannelWS *)((char *)(A) - offsetof(ChannelWS, ibuf))) |
| #define obuf2ws(A) ((ChannelWS *)((char *)(A) - offsetof(ChannelWS, out_queue))) |
| |
| static void lws_channel_read_done(void * x); |
| static void handle_channel_msg(void * x); |
| static int lws_tcf_callback(struct lws *wsi, enum lws_callback_reasons reason, |
| void *user, void *in, size_t len); |
| static void server_lws_connect_done(void * x); |
| static void channel_lws_connect_done(void * x); |
| static void lws_shutdown(ChannelWS * c); |
| static void done_write_request(void * args); |
| static void lws_lock(Channel * channel); |
| static void lws_unlock(Channel * channel); |
| |
| static LINK server_list; |
| static pthread_mutex_t lws_list_mutex; |
| static LINK client_connect_list = TCF_LIST_INIT(client_connect_list); |
| static LINK server_create_list = TCF_LIST_INIT(server_create_list); |
| static int pending_connections = 0; /* number of simultaneous pending connections */ |
| static int dummy_socket = -1; /* dummy IP socket we use for various operations */ |
| static struct lws_vhost * client_vhost = NULL; |
| |
| #if ENABLE_Epoll |
| static int epoll_fd; /* epoll file descriptor for lws context */ |
| static uint32_t * epoll_events; /* epoll events for lws context */ |
| static int dummy_pipe_fds[2]; |
| #endif |
| static int deny_deflate; |
| static pthread_mutex_t poll_mutex; |
| static struct lws_context *lws_ctx; |
| |
| |
| /* list of supported protocols and callbacks */ |
| |
| static struct lws_protocols protocols[] = { |
| {"tcf", lws_tcf_callback, sizeof(WSIUserData), BUF_SIZE}, |
| {NULL, NULL, 0, 0 } /* end */ |
| }; |
| |
| static const struct lws_extension exts[] = { |
| { |
| "permessage-deflate", |
| lws_extension_callback_pm_deflate, |
| "permessage-deflate; client_max_window_bits" |
| }, |
| { |
| "deflate-frame", |
| lws_extension_callback_pm_deflate, |
| "deflate_frame" |
| }, |
| {NULL, NULL, NULL /* terminator */ } |
| }; |
| |
| #if ENABLE_Epoll |
| static void channel_lws_abort_epoll() { |
| char buf = 0; |
| if (write(dummy_pipe_fds[1], &buf, 1)); |
| } |
| #endif |
| |
| static void lws_channel_event_read (void * args) { |
| ChannelInputBuffer * buf = (ChannelInputBuffer *)args; |
| ChannelWS * c; |
| |
| pthread_mutex_lock(&buf->data->mutex); |
| c = buf->data->c; |
| if (buf->data->c) { |
| if (c->inbuf) { |
| ChannelInputBuffer * inbuf; |
| for (inbuf = c->inbuf; inbuf->next != NULL ; inbuf = |
| inbuf->next) |
| ; |
| assert (inbuf->next == NULL); |
| buf->next = NULL; |
| inbuf->next = buf; |
| } |
| else { |
| buf->next = NULL; |
| c->inbuf = buf; |
| } |
| if (buf->len == 0) c->closing = 1; |
| if (c->read_pending) { |
| if (!c->read_done_posted) { |
| c->read_done_posted = 1; |
| post_event(lws_channel_read_done, c); |
| } |
| } |
| if (c->closing) { |
| /* The channel is now closed; post an additional lws_unlock |
| * call to trigger the deletion of the channel. The original |
| * lock was done when the channel was established |
| */ |
| post_event((EventCallBack *)lws_unlock, c->chan); |
| } |
| } |
| pthread_mutex_unlock(&buf->data->mutex); |
| } |
| |
| static void lws_add_channel_property(SessionData * data, char * name, char * value) { |
| data->prop_names = (char **)loc_realloc(data->prop_names, (data->prop_cnt + 1) * sizeof(char *)); |
| data->prop_values = (char **)loc_realloc(data->prop_values, (data->prop_cnt + 1) * sizeof(char *)); |
| data->prop_names[data->prop_cnt] = name; |
| data->prop_values[data->prop_cnt] = value; |
| data->prop_cnt++; |
| } |
| |
| static void lws_parse_http_header(SessionData * data, struct lws *wsi) { |
| int n = 0, len; |
| char * prop_name; |
| char * prop_value; |
| |
| /* Do not extract all properties but only the one that we (currently) |
| * consider as useful; that is the URI and URI args. Extracting |
| * all properties may require a lot of memory (especially with cookies). |
| */ |
| len = lws_hdr_total_length(wsi, WSI_TOKEN_GET_URI); |
| if (len) { |
| prop_name = (char *)loc_strdup("get_uri"); |
| prop_value = (char *)loc_alloc_zero(len + 1); |
| lws_hdr_copy(wsi, prop_value, len + 1, WSI_TOKEN_GET_URI); |
| lws_add_channel_property(data, prop_name, prop_value); |
| } |
| len = lws_hdr_total_length(wsi, WSI_TOKEN_HTTP_URI_ARGS); |
| if (len) { |
| char * buf = (char *)loc_alloc_zero(len + 1); |
| n = 0; |
| while (lws_hdr_copy_fragment(wsi, buf, len + 1, |
| WSI_TOKEN_HTTP_URI_ARGS, n) > 0) { |
| prop_name = (char *)loc_alloc_zero(32); |
| sprintf(prop_name, "uri-args_%d", n); |
| prop_value = loc_strdup(buf); |
| lws_add_channel_property(data, prop_name, prop_value); |
| n++; |
| } |
| loc_free(buf); |
| } |
| } |
| |
| static int lws_tcf_callback(struct lws *wsi, enum lws_callback_reasons reason, |
| void *user, void *in, size_t len) |
| { |
| struct lws_pollargs *pa = (struct lws_pollargs *)in; |
| WSIUserData * userdata = (WSIUserData *) user; |
| |
| switch (reason) { |
| case LWS_CALLBACK_CLIENT_ESTABLISHED: |
| case LWS_CALLBACK_ESTABLISHED: |
| { |
| assert (userdata != NULL); |
| int is_server = (reason == LWS_CALLBACK_ESTABLISHED); |
| |
| /* Allocate session data. Note that the session data will |
| * be freed from the dispatch thread to avoid any race |
| * condition. Any update to the SessionData will be done |
| * while holding the session data mutex. |
| */ |
| |
| SessionData * data; |
| |
| if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1; |
| |
| data = (SessionData *)loc_alloc_zero(sizeof (SessionData)); |
| #if defined(SOCK_MAXADDRLEN) |
| data->addr_len = SOCK_MAXADDRLEN; |
| #else |
| data->addr_len = 0x1000; |
| #endif |
| pthread_mutex_init(&data->mutex, NULL); |
| data->wsi = wsi; |
| data->addr_buf = (struct sockaddr *)loc_alloc_zero(data->addr_len); |
| if (getpeername(lws_get_socket_fd(wsi), |
| data->addr_buf, &data->addr_len) != 0) { |
| data->addr_len = 0; |
| } |
| |
| userdata->data = data; |
| if (is_server) { |
| data->si = *(ServerWS **) lws_protocol_vh_priv_get(lws_vhost_get(wsi), protocols); |
| #if defined(LWS_OPENSSL_SUPPORT) |
| X509 * certificate = NULL; |
| if (in) certificate = SSL_get_peer_certificate((const SSL *)in); |
| if (certificate) { |
| char * name = X509_NAME_oneline(X509_get_subject_name(certificate), 0, 0); |
| if (name) lws_add_channel_property(data, loc_strdup("PeerCertName"), loc_strdup(name)); |
| free(name); |
| X509_free(certificate); |
| } |
| #endif |
| lws_parse_http_header(data, wsi); |
| |
| post_event(server_lws_connect_done, data); |
| } |
| else { |
| ChannelConnectInfo * args = userdata->args; |
| args->data = data; |
| userdata->args = NULL; |
| post_event(channel_lws_connect_done, args); |
| } |
| break; |
| } |
| case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: |
| { |
| assert (userdata != NULL); |
| ChannelConnectInfo * args = userdata->args; |
| |
| /* connection error hook is called multiple times; make |
| * sure we do not call the channel connection callback |
| * multiple times; use the error field to detect this. |
| */ |
| if (args) { |
| args->error = ECONNREFUSED; |
| userdata->args = NULL; |
| (void) LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len); |
| post_event(channel_lws_connect_done, args); |
| } |
| } |
| break; |
| |
| case LWS_CALLBACK_CLOSED: |
| { |
| assert (userdata != NULL); |
| SessionData * data = userdata->data; |
| |
| if (data != NULL) { |
| ChannelInputBuffer * buf = (ChannelInputBuffer *)loc_alloc_zero(sizeof(ChannelInputBuffer)); |
| |
| buf->data = data; |
| pthread_mutex_lock(&buf->data->mutex); |
| assert (buf->data->wsi != NULL); |
| buf->data->wsi = NULL; |
| /* The channel can now be closed. The channel was original locked when |
| * it was created to make sure it can be safely referenced in the service |
| * thread; at this point we know it will no longer be used; we can |
| * unlock it. |
| */ |
| if (data->c) { |
| if (data->c->outbuf.len > 0 && data->c->outbuf.written == 0) { |
| data->c->outbuf.written = -1; |
| data->c->outbuf.len = 0; |
| data->c->outbuf.error = ECONNRESET; |
| post_event(done_write_request, data->c); |
| } |
| } |
| post_event(lws_channel_event_read, buf); |
| pthread_mutex_unlock(&data->mutex); |
| |
| /* At this point, this session data will no longer be used |
| * again by the LWS context; we can safely delete it from |
| * the dispatch thread. |
| */ |
| |
| } |
| (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len); |
| } |
| break; |
| |
| case LWS_CALLBACK_RECEIVE: |
| case LWS_CALLBACK_CLIENT_RECEIVE: |
| assert (userdata != NULL); |
| |
| if (len > 0) { |
| ChannelInputBuffer * buf = (ChannelInputBuffer *)loc_alloc_zero(sizeof(ChannelInputBuffer)); |
| SessionData * data = userdata->data; |
| buf->in = loc_alloc_zero(len); |
| buf->len = len; |
| buf->data = data; |
| memcpy(buf->in, in, len); |
| post_event(lws_channel_event_read, buf); |
| } |
| break; |
| |
| case LWS_CALLBACK_SERVER_WRITEABLE: |
| case LWS_CALLBACK_CLIENT_WRITEABLE: |
| { |
| assert (userdata != NULL); |
| SessionData * data = userdata->data; |
| int n; |
| pthread_mutex_lock(&data->mutex); |
| if (data->c->outbuf.written != 0 || data->c->outbuf.len == 0) { |
| pthread_mutex_unlock(&data->mutex); |
| return 0; |
| } |
| if (data->c->outbuf.len == -1) { |
| /* close connection */ |
| pthread_mutex_unlock(&data->mutex); |
| return 1; |
| } |
| n = lws_write(wsi, (unsigned char *)data->c->outbuf.data + LWS_SEND_BUFFER_PRE_PADDING, data->c->outbuf.len, LWS_WRITE_BINARY); |
| if (n < 0) { |
| data->c->outbuf.written = -1; |
| data->c->outbuf.error = errno; |
| } else { |
| data->c->outbuf.written = n; |
| data->c->outbuf.error = 0; |
| } |
| data->c->outbuf.len = 0; |
| pthread_mutex_unlock(&data->mutex); |
| post_event(done_write_request, data->c); |
| } |
| break; |
| |
| case LWS_CALLBACK_WSI_DESTROY: |
| if (lws_vhost_get(wsi) == client_vhost) { |
| loc_free(userdata); |
| } |
| break; |
| |
| case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED: |
| if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1; |
| if ((strcmp((const char *)in, "deflate-stream") == 0) && deny_deflate) { |
| lwsl_notice("denied deflate-stream extension\n"); |
| return 1; |
| } |
| if ((strcmp((const char *)in, "x-webkit-deflate-frame") == 0)) return 1; |
| if ((strcmp((const char *)in, "deflate-frame") == 0)) return 1; |
| break; |
| |
| #if defined(LWS_OPENSSL_SUPPORT) |
| case LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION: |
| { |
| int preverify_ok = len; |
| X509_STORE_CTX *ctx = (X509_STORE_CTX *)user; |
| char buf[256]; |
| int err, depth; |
| |
| if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1; |
| |
| err = X509_STORE_CTX_get_error(ctx); |
| |
| if (!preverify_ok) { |
| X509 * err_cert = X509_STORE_CTX_get_current_cert(ctx); |
| X509_NAME_oneline(X509_get_subject_name(err_cert), buf, |
| sizeof(buf)); |
| depth = X509_STORE_CTX_get_error_depth(ctx); |
| trace( LOG_PROTOCOL, "Client certificate verify error:num=%d:%s:depth=%d:%s", err, |
| X509_verify_cert_error_string(err), depth, buf); |
| } |
| |
| /* |
| * At this point, err contains the last verification error. We can use |
| * it for something special |
| */ |
| if (!preverify_ok |
| && (err == X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT)) { |
| X509_NAME_oneline(X509_get_issuer_name(ctx->current_cert), buf, |
| sizeof(buf)); |
| trace( LOG_PROTOCOL, "Client certificate issuer= %s", buf); |
| } |
| |
| if (!preverify_ok) return 1; |
| break; |
| } |
| #endif |
| |
| case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS: |
| (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len); |
| break; |
| |
| case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: |
| (void)LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len); |
| break; |
| |
| /* |
| * callbacks for managing the external poll() array appear in |
| * protocol 0 callback |
| */ |
| |
| case LWS_CALLBACK_LOCK_POLL: |
| /* |
| * lock mutex to protect pollfd state |
| * called before any other POLL related callback |
| * if protecting wsi lifecycle change, len == 1 |
| */ |
| if (len) pthread_mutex_lock(&poll_mutex); |
| break; |
| |
| case LWS_CALLBACK_UNLOCK_POLL: |
| /* |
| * unlock mutex to protect pollfd state when |
| * called after any other POLL related callback |
| * if protecting wsi lifecycle change, len == 1 |
| */ |
| if (len) pthread_mutex_unlock(&poll_mutex); |
| break; |
| |
| #if ENABLE_Epoll |
| case LWS_CALLBACK_ADD_POLL_FD: |
| { |
| struct epoll_event event; |
| POLL_TO_EPOLL_EVENT(pa->events, epoll_events[pa->fd]); |
| memset(&event, 0, sizeof(struct epoll_event)); |
| event.data.fd = pa->fd; |
| event.events = epoll_events[pa->fd]; |
| if (epoll_ctl (epoll_fd, EPOLL_CTL_ADD, pa->fd, &event) != 0) return 1; |
| break; |
| } |
| |
| case LWS_CALLBACK_DEL_POLL_FD: |
| if (epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pa->fd, NULL) != 0) return 1; |
| break; |
| |
| case LWS_CALLBACK_CHANGE_MODE_POLL_FD: |
| { |
| struct epoll_event event; |
| memset(&event, 0, sizeof(struct epoll_event)); |
| POLL_TO_EPOLL_EVENT(pa->events, epoll_events[pa->fd]); |
| event.data.fd = pa->fd; |
| event.events = epoll_events[pa->fd]; |
| if (epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pa->fd, &event) != 0) return 1; |
| break; |
| } |
| #endif |
| |
| case LWS_CALLBACK_GET_THREAD_ID: |
| { |
| /* |
| * For lws multi-thread support, we need to return a different |
| * thread ID for each thread. |
| * |
| * On Windows, thread ID does not fit on a 32-bit, let's use |
| * thread index instead of thread ID. Note that we must never |
| * return 0 since this will disable locking. |
| */ |
| |
| if (is_dispatch_thread()) return 1; |
| else return 2; |
| } |
| break; |
| default: |
| if (LWS_CALLBACK_USER_HOOK(&userdata->cb_arg, wsi, reason, user, in, len)) return 1; |
| break; |
| } |
| return 0; |
| } |
| |
| static void delete_channel(ChannelWS * c) { |
| unsigned ix; |
| trace(LOG_PROTOCOL, "Deleting channel %#lx", c); |
| assert(c->lock_cnt == 0); |
| assert(c->out_flush_cnt == 0); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->read_pending == 0); |
| assert(c->ibuf.handling_msg != HandleMsgTriggered); |
| channel_clear_broadcast_group(c->chan); |
| list_remove(&c->chan->chanlink); |
| if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) |
| shutdown_set_stopped(&channel_shutdown); |
| c->magic = 0; |
| output_queue_clear(&c->out_queue); |
| output_queue_free_obuf(c->obuf); |
| loc_free(c->ibuf.buf); |
| loc_free(c->chan->peer_name); |
| loc_free(c->data->addr_buf); |
| if (c->data->prop_cnt) { |
| for (ix = 0; ix < c->data->prop_cnt; ix++) { |
| loc_free(c->data->prop_names[ix]); |
| loc_free(c->data->prop_values[ix]); |
| } |
| loc_free(c->data->prop_names); |
| loc_free(c->data->prop_values); |
| } |
| loc_free(c->data); |
| channel_free(c->chan); |
| loc_free(c); |
| } |
| |
| static void lws_lock(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| c->lock_cnt++; |
| } |
| |
| static void lws_unlock(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->lock_cnt > 0); |
| c->lock_cnt--; |
| if (c->lock_cnt == 0) { |
| assert(!c->read_pending); |
| delete_channel(c); |
| } |
| } |
| static int lws_is_closed(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->lock_cnt > 0); |
| return c->chan->state == ChannelStateDisconnected; |
| } |
| |
| static void done_write_request(void * args) { |
| ChannelWS * c = (ChannelWS *)args; |
| int size = 0; |
| int error = 0; |
| |
| assert (c->out_errno == 0); |
| pthread_mutex_lock(&c->data->mutex); |
| if (c->outbuf.written < 0) error = c->outbuf.error; |
| size = c->outbuf.written; |
| c->outbuf.written = 0; |
| c->outbuf.len = 0; |
| pthread_mutex_unlock(&c->data->mutex); |
| output_queue_done(&c->out_queue, error, size); |
| if (error) c->out_errno = error; |
| if (output_queue_is_empty(&c->out_queue) && |
| c->chan->state == ChannelStateDisconnected) lws_shutdown(c); |
| lws_unlock(c->chan); |
| } |
| |
| static void post_write_request(OutputBuffer * bf) { |
| ChannelWS * c = obuf2ws(bf->queue); |
| int posted = 0; |
| |
| pthread_mutex_lock(&c->data->mutex); |
| |
| /* We should not enter here before previous send has been handled */ |
| assert (c->outbuf.len == 0); |
| |
| if (!c->closing && c->data->wsi) { |
| int res = 0; |
| c->outbuf.written = 0; |
| c->outbuf.len = bf->buf_len - bf->buf_pos; |
| if (c->outbuf.len > BUF_SIZE) c->outbuf.len = BUF_SIZE; |
| memcpy((unsigned char*)c->outbuf.data + LWS_SEND_BUFFER_PRE_PADDING, bf->buf + bf->buf_pos, c->outbuf.len); |
| res = lws_callback_on_writable(c->data->wsi); |
| if (res >= 0) { |
| posted = 1; |
| lws_lock(c->chan); |
| } |
| } |
| |
| if (!posted) { |
| c->outbuf.written = -1; |
| c->outbuf.error = ECONNRESET; |
| post_event(done_write_request, c); |
| lws_lock(c->chan); |
| } |
| pthread_mutex_unlock(&c->data->mutex); |
| } |
| |
| static void lws_flush_with_flags(ChannelWS * c, int flags) { |
| unsigned char * p = c->obuf->buf; |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->chan->out.end == p + sizeof(c->obuf->buf)); |
| assert(c->out_bin_block == NULL); |
| assert(c->chan->out.cur >= p); |
| assert(c->chan->out.cur <= p + sizeof(c->obuf->buf)); |
| if (c->chan->out.cur == p) return; |
| if (c->chan->state != ChannelStateDisconnected && c->out_errno == 0) { |
| c->obuf->buf_len = c->chan->out.cur - p; |
| c->out_queue.post_io_request = post_write_request; |
| trace(LOG_PROTOCOL, "Outbuf add size:%d",c->obuf->buf_len); |
| |
| output_queue_add_obuf(&c->out_queue, c->obuf); |
| c->obuf = output_queue_alloc_obuf(); |
| c->chan->out.end = c->obuf->buf + sizeof(c->obuf->buf); |
| } |
| c->chan->out.cur = c->obuf->buf; |
| c->out_eom_cnt = 0; |
| } |
| |
| static void lws_flush_event(void * x) { |
| ChannelWS * c = (ChannelWS *)x; |
| assert(c->magic == CHANNEL_MAGIC); |
| if (--c->out_flush_cnt == 0) { |
| int congestion_level = c->chan->congestion_level; |
| if (congestion_level > 0) usleep(congestion_level * 2500); |
| lws_flush_with_flags(c, 0); |
| lws_unlock(c->chan); |
| } |
| else if (c->out_eom_cnt > 3) { |
| lws_flush_with_flags(c, 0); |
| } |
| } |
| |
| static void lws_bin_block_start(ChannelWS * c) { |
| *c->chan->out.cur++ = ESC; |
| *c->chan->out.cur++ = 3; |
| #if BUF_SIZE > 0x4000 |
| *c->chan->out.cur++ = 0; |
| #endif |
| *c->chan->out.cur++ = 0; |
| *c->chan->out.cur++ = 0; |
| c->out_bin_block = c->chan->out.cur; |
| } |
| |
| static void lws_bin_block_end(ChannelWS * c) { |
| size_t len = c->chan->out.cur - c->out_bin_block; |
| if (len == 0) { |
| #if BUF_SIZE > 0x4000 |
| c->chan->out.cur -= 5; |
| #else |
| c->chan->out.cur -= 4; |
| #endif |
| } |
| else { |
| #if BUF_SIZE > 0x4000 |
| *(c->out_bin_block - 3) = (len & 0x7fu) | 0x80u; |
| *(c->out_bin_block - 2) = ((len >> 7) & 0x7fu) | 0x80u; |
| *(c->out_bin_block - 1) = (unsigned char)(len >> 14); |
| #else |
| *(c->out_bin_block - 2) = (len & 0x7fu) | 0x80u; |
| *(c->out_bin_block - 1) = (unsigned char)(len >> 7); |
| #endif |
| } |
| c->out_bin_block = NULL; |
| } |
| |
| static void lws_write_stream(OutputStream * out, int byte) { |
| ChannelWS * c = channel2ws(out2channel(out)); |
| assert(c->magic == CHANNEL_MAGIC); |
| if (!c->chan->out.supports_zero_copy || c->chan->out.cur >= c->chan->out.end - 32 || byte < 0) { |
| if (c->out_bin_block != NULL) lws_bin_block_end(c); |
| if (c->chan->out.cur == c->chan->out.end) lws_flush_with_flags(c, MSG_MORE); |
| if (byte < 0 || byte == ESC) { |
| char esc = 0; |
| *c->chan->out.cur++ = ESC; |
| if (byte == ESC) esc = 0; |
| else if (byte == MARKER_EOM) esc = 1; |
| else if (byte == MARKER_EOS) esc = 2; |
| else assert(0); |
| if (c->chan->out.cur == c->chan->out.end) lws_flush_with_flags(c, MSG_MORE); |
| *c->chan->out.cur++ = esc; |
| if (byte == MARKER_EOM) { |
| c->out_eom_cnt++; |
| if (c->out_flush_cnt < 2) { |
| if (c->out_flush_cnt++ == 0) lws_lock(c->chan); |
| /*post_event_with_delay(lws_flush_event, c, 0);*/ |
| post_event(lws_flush_event, c); |
| } |
| } |
| return; |
| } |
| } |
| else if (c->out_bin_block == NULL) { |
| lws_bin_block_start(c); |
| } |
| *c->chan->out.cur++ = (char)byte; |
| } |
| |
| static void lws_write_block_stream(OutputStream * out, const char * bytes, size_t size) { |
| unsigned char * src = (unsigned char *)bytes; |
| ChannelWS * c = channel2ws(out2channel(out)); |
| while (size > 0) { |
| size_t n = out->end - out->cur; |
| if (n > size) n = size; |
| if (n == 0) { |
| lws_write_stream(out, *src++); |
| size--; |
| } |
| else if (c->out_bin_block) { |
| memcpy(out->cur, src, n); |
| out->cur += n; |
| size -= n; |
| src += n; |
| } |
| else if (*src != ESC) { |
| unsigned char * dst = out->cur; |
| unsigned char * end = dst + n; |
| do { |
| unsigned char ch = *src; |
| if (ch == ESC) break; |
| *dst++ = ch; |
| src++; |
| } |
| while (dst < end); |
| size -= dst - out->cur; |
| out->cur = dst; |
| } |
| else { |
| lws_write_stream(out, *src++); |
| size--; |
| } |
| } |
| } |
| |
| static ssize_t lws_splice_block_stream(OutputStream * out, int fd, size_t size, int64_t * offset) { |
| assert(is_dispatch_thread()); |
| if (size == 0) return 0; |
| { |
| ssize_t rd; |
| char buffer[BUF_SIZE]; |
| if (size > BUF_SIZE) size = BUF_SIZE; |
| if (offset != NULL) { |
| rd = pread(fd, buffer, size, (off_t)*offset); |
| if (rd > 0) *offset += rd; |
| } |
| else { |
| rd = read(fd, buffer, size); |
| } |
| if (rd > 0) lws_write_block_stream(out, buffer, rd); |
| return rd; |
| } |
| } |
| |
| static void lws_post_read(InputBuf * ibuf, unsigned char * buf, size_t size) { |
| ChannelWS * c = ibuf2ws(ibuf); |
| if (c->read_pending) return; |
| c->read_pending = 1; |
| c->read_buf = buf; |
| c->read_buf_size = size; |
| if (c->inbuf) { |
| if (!c->read_done_posted) { |
| c->read_done_posted = 1; |
| post_event(lws_channel_read_done, c); |
| } |
| } |
| else if (c->closing) { |
| if (!c->read_done_posted) { |
| c->read_done_posted = 1; |
| post_event(lws_channel_read_done, c); |
| } |
| } |
| } |
| |
| static void lws_wait_read(InputBuf * ibuf) { |
| ChannelWS * c = ibuf2ws(ibuf); |
| |
| /* Wait for read to complete */ |
| assert(c->lock_cnt > 0); |
| assert(c->read_pending != 0); |
| if (c->read_done_posted) cancel_event(lws_channel_read_done, c, 1); |
| lws_channel_read_done(c); |
| } |
| |
| static int lws_read_stream(InputStream * inp) { |
| Channel * channel = inp2channel(inp); |
| ChannelWS * c = channel2ws(channel); |
| |
| assert(c->lock_cnt > 0); |
| if (inp->cur < inp->end) return *inp->cur++; |
| return ibuf_get_more(&c->ibuf, 0); |
| } |
| |
| static int lws_peek_stream(InputStream * inp) { |
| Channel * channel = inp2channel(inp); |
| ChannelWS * c = channel2ws(channel); |
| |
| assert(c->lock_cnt > 0); |
| if (inp->cur < inp->end) return *inp->cur; |
| return ibuf_get_more(&c->ibuf, 1); |
| } |
| |
| static void lws_shutdown(ChannelWS * c) { |
| pthread_mutex_lock(&c->data->mutex); |
| if (c->data->wsi) { |
| c->outbuf.len = -1; |
| /* Post a write request that will trigger a close of the channel */ |
| lws_callback_on_writable(c->data->wsi); |
| } |
| pthread_mutex_unlock(&c->data->mutex); |
| |
| } |
| |
| static void send_eof_and_close(Channel * channel, int err) { |
| ChannelWS * c = channel2ws(channel); |
| |
| assert(c->magic == CHANNEL_MAGIC); |
| if (channel->state == ChannelStateDisconnected) return; |
| ibuf_flush(&c->ibuf); |
| if (c->ibuf.handling_msg == HandleMsgTriggered) { |
| /* Cancel pending message handling */ |
| cancel_event(handle_channel_msg, c, 0); |
| c->ibuf.handling_msg = HandleMsgIdle; |
| } |
| write_stream(&c->chan->out, MARKER_EOS); |
| write_errno(&c->chan->out, err); |
| write_stream(&c->chan->out, MARKER_EOM); |
| lws_flush_with_flags(c, 0); |
| if (output_queue_is_empty(&c->out_queue)) lws_shutdown(c); |
| c->chan->state = ChannelStateDisconnected; |
| lws_post_read(&c->ibuf, c->ibuf.buf, c->ibuf.buf_size); |
| notify_channel_closed(channel); |
| if (channel->disconnected) { |
| channel->disconnected(channel); |
| } |
| else { |
| trace(LOG_PROTOCOL, "channel %#lx disconnected", c); |
| if (channel->protocol != NULL) protocol_release(channel->protocol); |
| } |
| channel->protocol = NULL; |
| } |
| |
| static void handle_channel_msg(void * x) { |
| Trap trap; |
| ChannelWS * c = (ChannelWS *)x; |
| int has_msg; |
| |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->ibuf.handling_msg == HandleMsgTriggered); |
| assert(c->ibuf.message_count); |
| |
| has_msg = ibuf_start_message(&c->ibuf); |
| if (has_msg <= 0) { |
| if (has_msg < 0 && c->chan->state != ChannelStateDisconnected) { |
| trace(LOG_PROTOCOL, "Socket is shutdown by remote peer, channel %#lx %s", c, c->chan->peer_name); |
| channel_close(c->chan); |
| } |
| } |
| else if (set_trap(&trap)) { |
| if (c->chan->receive) { |
| c->chan->receive(c->chan); |
| } |
| else { |
| handle_protocol_message(c->chan); |
| assert(c->out_bin_block == NULL); |
| } |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception in message handler: %s", errno_to_str(trap.error)); |
| send_eof_and_close(c->chan, trap.error); |
| } |
| } |
| |
| static void channel_check_pending(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| |
| assert(is_dispatch_thread()); |
| if (c->ibuf.handling_msg == HandleMsgIdle && c->ibuf.message_count) { |
| post_event(handle_channel_msg, c); |
| c->ibuf.handling_msg = HandleMsgTriggered; |
| } |
| } |
| |
| static void lws_trigger_message(InputBuf * ibuf) { |
| ChannelWS * c = ibuf2ws(ibuf); |
| |
| assert(is_dispatch_thread()); |
| assert(c->ibuf.message_count > 0); |
| if (c->ibuf.handling_msg == HandleMsgIdle) { |
| post_event(handle_channel_msg, c); |
| c->ibuf.handling_msg = HandleMsgTriggered; |
| } |
| } |
| |
| static int channel_get_message_count(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| assert(is_dispatch_thread()); |
| if (c->ibuf.handling_msg != HandleMsgTriggered) return 0; |
| return c->ibuf.message_count; |
| } |
| |
| static void lws_channel_read_done(void * x) { |
| ChannelWS * c = (ChannelWS *)x; |
| ssize_t total_length = 0; |
| ssize_t read_length = 0; |
| |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| assert(c->read_pending != 0); |
| assert(c->lock_cnt > 0); |
| |
| |
| c->read_pending = 0; |
| c->read_done_posted = 0; |
| /* some data is available retrieve it */ |
| { |
| total_length = c->inbuf ? c->inbuf->len : 0; |
| |
| if (c->inbuf && c->inbuf->error) { |
| if (c->chan->state != ChannelStateDisconnected) { |
| trace(LOG_ALWAYS, "Can't read from Web Socket: %s", errno_to_str(c->inbuf->error)); |
| } |
| total_length = 0; /* Treat error as EOF */ |
| } |
| if (total_length > 0) { |
| read_length = c->inbuf->len > c->read_buf_size ? c->read_buf_size : c->inbuf->len; |
| memcpy(c->read_buf, c->inbuf->in, read_length); |
| c->inbuf->len -= read_length; |
| if (c->inbuf->len != 0) { |
| memmove((unsigned char *)c->inbuf->in, (unsigned char *)c->inbuf->in + read_length, c->inbuf->len); |
| } |
| } |
| else { |
| /* In case of error, free all received buffers */ |
| while (c->inbuf) { |
| ChannelInputBuffer * old_buf = c->inbuf; |
| c->inbuf = old_buf->next; |
| loc_free(old_buf->in); |
| loc_free(old_buf); |
| } |
| } |
| |
| if (c->inbuf && c->inbuf->len == 0) { |
| ChannelInputBuffer * old_buf = c->inbuf; |
| c->inbuf = old_buf->next; |
| loc_free(old_buf->in); |
| loc_free(old_buf); |
| } |
| } |
| if (c->chan->state != ChannelStateDisconnected) { |
| ibuf_read_done(&c->ibuf, read_length); |
| } |
| else if (total_length > 0) { |
| lws_post_read(&c->ibuf, c->ibuf.buf, c->ibuf.buf_size); |
| } |
| else { |
| lws_unlock(c->chan); |
| } |
| } |
| |
| static void start_channel(Channel * channel) { |
| ChannelWS * c = channel2ws(channel); |
| |
| assert(is_dispatch_thread()); |
| assert(c->magic == CHANNEL_MAGIC); |
| notify_channel_created(c->chan); |
| if (c->chan->connecting) { |
| c->chan->connecting(c->chan); |
| } |
| else { |
| trace(LOG_PROTOCOL, "channel server connecting"); |
| send_hello_message(c->chan); |
| } |
| ibuf_trigger_read(&c->ibuf); |
| } |
| |
| static ChannelWS * create_channel(int is_ssl, int server) { |
| ChannelWS * c; |
| |
| c = (ChannelWS *)loc_alloc_zero(sizeof *c); |
| c->chan = channel_alloc(); |
| channel2ws(c->chan) = c; |
| c->magic = CHANNEL_MAGIC; |
| c->is_ssl = is_ssl; |
| c->chan->inp.read = lws_read_stream; |
| c->chan->inp.peek = lws_peek_stream; |
| c->obuf = output_queue_alloc_obuf(); |
| c->chan->out.cur = c->obuf->buf; |
| c->chan->out.end = c->obuf->buf + sizeof(c->obuf->buf); |
| c->chan->out.write = lws_write_stream; |
| c->chan->out.write_block = lws_write_block_stream; |
| c->chan->out.splice_block = lws_splice_block_stream; |
| list_add_last(&c->chan->chanlink, &channel_root); |
| shutdown_set_normal(&channel_shutdown); |
| c->chan->state = ChannelStateStartWait; |
| c->chan->incoming = server; |
| c->chan->start_comm = start_channel; |
| c->chan->check_pending = channel_check_pending; |
| c->chan->message_count = channel_get_message_count; |
| c->chan->lock = lws_lock; |
| c->chan->unlock = lws_unlock; |
| c->chan->is_closed = lws_is_closed; |
| c->chan->close = send_eof_and_close; |
| ibuf_init(&c->ibuf, &c->chan->inp); |
| c->ibuf.post_read = lws_post_read; |
| c->ibuf.wait_read = lws_wait_read; |
| c->ibuf.trigger_message = lws_trigger_message; |
| c->lock_cnt = 1; |
| output_queue_ini(&c->out_queue); |
| return c; |
| } |
| |
| static void refresh_peer_server(int sock, PeerServer * ps) { |
| unsigned i; |
| const char * transport = peer_server_getprop(ps, "TransportName", NULL); |
| assert(transport != NULL); |
| const char *str_port = peer_server_getprop(ps, "Port", NULL); |
| |
| int ifcind; |
| struct in_addr src_addr; |
| ip_ifc_info ifclist[MAX_IFC]; |
| |
| /* For now, we do not support binding WebSocket to a specific interface; |
| * let's return all interfaces. |
| */ |
| ifcind = build_ifclist(sock, MAX_IFC, ifclist); |
| while (ifcind-- > 0) { |
| char str_host[64]; |
| char str_id[64]; |
| PeerServer * ps2; |
| src_addr.s_addr = ifclist[ifcind].addr; |
| ps2 = peer_server_alloc(); |
| ps2->flags = ps->flags | PS_FLAG_LOCAL | PS_FLAG_DISCOVERABLE; |
| for (i = 0; i < ps->ind; i++) { |
| peer_server_addprop(ps2, loc_strdup(ps->list[i].name), |
| loc_strdup(ps->list[i].value)); |
| } |
| inet_ntop(AF_INET, &src_addr, str_host, sizeof(str_host)); |
| snprintf(str_id, sizeof(str_id), "%s:%s:%s", transport, str_host, str_port); |
| peer_server_addprop(ps2, loc_strdup("ID"), loc_strdup(str_id)); |
| peer_server_addprop(ps2, loc_strdup("Host"), loc_strdup(str_host)); |
| peer_server_addprop(ps2, loc_strdup("Port"), loc_strdup(str_port)); |
| peer_server_add(ps2, PEER_DATA_RETENTION_PERIOD * 2); |
| } |
| } |
| |
| static void refresh_all_peer_servers(void * x) { |
| LINK * l = server_list.next; |
| while (l != &server_list) { |
| ServerWS * si = servlink2np(l); |
| refresh_peer_server(dummy_socket, si->serv.ps); |
| l = l->next; |
| } |
| post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000); |
| } |
| |
| static void set_peer_addr(ChannelWS * c, struct sockaddr * addr, int addr_len) { |
| char nbuf[128]; |
| /* Create a human readable channel name that uniquely identifies remote peer */ |
| char name[128]; |
| if (addr_len == 0) return; |
| assert(addr->sa_family == AF_INET); |
| snprintf(name, sizeof(name), "%s:%s:%d", |
| c->is_ssl ? "WSS" : "WS", |
| inet_ntop(addr->sa_family, |
| &((struct sockaddr_in *)addr)->sin_addr, |
| nbuf, sizeof(nbuf)), |
| ntohs(((struct sockaddr_in *)addr)->sin_port)); |
| c->chan->peer_name = loc_strdup(name); |
| } |
| |
| |
| static void lws_server_exit(void * x) { |
| ServerWS * s = (ServerWS *) x; |
| if (s->exiting) return; |
| s->exiting = 1; |
| list_remove(&s->serv.servlink); |
| if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root)) |
| shutdown_set_stopped(&channel_shutdown); |
| list_remove(&s->servlink); |
| peer_server_free(s->serv.ps); |
| } |
| |
| static void server_close(ChannelServer * serv) { |
| /* ServerWS * s = server2ws(serv);*/ |
| |
| assert(is_dispatch_thread()); |
| /* Closing a running server is not supported for now. This is |
| * because there is no way (AFAIK) to close a virtual host in |
| * libwebsockets library.*/ |
| /* lws_server_exit(s); */ |
| } |
| |
| /* following _wt_ functions are called from a worker thread so caution is required |
| * to keep its operations thread safe |
| */ |
| |
| static ChannelServer * channel_server_create(PeerServer * ps) { |
| ServerWS * si = (ServerWS *)loc_alloc_zero(sizeof *si); |
| si->serv.close = server_close; |
| si->serv.ps = ps; |
| if (server_list.next == NULL) { |
| list_init(&server_list); |
| post_event_with_delay(refresh_all_peer_servers, NULL, PEER_DATA_REFRESH_PERIOD * 1000000); |
| } |
| list_add_last(&si->serv.servlink, &channel_server_root); |
| shutdown_set_normal(&channel_shutdown); |
| list_add_last(&si->servlink, &server_list); |
| refresh_peer_server(dummy_socket, ps); |
| return &si->serv; |
| } |
| |
| /* LWS service thread */ |
| |
| static void * lws_service_thread(void * x) |
| { |
| struct lws_context_creation_info context_creation_info; |
| int vhost_created = 0; |
| |
| memset(&context_creation_info, 0, sizeof context_creation_info); |
| context_creation_info.gid = -1; |
| context_creation_info.uid = -1; |
| context_creation_info.max_http_header_pool = MAX_CONN_REQUESTS; |
| context_creation_info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT|LWS_SERVER_OPTION_EXPLICIT_VHOSTS; |
| context_creation_info.server_string = "tcf"; |
| |
| lws_ctx = lws_create_context(&context_creation_info); |
| if (lws_ctx == NULL) { |
| trace (LOG_ALWAYS, "Error creating libwebsockets client context: %s", strerror(errno)); |
| return NULL; |
| } |
| |
| while (1) { |
| if (vhost_created) { |
| #if ENABLE_Epoll |
| int i, n; |
| struct epoll_event events[16]; |
| n = epoll_wait (epoll_fd, events, 16, 1000); |
| if (n == 0) { |
| lws_service_fd(lws_ctx, NULL); |
| } |
| else { |
| for (i = 0; i < n; i++) { |
| struct lws_pollfd pollfd; |
| if (events[i].data.fd == dummy_pipe_fds[0]) { |
| char buf; |
| /* consume events; it will be handled at the end of the loop */ |
| if (read(events[i].data.fd, &buf, 1)); |
| continue; |
| } |
| pollfd.fd = events[i].data.fd; |
| EPOLL_TO_POLL_EVENT(pollfd.events, epoll_events[events[i].data.fd]); |
| EPOLL_TO_POLL_EVENT(pollfd.revents, events[i].events); |
| if (lws_service_fd(lws_ctx, &pollfd) < 0) continue; |
| } |
| } |
| #else |
| lws_service(lws_ctx, 1000); |
| #endif |
| } |
| else { |
| usleep (10000); |
| } |
| pthread_mutex_lock(&lws_list_mutex); |
| |
| /* The libwebsockets library only allows up to MAX_CONN_REQUESTS simultaneous |
| * connection requests (this is a parameter provided when creating the context; |
| * exceeding this number of pending requests can leads to crashes in the |
| * libwebsockets library. |
| */ |
| |
| while (!list_is_empty(&server_create_list)) { |
| struct lws_context_creation_info vhost_creation_info; |
| ServerCreateInfo * args = link2sci(server_create_list.next); |
| struct lws_vhost *lws_vh; |
| ServerWS ** si; |
| memset(&vhost_creation_info, 0, sizeof vhost_creation_info); |
| vhost_creation_info.port = args->port; |
| vhost_creation_info.protocols = protocols; |
| vhost_creation_info.options = args->options; |
| vhost_creation_info.ssl_cipher_list = "AES128-SHA256"; |
| |
| /* Currently, the certificate is shared for all connections; |
| * this is specified for the first created connection and used |
| * for subsequent ones. This would need some rework to either |
| * make this a global property of the agent (instead of a per |
| * connection) making this more natural or we should fix the |
| * libwebsockets code to move this info to the connection info level |
| * instead of the context/vhost level. |
| */ |
| vhost_creation_info.ssl_cert_filepath = args->certificate; |
| vhost_creation_info.ssl_private_key_filepath = args->key; |
| vhost_creation_info.ssl_ca_filepath = args->ca_filepath; |
| vhost_creation_info.ssl_cipher_list = args->cipher_list; |
| |
| list_remove(&args->link); |
| pthread_mutex_unlock(&lws_list_mutex); |
| if ((lws_vh = lws_create_vhost(lws_ctx, &vhost_creation_info)) == NULL) { |
| trace (LOG_ALWAYS, "Error creating libwebsockets vhost: %s", strerror(errno)); |
| post_event(lws_server_exit, args->si); |
| } |
| else { |
| vhost_created = 1; |
| si = (ServerWS **)lws_protocol_vh_priv_zalloc(lws_vh, protocols, sizeof(ChannelServer *)); |
| *si = args->si; |
| } |
| loc_free(args); |
| pthread_mutex_lock(&lws_list_mutex); |
| } |
| |
| while (pending_connections < MAX_CONN_REQUESTS && !list_is_empty(&client_connect_list)) { |
| struct lws_client_connect_info client_connect_info; |
| ChannelConnectInfo * args = link2cci(client_connect_list.next); |
| WSIUserData * userdata = (WSIUserData *) loc_alloc_zero(sizeof (WSIUserData)); |
| |
| if (client_vhost == NULL) { |
| struct lws_context_creation_info vhost_creation_info; |
| memset(&vhost_creation_info, 0, sizeof vhost_creation_info); |
| vhost_creation_info.port = CONTEXT_PORT_NO_LISTEN; |
| vhost_creation_info.protocols = protocols; |
| vhost_creation_info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; |
| vhost_creation_info.ssl_ca_filepath = args->ca_filepath; |
| vhost_creation_info.ssl_cert_filepath = args->certificate; |
| vhost_creation_info.ssl_private_key_filepath = args->key; |
| vhost_creation_info.ssl_cipher_list = args->cipher_list; |
| |
| client_vhost = lws_create_vhost(lws_ctx, &vhost_creation_info); |
| } |
| if (client_vhost == NULL) { |
| trace (LOG_ALWAYS, "Error creating libwebsockets client context: %s", strerror(errno)); |
| } |
| else vhost_created = 1; |
| |
| list_remove(&args->link); |
| pending_connections++; |
| pthread_mutex_unlock(&lws_list_mutex); |
| |
| userdata->args = args; |
| |
| memset(&client_connect_info, 0, sizeof(client_connect_info)); |
| client_connect_info.context = lws_ctx; |
| client_connect_info.path = args->get_url; |
| if (args->is_ssl) { |
| client_connect_info.ssl_connection = args->self_signed ? 2 : 1; |
| } |
| client_connect_info.address = args->host; |
| client_connect_info.host = client_connect_info.address; |
| client_connect_info.port = args->port; |
| client_connect_info.origin = client_connect_info.address; |
| client_connect_info.ietf_version_or_minus_one = -1; |
| client_connect_info.client_exts = exts; |
| client_connect_info.userdata = userdata; |
| client_connect_info.vhost = client_vhost; |
| client_connect_info.protocol = "tcf"; |
| |
| if (lws_client_connect_via_info(&client_connect_info)); |
| pthread_mutex_lock(&lws_list_mutex); |
| } |
| pthread_mutex_unlock(&lws_list_mutex); |
| } |
| return NULL; |
| } |
| |
| static ChannelServer * channel_lws_server(PeerServer * ps) { |
| const char * port_str = peer_server_getprop(ps, "Port", NULL); |
| const char * host = peer_server_getprop(ps, "Host", NULL); |
| const char * certificate = peer_server_getprop(ps, "Cert", NULL); |
| const char * key = peer_server_getprop(ps, "Key", NULL); |
| const char * client_cert = peer_server_getprop(ps, "ReqClientCert", NULL); |
| ChannelServer * server; |
| int is_ssl = strcmp(peer_server_getprop(ps, "TransportName", ""), "WSS") == 0; |
| const char * ca_filepath = peer_server_getprop(ps, "CAfile", NULL); |
| const char * cipher_list = peer_server_getprop(ps, "CipherList", NULL); |
| ServerCreateInfo * args; |
| int port = 0; |
| |
| if (port_str != NULL) port = atoi(port_str); |
| if (port == 0) { |
| trace(LOG_ALWAYS, "Specifying a port number in WebSocket server URL is mandatory"); |
| set_fmt_errno(ERR_OTHER, "Specifying a port number in WebSocket server URL is mandatory"); |
| return NULL; |
| } |
| if (host != NULL) { |
| trace(LOG_ALWAYS, "Specifying host in WebSocket server URL is not supported"); |
| set_fmt_errno(ERR_OTHER, "Specifying host in WebSocket server URL is not supported"); |
| return NULL; |
| } |
| if (is_ssl) { |
| if (key == NULL) { |
| trace(LOG_ALWAYS, "Key parameter needs to be specified for secured WebSocket connection"); |
| set_fmt_errno(ERR_OTHER, "Key parameter needs to be specified for secured WebSocket connection"); |
| return NULL; |
| } |
| if (certificate == NULL) { |
| trace(LOG_ALWAYS, "Cert parameter needs to be specified for secured WebSocket connection"); |
| set_fmt_errno(ERR_OTHER, "Cert parameter needs to be specified for secured WebSocket connection"); |
| return NULL; |
| } |
| } |
| |
| args = (ServerCreateInfo *)loc_alloc_zero(sizeof(ServerCreateInfo)); |
| args->port = port_str ? atoi(port_str) : 0; |
| if (is_ssl) { |
| args->certificate = certificate; |
| args->key = key; |
| args->ca_filepath = ca_filepath; |
| args->cipher_list = cipher_list; |
| args->options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; |
| if (client_cert |
| && (strcmp(client_cert, "true") == 0 |
| || strcmp(client_cert, "1") == 0)) { |
| args->options |= LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT; |
| } |
| } |
| server = channel_server_create(ps); |
| args->si = server2ws(server); |
| args->si->is_ssl = is_ssl; |
| pthread_mutex_lock(&lws_list_mutex); |
| list_add_last(&args->link, &server_create_list); |
| #if ENABLE_Epoll |
| channel_lws_abort_epoll(); |
| #else |
| lws_cancel_service(lws_ctx); |
| #endif |
| pthread_mutex_unlock(&lws_list_mutex); |
| return server; |
| } |
| |
| static void server_lws_connect_done(void * x) { |
| SessionData * data = (SessionData *)x; |
| ServerWS * si = data->si; |
| ChannelWS * c; |
| assert (si != NULL); |
| pthread_mutex_lock(&data->mutex); |
| if (data->wsi == NULL) { |
| pthread_mutex_unlock(&data->mutex); |
| return; |
| } |
| c= create_channel(si->is_ssl, 1); |
| data->c = c; |
| c->data = data; |
| lws_lock(c->chan); |
| pthread_mutex_unlock(&data->mutex); |
| set_peer_addr(c, data->addr_buf, data->addr_len); |
| si->serv.new_conn(&si->serv, c->chan); |
| } |
| |
| static void channel_lws_connect_done(void * x) { |
| ChannelConnectInfo * args = (ChannelConnectInfo *)x; |
| pthread_mutex_lock(&lws_list_mutex); |
| pending_connections--; |
| pthread_mutex_unlock(&lws_list_mutex); |
| |
| if (args->error) { |
| if (args->error == EPERM) { |
| args->error = set_fmt_errno(ERR_OTHER, "Failed to handshake the connection h:%s p:%d", args->host, args->port); |
| } |
| else if (args->error == ECONNREFUSED) { |
| args->error = set_fmt_errno(ERR_OTHER, "Failed to establish connection h:%s p:%d", args->host, args->port); |
| } |
| args->callback(args->callback_args, args->error, NULL); |
| loc_free(args->data); |
| } |
| else { |
| ChannelWS * c = create_channel(args->is_ssl, 0); |
| if (c == NULL) { |
| args->callback(args->callback_args, errno, NULL); |
| loc_free(args->data); |
| } |
| else { |
| args->data->c = c; |
| c->data = args->data; |
| lws_lock(c->chan); |
| set_peer_addr(c, args->data->addr_buf, args->data->addr_len); |
| args->callback(args->callback_args, 0, c->chan); |
| } |
| } |
| loc_free(args->host); |
| loc_free(args->get_url); |
| loc_free(args->ca_filepath); |
| loc_free(args->certificate); |
| loc_free(args->cipher_list); |
| loc_free(args->key); |
| loc_free(args); |
| } |
| |
| static void channel_lws_connect(PeerServer * ps, ChannelConnectCallBack callback, void * callback_args) { |
| const char * host = peer_server_getprop(ps, "Host", NULL); |
| const char * port_str = peer_server_getprop(ps, "Port", NULL); |
| const char * get_url = peer_server_getprop(ps, "GetUrl", NULL); |
| const char * self_signed = peer_server_getprop(ps, "SelfSigned", NULL); |
| const char * ca_filepath = peer_server_getprop(ps, "CAfile", NULL); |
| const char * cipher_list = peer_server_getprop(ps, "CipherList", NULL); |
| |
| const char * certificate = peer_server_getprop(ps, "Cert", NULL); |
| const char * key = peer_server_getprop(ps, "Key", NULL); |
| ChannelConnectInfo * args; |
| |
| args = (ChannelConnectInfo *)loc_alloc_zero(sizeof(ChannelConnectInfo)); |
| args->callback = callback; |
| args->callback_args = callback_args; |
| args->is_ssl = strcmp(peer_server_getprop(ps, "TransportName", ""), "WSS") == 0; |
| if (port_str != NULL) args->port = atoi(port_str); |
| args->ca_filepath = (ca_filepath != NULL ? loc_strdup(ca_filepath) : NULL); |
| args->certificate = (certificate != NULL ? loc_strdup(certificate) : NULL); |
| args->cipher_list = (cipher_list != NULL ? loc_strdup(cipher_list) : NULL); |
| args->key = (key != NULL ? loc_strdup(key) : NULL); |
| args->host = loc_strdup(host == NULL ? "127.0.0.1" : host); |
| args->get_url = loc_strdup(get_url == NULL ? "/" : get_url); |
| args->self_signed = self_signed ? (strcmp(self_signed, "true") == 0 || strcmp(self_signed, "1") == 0) : 0; |
| pthread_mutex_lock(&lws_list_mutex); |
| list_add_last(&args->link, &client_connect_list); |
| #if ENABLE_Epoll |
| channel_lws_abort_epoll(); |
| #else |
| lws_cancel_service(lws_ctx); |
| #endif |
| pthread_mutex_unlock(&lws_list_mutex); |
| } |
| |
| static void trace_lws(int level, const char *line) { |
| trace(LOG_PROTOCOL, line); |
| } |
| |
| void channel_lws_get_properties(Channel * channel, char *** prop_names, char *** prop_values, unsigned * prop_cnt) { |
| ChannelWS * c = channel2ws(channel); |
| assert(is_dispatch_thread()); |
| if(c->magic != CHANNEL_MAGIC) { |
| *prop_cnt = 0; |
| return; |
| } |
| assert(c->lock_cnt > 0); |
| *prop_names = c->data->prop_names; |
| *prop_values = c->data->prop_values; |
| *prop_cnt = c->data->prop_cnt; |
| } |
| |
| void ini_channel_lws() { |
| static int initialized = 0; |
| pthread_t thread; |
| #if ENABLE_Epoll |
| struct epoll_event event; |
| assert (POLLIN == EPOLLIN); |
| assert (POLLPRI == EPOLLPRI); |
| assert (POLLOUT == EPOLLOUT ); |
| assert (POLLRDHUP ==EPOLLRDHUP); |
| assert (POLLERR == EPOLLERR); |
| assert (POLLHUP == EPOLLHUP); |
| #endif |
| |
| if (initialized) return; |
| channel_lws_extension_offset = channel_extension(sizeof(ChannelWS *)); |
| #if ENABLE_Epoll |
| epoll_fd = epoll_create1(0); |
| epoll_events = (uint32_t *)loc_alloc_zero(getdtablesize() * sizeof(uint32_t)); |
| if (pipe(dummy_pipe_fds) < 0) check_error(errno); |
| memset(&event, 0, sizeof(struct epoll_event)); |
| event.data.fd = dummy_pipe_fds[0]; |
| event.events = EPOLLIN; |
| epoll_ctl (epoll_fd, EPOLL_CTL_ADD, event.data.fd, &event); |
| #endif |
| if ((log_mode & LOG_PROTOCOL) == 0) lws_set_log_level (0, NULL); |
| else lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO, trace_lws); |
| add_channel_transport("WS", channel_lws_server, channel_lws_connect); |
| add_channel_transport("WSS", channel_lws_server, channel_lws_connect); |
| |
| dummy_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); |
| pthread_mutex_init(&lws_list_mutex, NULL); |
| pthread_create(&thread, NULL, lws_service_thread, NULL); |
| } |
| #endif /* ENABLE_LibWebSockets */ |