| /******************************************************************************* |
| * Copyright (c) 2009, 2010 Wind River Systems, Inc. and others. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * You may elect to redistribute this code under either of these licenses. |
| * |
| * Contributors: |
| * Wind River Systems - initial API and implementation |
| *******************************************************************************/ |
| |
| /* |
| * TCF Streams - generic streams service. |
| */ |
| |
| #include <config.h> |
| |
| #if SERVICE_Streams |
| |
| #include <assert.h> |
| #include <string.h> |
| #include <framework/channel.h> |
| #include <framework/exceptions.h> |
| #include <framework/json.h> |
| #include <framework/errors.h> |
| #include <framework/trace.h> |
| #include <framework/events.h> |
| #include <framework/myalloc.h> |
| #include <services/streamsservice.h> |
| |
| static const char * STREAMS = "Streams"; |
| |
| typedef struct StreamClient StreamClient; |
| typedef struct ReadRequest ReadRequest; |
| typedef struct WriteRequest WriteRequest; |
| typedef struct Subscription Subscription; |
| |
| #define STREAM_MAGIC 0x29465398 |
| |
| struct VirtualStream { |
| LINK link_all; |
| unsigned magic; |
| char type[256]; |
| unsigned id; |
| unsigned access; |
| VirtualStreamCallBack * callback; |
| void * callback_args; |
| int ref_cnt; |
| int deleted; |
| LINK clients; |
| uint64_t pos; |
| char * buf; |
| size_t buf_len; |
| size_t buf_inp; |
| size_t buf_out; |
| unsigned eos_inp; |
| unsigned eos_out; |
| unsigned data_available_posted; |
| unsigned space_available_posted; |
| }; |
| |
| struct StreamClient { |
| LINK link_hash; |
| LINK link_stream; |
| LINK link_all; |
| LINK read_requests; |
| LINK write_requests; |
| VirtualStream * stream; |
| Channel * channel; |
| uint64_t pos; |
| }; |
| |
| struct ReadRequest { |
| LINK link_client; |
| StreamClient * client; |
| char token[256]; |
| size_t size; |
| }; |
| |
| struct WriteRequest { |
| LINK link_client; |
| StreamClient * client; |
| char token[256]; |
| char * data; |
| size_t offs; |
| size_t size; |
| int eos; |
| }; |
| |
| struct Subscription { |
| LINK link_all; |
| char type[256]; |
| Channel * channel; |
| }; |
| |
| #define hash2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_hash))) |
| #define stream2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_stream))) |
| #define all2client(A) ((StreamClient *)((char *)(A) - offsetof(StreamClient, link_all))) |
| #define all2subscription(A) ((Subscription *)((char *)(A) - offsetof(Subscription, link_all))) |
| #define all2stream(A) ((VirtualStream *)((char *)(A) - offsetof(VirtualStream, link_all))) |
| #define client2read_request(A) ((ReadRequest *)((char *)(A) - offsetof(ReadRequest, link_client))) |
| #define client2write_request(A) ((WriteRequest *)((char *)(A) - offsetof(WriteRequest, link_client))) |
| |
| #define HANDLE_HASH_SIZE (4 * MEM_USAGE_FACTOR - 1) |
| static LINK handle_hash[HANDLE_HASH_SIZE]; |
| static LINK clients; |
| static LINK streams; |
| static LINK subscriptions; |
| static unsigned id_cnt = 0; |
| |
| static unsigned get_client_hash(unsigned id, Channel * c) { |
| return (id + (unsigned)(uintptr_t)c) % HANDLE_HASH_SIZE; |
| } |
| |
| static int str2id(char * s, unsigned * id) { |
| char * p = NULL; |
| if (*s++ != 'V') return 0; |
| if (*s++ != 'S') return 0; |
| *id = (unsigned)strtoul(s, &p, 10); |
| return *p == 0; |
| } |
| |
| void virtual_stream_get_id(VirtualStream * stream, char * id_buf, size_t buf_size) { |
| snprintf(id_buf, buf_size, "VS%u", stream->id); |
| } |
| |
| static StreamClient * find_client(char * s, Channel * c) { |
| unsigned id = 0; |
| if (str2id(s, &id)) { |
| unsigned h = get_client_hash(id, c); |
| LINK * l = handle_hash[h].next; |
| while (l != &handle_hash[h]) { |
| StreamClient * client = hash2client(l); |
| if (client->stream->id == id && client->channel == c) return client; |
| l = l->next; |
| } |
| } |
| errno = ERR_INV_CONTEXT; |
| return NULL; |
| } |
| |
| static void send_event_stream_created(OutputStream * out, VirtualStream * stream, const char * context_id) { |
| char id[256]; |
| |
| virtual_stream_get_id(stream, id, sizeof(id)); |
| write_stringz(out, "E"); |
| write_stringz(out, STREAMS); |
| write_stringz(out, "created"); |
| |
| json_write_string(out, stream->type); |
| write_stream(out, 0); |
| json_write_string(out, id); |
| write_stream(out, 0); |
| json_write_string(out, context_id); |
| write_stream(out, 0); |
| write_stream(out, MARKER_EOM); |
| } |
| |
| static void send_event_stream_disposed(OutputStream * out, VirtualStream * stream) { |
| char id[256]; |
| |
| virtual_stream_get_id(stream, id, sizeof(id)); |
| write_stringz(out, "E"); |
| write_stringz(out, STREAMS); |
| write_stringz(out, "disposed"); |
| |
| json_write_string(out, stream->type); |
| write_stream(out, 0); |
| json_write_string(out, id); |
| write_stream(out, 0); |
| write_stream(out, MARKER_EOM); |
| } |
| |
| static void delete_read_request(ReadRequest * r) { |
| Channel * c = r->client->channel; |
| Trap trap; |
| |
| if (set_trap(&trap)) { |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, r->token); |
| write_stringz(&c->out, "null"); |
| write_errno(&c->out, ERR_COMMAND_CANCELLED); |
| json_write_long(&c->out, 0); |
| write_stream(&c->out, 0); |
| json_write_boolean(&c->out, 1); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling pending stream read command: %d %s", |
| trap.error, errno_to_str(trap.error)); |
| } |
| list_remove(&r->link_client); |
| loc_free(r); |
| } |
| |
| static void delete_write_request(WriteRequest * r, int error) { |
| Channel * c = r->client->channel; |
| Trap trap; |
| |
| if (set_trap(&trap)) { |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, r->token); |
| write_errno(&c->out, error); |
| write_stream(&c->out, MARKER_EOM); |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception handling pending stream write command: %d %s", |
| trap.error, errno_to_str(trap.error)); |
| } |
| list_remove(&r->link_client); |
| loc_free(r->data); |
| loc_free(r); |
| } |
| |
| static void delete_stream(void * args) { |
| VirtualStream * stream = (VirtualStream *)args; |
| |
| assert(stream->magic == STREAM_MAGIC); |
| assert(list_is_empty(&stream->clients)); |
| assert(stream->deleted); |
| stream->magic = 0; |
| list_remove(&stream->link_all); |
| loc_free(stream->buf); |
| loc_free(stream); |
| } |
| |
| static void notify_data_available(void * args) { |
| VirtualStream * stream = (VirtualStream *)args; |
| assert(stream->magic == STREAM_MAGIC); |
| stream->data_available_posted = 0; |
| if (stream->deleted || stream->eos_out) return; |
| stream->callback(stream, VS_EVENT_DATA_AVAILABLE, stream->callback_args); |
| } |
| |
| static void notify_space_available(void * args) { |
| VirtualStream * stream = (VirtualStream *)args; |
| assert(stream->magic == STREAM_MAGIC); |
| stream->space_available_posted = 0; |
| if (stream->deleted || stream->eos_inp) return; |
| stream->callback(stream, VS_EVENT_SPACE_AVAILABLE, stream->callback_args); |
| } |
| |
| static void advance_stream_buffer(VirtualStream * stream) { |
| size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; |
| uint64_t min_pos = ~(uint64_t)0; |
| uint64_t buf_pos = stream->pos - len; |
| LINK * l; |
| |
| assert(stream->access & VS_ENABLE_REMOTE_READ); |
| for (l = stream->clients.next; l != &stream->clients; l = l->next) { |
| StreamClient * client = stream2client(l); |
| assert(client->pos <= stream->pos); |
| if (client->pos < min_pos) min_pos = client->pos; |
| } |
| if (min_pos == ~(uint64_t)0) { |
| stream->buf_out = stream->buf_inp; |
| } |
| else if (min_pos > buf_pos) { |
| assert(min_pos - buf_pos <= len); |
| stream->buf_out = (stream->buf_out + (unsigned)(min_pos - buf_pos)) % stream->buf_len; |
| } |
| else if (stream->pos - min_pos >= stream->buf_len) { |
| /* TODO: drop stream data */ |
| assert(0); |
| } |
| if (len != (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len && |
| !stream->space_available_posted) { |
| post_event(notify_space_available, stream); |
| stream->space_available_posted = 1; |
| } |
| } |
| |
| static StreamClient * create_client(VirtualStream * stream, Channel * channel) { |
| size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; |
| StreamClient * client = (StreamClient *)loc_alloc_zero(sizeof(StreamClient)); |
| list_init(&client->link_hash); |
| list_init(&client->link_stream); |
| list_init(&client->link_all); |
| list_init(&client->read_requests); |
| list_init(&client->write_requests); |
| client->stream = stream; |
| client->channel = channel; |
| client->pos = stream->pos - len; |
| list_add_first(&client->link_hash, &handle_hash[get_client_hash(stream->id, channel)]); |
| list_add_first(&client->link_stream, &stream->clients); |
| list_add_first(&client->link_all, &clients); |
| stream->ref_cnt++; |
| return client; |
| } |
| |
| static void delete_client(StreamClient * client) { |
| VirtualStream * stream = client->stream; |
| Trap trap; |
| LINK * n; |
| |
| assert(stream->ref_cnt > 0); |
| if (set_trap(&trap)) { |
| send_event_stream_disposed(&client->channel->out, stream); |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception sending stream deleted event: %d %s", |
| trap.error, errno_to_str(trap.error)); |
| } |
| list_remove(&client->link_hash); |
| list_remove(&client->link_stream); |
| list_remove(&client->link_all); |
| for (n = client->read_requests.next; n != &client->read_requests;) { |
| ReadRequest * r = client2read_request(n); |
| n = n->next; |
| delete_read_request(r); |
| } |
| for (n = client->write_requests.next; n != &client->write_requests;) { |
| WriteRequest * r = client2write_request(n); |
| n = n->next; |
| delete_write_request(r, ERR_COMMAND_CANCELLED); |
| } |
| loc_free(client); |
| if (--stream->ref_cnt == 0) { |
| assert(list_is_empty(&stream->clients)); |
| assert(stream->deleted); |
| post_event(delete_stream, stream); |
| } |
| else if (stream->access & VS_ENABLE_REMOTE_READ) { |
| advance_stream_buffer(stream); |
| } |
| } |
| |
| static void delete_subscription(Subscription * s) { |
| list_remove(&s->link_all); |
| loc_free(s); |
| } |
| |
| static void send_read_reply(StreamClient * client, char * token, size_t size) { |
| VirtualStream * stream = client->stream; |
| Channel * c = client->channel; |
| size_t lost = 0; |
| size_t read1 = 0; |
| size_t read2 = 0; |
| int eos = 0; |
| char * data1 = NULL; |
| char * data2 = NULL; |
| size_t pos; |
| size_t len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; |
| |
| assert(len > 0 || stream->eos_inp); |
| assert(client->pos <= stream->pos); |
| if ((uint64_t)len < stream->pos - client->pos) { |
| lost = (long)(stream->pos - client->pos - len); |
| } |
| else { |
| len = (unsigned)(stream->pos - client->pos); |
| } |
| pos = (stream->buf_inp + stream->buf_len - len) % stream->buf_len; |
| if (len > size) len = size; |
| data1 = stream->buf + pos; |
| if (pos + len <= stream->buf_len) { |
| read1 = len; |
| } |
| else { |
| read1 = stream->buf_len - pos; |
| data2 = stream->buf; |
| read2 = len - read1; |
| } |
| assert(read1 + read2 == len); |
| client->pos += lost + read1 + read2; |
| assert(client->pos <= stream->pos); |
| if (client->pos == stream->pos && stream->eos_inp) eos = 1; |
| assert(eos || lost + read1 + read2 > 0); |
| |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| if (read1 + read2 > 0) { |
| JsonWriteBinaryState state; |
| |
| json_write_binary_start(&state, &c->out, read1 + read2); |
| json_write_binary_data(&state, data1, read1); |
| json_write_binary_data(&state, data2, read2); |
| json_write_binary_end(&state); |
| write_stream(&c->out, 0); |
| } |
| else { |
| write_stringz(&c->out, "null"); |
| } |
| write_errno(&c->out, 0); |
| json_write_long(&c->out, lost); |
| write_stream(&c->out, 0); |
| json_write_boolean(&c->out, eos); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| |
| void virtual_stream_create(const char * type, const char * context_id, size_t buf_len, unsigned access, |
| VirtualStreamCallBack * callback, void * callback_args, VirtualStream ** res) { |
| LINK * l; |
| VirtualStream * stream = (VirtualStream *)loc_alloc_zero(sizeof(VirtualStream)); |
| |
| buf_len++; |
| list_init(&stream->clients); |
| strlcpy(stream->type, type, sizeof(stream->type)); |
| stream->magic = STREAM_MAGIC; |
| stream->id = id_cnt++; |
| stream->access = access; |
| stream->callback = callback; |
| stream->callback_args = callback_args; |
| stream->ref_cnt = 1; |
| stream->buf = (char *)loc_alloc(buf_len); |
| stream->buf_len = buf_len; |
| for (l = subscriptions.next; l != &subscriptions; l = l->next) { |
| Subscription * h = all2subscription(l); |
| if (strcmp(type, h->type) == 0) { |
| Trap trap; |
| create_client(stream, h->channel); |
| if (set_trap(&trap)) { |
| send_event_stream_created(&h->channel->out, stream, context_id); |
| clear_trap(&trap); |
| } |
| else { |
| trace(LOG_ALWAYS, "Exception sending stream created event: %d %s", |
| trap.error, errno_to_str(trap.error)); |
| } |
| } |
| } |
| list_add_first(&stream->link_all, &streams); |
| *res = stream; |
| } |
| |
| VirtualStream * virtual_stream_find(char * id) { |
| LINK * l; |
| unsigned n = 0; |
| |
| if (str2id(id, &n)) { |
| for (l = streams.next; l != &streams; l = l->next) { |
| VirtualStream * stream = all2stream(l); |
| if (stream->id == n && !stream->deleted) return stream; |
| } |
| } |
| errno = ERR_INV_CONTEXT; |
| return NULL; |
| } |
| |
| int virtual_stream_add_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int eos) { |
| int err = 0; |
| |
| assert(stream->magic == STREAM_MAGIC); |
| if (stream->eos_inp) err = ERR_EOF; |
| |
| if (!err) { |
| size_t len = (stream->buf_out + stream->buf_len - stream->buf_inp - 1) % stream->buf_len; |
| assert(len < stream->buf_len); |
| if (buf_size < len) len = buf_size; |
| if (stream->buf_inp + len <= stream->buf_len) { |
| memcpy(stream->buf + stream->buf_inp, buf, len); |
| } |
| else { |
| size_t x = stream->buf_len - stream->buf_inp; |
| size_t y = len - x; |
| memcpy(stream->buf + stream->buf_inp, buf, x); |
| memcpy(stream->buf, buf + x, y); |
| } |
| stream->buf_inp = (stream->buf_inp + len) % stream->buf_len; |
| stream->pos += len; |
| *data_size = len; |
| if (eos && buf_size == len) stream->eos_inp = 1; |
| } |
| |
| if (stream->access & VS_ENABLE_REMOTE_READ) { |
| if (!err && (stream->eos_inp || *data_size > 0)) { |
| LINK * l; |
| for (l = stream->clients.next; l != &stream->clients; l = l->next) { |
| StreamClient * client = stream2client(l); |
| while (!list_is_empty(&client->read_requests) && (client->pos < stream->pos || stream->eos_inp)) { |
| ReadRequest * r = client2read_request(client->read_requests.next); |
| list_remove(&r->link_client); |
| send_read_reply(client, r->token, r->size); |
| loc_free(r); |
| } |
| } |
| advance_stream_buffer(stream); |
| } |
| } |
| else if (!stream->data_available_posted) { |
| post_event(notify_data_available, stream); |
| stream->data_available_posted = 1; |
| } |
| |
| errno = err; |
| return err ? -1 : 0; |
| } |
| |
| int virtual_stream_get_data(VirtualStream * stream, char * buf, size_t buf_size, size_t * data_size, int * eos) { |
| size_t len; |
| |
| assert(stream->magic == STREAM_MAGIC); |
| len = (stream->buf_inp + stream->buf_len - stream->buf_out) % stream->buf_len; |
| |
| if (len > buf_size) { |
| len = buf_size; |
| *eos = 0; |
| } |
| else { |
| *eos = stream->eos_inp; |
| } |
| *data_size = len; |
| if (*eos) stream->eos_out = 1; |
| if (stream->buf_out + len <= stream->buf_len) { |
| memcpy(buf, stream->buf + stream->buf_out, len); |
| } |
| else { |
| size_t x = stream->buf_len - stream->buf_out; |
| size_t y = len - x; |
| memcpy(buf, stream->buf + stream->buf_out, x); |
| memcpy(buf + x, stream->buf, y); |
| } |
| if (stream->access & VS_ENABLE_REMOTE_WRITE) { |
| LINK * l; |
| for (l = stream->clients.next; l != &stream->clients; l = l->next) { |
| StreamClient * client = stream2client(l); |
| if (!list_is_empty(&client->write_requests)) { |
| WriteRequest * r = client2write_request(client->write_requests.next); |
| size_t done = 0; |
| int error = 0; |
| if (virtual_stream_add_data(client->stream, r->data + r->offs, |
| r->size - r->offs, &done, r->eos) < 0) error = errno; |
| r->offs += done; |
| if (error || r->offs >= r->size) { |
| delete_write_request(r, error); |
| } |
| while (error && !list_is_empty(&client->write_requests)) { |
| r = client2write_request(client->write_requests.next); |
| delete_write_request(r, ERR_COMMAND_CANCELLED); |
| } |
| } |
| } |
| } |
| if ((stream->access & VS_ENABLE_REMOTE_READ) == 0 && len > 0) { |
| stream->buf_out = (stream->buf_out + len) % stream->buf_len; |
| assert(!*eos || stream->buf_out == stream->buf_inp); |
| if (!stream->space_available_posted) { |
| post_event(notify_space_available, stream); |
| stream->space_available_posted = 1; |
| } |
| } |
| return 0; |
| } |
| |
| int virtual_stream_is_empty(VirtualStream * stream) { |
| assert(stream->magic == STREAM_MAGIC); |
| assert(!stream->deleted); |
| return stream->buf_out == stream->buf_inp; |
| } |
| |
| void virtual_stream_delete(VirtualStream * stream) { |
| assert(stream->magic == STREAM_MAGIC); |
| assert(!stream->deleted); |
| stream->deleted = 1; |
| if (--stream->ref_cnt > 0) return; |
| assert(list_is_empty(&stream->clients)); |
| post_event(delete_stream, stream); |
| } |
| |
| static void command_subscribe(char * token, Channel * c) { |
| char type[256]; |
| int err = 0; |
| LINK * l; |
| |
| json_read_string(&c->inp, type, sizeof(type)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| for (l = subscriptions.next; l != &subscriptions;) { |
| Subscription * h = all2subscription(l); |
| l = l->next; |
| if (h->channel == c && strcmp(type, h->type) == 0) { |
| err = ERR_OTHER; |
| break; |
| } |
| } |
| if (err == 0) { |
| Subscription * s = (Subscription *)loc_alloc_zero(sizeof(Subscription)); |
| list_init(&s->link_all); |
| list_add_first(&s->link_all, &subscriptions); |
| strlcpy(s->type, type, sizeof(s->type)); |
| s->channel = c; |
| } |
| |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| |
| static void command_unsubscribe(char * token, Channel * c) { |
| char type[256]; |
| int err = 0; |
| Subscription * s = NULL; |
| LINK * l; |
| |
| json_read_string(&c->inp, type, sizeof(type)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| for (l = subscriptions.next; l != &subscriptions;) { |
| Subscription * h = all2subscription(l); |
| l = l->next; |
| if (h->channel == c && strcmp(type, h->type) == 0) { |
| s = h; |
| break; |
| } |
| } |
| if (s == NULL) err = ERR_INV_CONTEXT; |
| if (err == 0) delete_subscription(s); |
| |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| |
| static void command_read(char * token, Channel * c) { |
| char id[256]; |
| size_t size = 0; |
| StreamClient * client = NULL; |
| int err = 0; |
| |
| json_read_string(&c->inp, id, sizeof(id)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| size = json_read_ulong(&c->inp); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| client = find_client(id, c); |
| if (client == NULL) err = errno; |
| if (!err && (client->stream->access & VS_ENABLE_REMOTE_READ) == 0) err = ERR_UNSUPPORTED; |
| |
| if (err == 0) { |
| VirtualStream * stream = client->stream; |
| if (client->pos == stream->pos && !stream->eos_inp) { |
| ReadRequest * r = (ReadRequest *)loc_alloc_zero(sizeof(ReadRequest)); |
| list_init(&r->link_client); |
| r->client = client; |
| r->size = size; |
| strlcpy(r->token, token, sizeof(r->token)); |
| list_add_last(&r->link_client, &client->read_requests); |
| } |
| else { |
| assert(list_is_empty(&client->read_requests)); |
| assert(client->channel == c); |
| send_read_reply(client, token, size); |
| advance_stream_buffer(stream); |
| } |
| } |
| else { |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_stringz(&c->out, "null"); |
| write_errno(&c->out, err); |
| json_write_long(&c->out, 0); |
| write_stream(&c->out, 0); |
| json_write_boolean(&c->out, 0); |
| write_stream(&c->out, 0); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| } |
| |
| static void command_write(char * token, Channel * c) { |
| char id[256]; |
| StreamClient * client = NULL; |
| long size = 0; |
| long offs = 0; |
| char * data = NULL; |
| int err = 0; |
| |
| json_read_string(&c->inp, id, sizeof(id)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| size = json_read_long(&c->inp); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| |
| client = find_client(id, c); |
| if (client == NULL) err = errno; |
| if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; |
| |
| { |
| JsonReadBinaryState state; |
| size_t data_pos = 0; |
| |
| if (!err && !list_is_empty(&client->write_requests)) data = (char *)loc_alloc(size); |
| |
| json_read_binary_start(&state, &c->inp); |
| for (;;) { |
| if (data != NULL) { |
| size_t rd = json_read_binary_data(&state, data + data_pos, size - offs - data_pos); |
| if (rd == 0) break; |
| data_pos += rd; |
| } |
| else { |
| char buf[256]; |
| size_t rd = json_read_binary_data(&state, buf, sizeof(buf)); |
| if (rd == 0) break; |
| if (!err) { |
| size_t done = 0; |
| if (virtual_stream_add_data(client->stream, buf, rd, &done, 0) < 0) err = errno; |
| assert(done <= rd); |
| offs += done; |
| if (!err && done < rd) { |
| data = (char *)loc_alloc(size - offs); |
| memcpy(data, buf + done, rd - done); |
| data_pos = rd - done; |
| } |
| } |
| } |
| } |
| json_read_binary_end(&state); |
| } |
| |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| if (data != NULL) { |
| WriteRequest * r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); |
| list_init(&r->link_client); |
| r->client = client; |
| r->data = data; |
| r->size = size - offs; |
| strlcpy(r->token, token, sizeof(r->token)); |
| list_add_last(&r->link_client, &client->write_requests); |
| } |
| else { |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| } |
| |
| static void command_eos(char * token, Channel * c) { |
| char id[256]; |
| StreamClient * client = NULL; |
| size_t done = 0; |
| WriteRequest * r = NULL; |
| int err = 0; |
| |
| json_read_string(&c->inp, id, sizeof(id)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| client = find_client(id, c); |
| if (client == NULL) err = errno; |
| if (!err && (client->stream->access & VS_ENABLE_REMOTE_WRITE) == 0) err = ERR_UNSUPPORTED; |
| if (!err && !list_is_empty(&client->write_requests)) r = (WriteRequest *)loc_alloc_zero(sizeof(WriteRequest)); |
| if (!err && r == NULL && virtual_stream_add_data(client->stream, NULL, 0, &done, 1) < 0) err = errno; |
| |
| if (r != NULL) { |
| list_init(&r->link_client); |
| r->client = client; |
| r->eos = 1; |
| strlcpy(r->token, token, sizeof(r->token)); |
| list_add_last(&r->link_client, &client->write_requests); |
| } |
| else { |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| } |
| |
| static void command_connect(char * token, Channel * c) { |
| char id[256]; |
| int err = 0; |
| |
| json_read_string(&c->inp, id, sizeof(id)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| if (find_client(id, c) == NULL) { |
| VirtualStream * stream = virtual_stream_find(id); |
| if (stream == NULL) err = errno; |
| else create_client(stream, c); |
| } |
| |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| |
| static void command_disconnect(char * token, Channel * c) { |
| char id[256]; |
| StreamClient * client = NULL; |
| int err = 0; |
| |
| json_read_string(&c->inp, id, sizeof(id)); |
| if (read_stream(&c->inp) != 0) exception(ERR_JSON_SYNTAX); |
| if (read_stream(&c->inp) != MARKER_EOM) exception(ERR_JSON_SYNTAX); |
| |
| client = find_client(id, c); |
| if (client == NULL) err = errno; |
| if (!err) delete_client(client); |
| |
| write_stringz(&c->out, "R"); |
| write_stringz(&c->out, token); |
| write_errno(&c->out, err); |
| write_stream(&c->out, MARKER_EOM); |
| } |
| |
| static void channel_close_listener(Channel * c) { |
| LINK * l = NULL; |
| |
| for (l = clients.next; l != &clients;) { |
| StreamClient * client = all2client(l); |
| l = l->next; |
| if (client->channel == c) { |
| trace(LOG_ALWAYS, "Stream is left connected by client: VS%d", client->stream->id); |
| delete_client(client); |
| } |
| } |
| |
| for (l = subscriptions.next; l != &subscriptions;) { |
| Subscription * h = all2subscription(l); |
| l = l->next; |
| if (h->channel == c) { |
| delete_subscription(h); |
| } |
| } |
| } |
| |
| void ini_streams_service(Protocol * proto) { |
| int i; |
| |
| list_init(&clients); |
| list_init(&streams); |
| list_init(&subscriptions); |
| for (i = 0; i < HANDLE_HASH_SIZE; i++) { |
| list_init(&handle_hash[i]); |
| } |
| |
| add_channel_close_listener(channel_close_listener); |
| |
| add_command_handler(proto, STREAMS, "subscribe", command_subscribe); |
| add_command_handler(proto, STREAMS, "unsubscribe", command_unsubscribe); |
| add_command_handler(proto, STREAMS, "read", command_read); |
| add_command_handler(proto, STREAMS, "write", command_write); |
| add_command_handler(proto, STREAMS, "eos", command_eos); |
| add_command_handler(proto, STREAMS, "connect", command_connect); |
| add_command_handler(proto, STREAMS, "disconnect", command_disconnect); |
| } |
| |
| #endif /* SERVICE_Streams */ |