blob: 849114b4ac4f91098ec7f11de114509259ef453c [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2018 Xilinx, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
* You may elect to redistribute this code under either of these licenses.
*
* Contributors:
* Xilinx - initial API and implementation
*******************************************************************************/
/*
* This module supports execution of TCF commands over HTTP connection.
*/
#include <tcf/config.h>
#if ENABLE_HttpServer
#include <assert.h>
#include <tcf/framework/ip_ifc.h>
#include <tcf/framework/mdep-inet.h>
#include <tcf/framework/json.h>
#include <tcf/framework/trace.h>
#include <tcf/framework/events.h>
#include <tcf/framework/errors.h>
#include <tcf/framework/exceptions.h>
#include <tcf/framework/myalloc.h>
#include <tcf/framework/streams.h>
#include <tcf/framework/protocol.h>
#include <tcf/http/http.h>
#include <tcf/http/http-tcf.h>
#define MAX_IFC 10
#define BUF_SIZE 0x1000
typedef struct ClientData {
LINK link_all;
Channel channel;
unsigned lock_cnt;
char * id;
char * cmd_data;
size_t cmd_size;
ByteArrayOutputStream out_buf;
OutputStream * http_out;
OutputStream * sse_out;
LINK link_messages;
char * wait_token;
int wait_done;
int sse_posted;
unsigned timeout;
} ClientData;
typedef struct Message {
LINK link;
char type[4];
char token[64];
char service[64];
char command[64];
char ** args_buf;
unsigned arg_cnt;
unsigned arg_max;
int error;
} Message;
static unsigned token_cnt = 0;
static ChannelServer server = { NULL };
static int server_sock = -1;
static LINK link_clients;
#define inp2channel(x) ((Channel *)((char *)(x) - offsetof(Channel, inp)))
#define out2channel(x) ((Channel *)((char *)(x) - offsetof(Channel, out)))
#define link2msg(x) ((Message *)((char *)(x) - offsetof(Message, link)))
#define all2client(x) ((ClientData *)((char *)(x) - offsetof(ClientData, link_all)))
#define channel2client(x) ((ClientData *)((char *)(x) - offsetof(ClientData, channel)))
static void close_client(ClientData * client) {
Channel * channel = &client->channel;
list_remove(&client->link_all);
channel->state = ChannelStateDisconnected;
notify_channel_closed(channel);
if (channel->disconnected) {
channel->disconnected(channel);
}
else {
trace(LOG_PROTOCOL, "channel %#" PRIxPTR " disconnected", (uintptr_t)channel);
if (channel->protocol != NULL) protocol_release(channel->protocol);
}
channel->protocol = NULL;
channel_unlock(channel);
}
static void free_message(Message * m) {
unsigned i;
for (i = 0; i < m->arg_cnt; i++) {
loc_free(m->args_buf[i]);
}
loc_free(m->args_buf);
loc_free(m);
}
static void http_message(OutputStream * out, Message * m) {
write_string(out, "{\n");
if (m->error != 0) {
json_write_string(out, "Error");
write_string(out, " : ");
json_write_string(out, errno_to_str(m->error));
write_string(out, ",\n");
}
if (m->token[0]) {
json_write_string(out, "Token");
write_string(out, " : ");
json_write_string(out, m->token);
write_string(out, ",\n");
}
if (m->service[0]) {
json_write_string(out, "Service");
write_string(out, " : ");
json_write_string(out, m->service);
write_string(out, ",\n");
}
if (m->command[0]) {
if (strcmp(m->type, "E") == 0) {
json_write_string(out, "Event");
}
else {
json_write_string(out, "Command");
}
write_string(out, " : ");
json_write_string(out, m->command);
write_string(out, ",\n");
}
json_write_string(out, "Type");
write_string(out, " : ");
json_write_string(out, m->type);
if (m->arg_cnt == 0) {
write_stream(out, '\n');
}
else {
unsigned i;
write_string(out, ",\n");
json_write_string(out, "Args");
write_string(out, " : [\n");
for (i = 0; i < m->arg_cnt; i++) {
write_string(out, " ");
if (m->args_buf[i] == NULL) {
write_string(out, "null");
}
else {
write_string(out, m->args_buf[i]);
}
if (i < m->arg_cnt - 1) write_stream(out, ',');
write_stream(out, '\n');
}
write_string(out, "]\n");
}
write_string(out, "}\n");
}
static void send_http_reply(ClientData * cd) {
OutputStream * out = cd->http_out;
write_string(out, "[\n");
while (!list_is_empty(&cd->link_messages)) {
Message * m = link2msg(cd->link_messages.next);
list_remove(&m->link);
http_message(out, m);
if (!list_is_empty(&cd->link_messages)) write_stream(out, ',');
write_stream(out, '\n');
free_message(m);
}
write_string(out, "]\n");
}
static void read_stringz(InputStream * inp, char * str, size_t size) {
unsigned len = 0;
for (;;) {
int ch = read_stream(inp);
if (ch <= 0) {
if (ch == 0) break;
exception(ERR_PROTOCOL);
}
if (len < size - 1) str[len++] = (char)ch;
}
str[len] = 0;
}
static void read_args(InputStream * inp, Message * m) {
for (;;) {
char * arg = NULL;
int ch = peek_stream(inp);
if (ch == MARKER_EOS) break;
if (ch != MARKER_EOA) {
arg = json_read_object(inp);
ch = peek_stream(inp);
}
if (ch != MARKER_EOA) exception(ERR_PROTOCOL);
if (m->arg_cnt >= m->arg_max) {
m->arg_max += 16;
m->args_buf = (char **)loc_realloc(m->args_buf, m->arg_max * sizeof(char *));
}
m->args_buf[m->arg_cnt++] = arg;
read_stream(inp);
}
}
static void sse_event(void * arg) {
ClientData * cd = (ClientData *)arg;
cd->sse_posted = 0;
if (cd->sse_out != NULL && !list_is_empty(&cd->link_messages)) {
http_resume(cd->sse_out);
http_printf("data: event\n");
http_flush();
}
}
static void http_channel_lock(Channel * channel) {
ClientData * client = channel2client(channel);
client->lock_cnt++;
}
static void http_channel_unlock(Channel * channel) {
ClientData * client = channel2client(channel);
assert(client->lock_cnt > 0);
client->lock_cnt--;
if (client->lock_cnt == 0) {
ClientData * cd = channel2client(channel);
channel_clear_broadcast_group(channel);
while (!list_is_empty(&cd->link_messages)) {
Message * e = link2msg(cd->link_messages.next);
list_remove(&e->link);
free_message(e);
}
loc_free(channel->peer_name);
loc_free(cd->out_buf.mem);
loc_free(cd->cmd_data);
loc_free(cd->id);
loc_free(cd);
}
}
static void http_channel_start_comm(Channel * channel) {
notify_channel_created(channel);
if (channel->connecting) {
channel->connecting(channel);
}
else {
send_hello_message(channel);
}
}
static int http_channel_read(InputStream * inp) {
if (inp->cur == NULL) return MARKER_EOS;
if (inp->cur < inp->end) return *inp->cur++;
inp->cur = inp->end = NULL;
return MARKER_EOM;
}
static int http_channel_peek(InputStream * inp) {
if (inp->cur == NULL) return MARKER_EOS;
if (inp->cur < inp->end) return *inp->cur;
return MARKER_EOM;
}
static void http_channel_write(OutputStream * out, int byte) {
Channel * ch = out2channel(out);
ClientData * cd = channel2client(ch);
if (ch->state == ChannelStateDisconnected) return;
if (byte == MARKER_EOM) {
Trap trap;
int reply = 0;
char * data = NULL;
size_t size = 0;
ByteArrayInputStream inp_buf;
InputStream * inp = NULL;
Message * m = (Message *)loc_alloc_zero(sizeof(Message));
get_byte_array_output_stream_data(&cd->out_buf, &data, &size);
inp = create_byte_array_input_stream(&inp_buf, data, size);
if (set_trap(&trap)) {
read_stringz(inp, m->type, sizeof(m->type));
if (strcmp(m->type, "C") == 0) {
read_stringz(inp, m->token, sizeof(m->token));
read_stringz(inp, m->service, sizeof(m->service));
read_stringz(inp, m->command, sizeof(m->command));
read_args(inp, m);
}
else if (strcmp(m->type, "P") == 0) {
read_stringz(inp, m->token, sizeof(m->token));
read_args(inp, m);
}
else if (strcmp(m->type, "R") == 0) {
read_stringz(inp, m->token, sizeof(m->token));
read_args(inp, m);
reply = 1;
}
else if (strcmp(m->type, "E") == 0) {
read_stringz(inp, m->service, sizeof(m->service));
read_stringz(inp, m->command, sizeof(m->command));
read_args(inp, m);
}
else if (strcmp(m->type, "N") == 0) {
read_stringz(inp, m->token, sizeof(m->token));
reply = 1;
}
else {
exception(ERR_PROTOCOL);
}
clear_trap(&trap);
}
m->error = trap.error;
list_add_last(&m->link, &cd->link_messages);
if (reply && cd->wait_token != NULL && strcmp(cd->wait_token, m->token) == 0) {
assert(cd->http_out != NULL);
cd->wait_done = 1;
if (cd->http_out != get_http_stream()) {
http_resume(cd->http_out);
send_http_reply(cd);
cd->http_out = NULL;
http_flush();
loc_free(cd->wait_token);
cd->wait_token = NULL;
cd->wait_done = 0;
}
}
loc_free(data);
if (cd->sse_out != NULL && !list_is_empty(&cd->link_messages) && !cd->sse_posted) {
post_event(sse_event, cd);
cd->sse_posted = 1;
}
}
else {
write_stream(&cd->out_buf.out, byte);
}
}
static void http_channel_write_block(OutputStream * out, const char * bytes, size_t size) {
unsigned i;
for (i = 0; i < size; i++) http_channel_write(out, (unsigned char)bytes[i]);
}
static ssize_t http_channel_splice_block(OutputStream * out, int fd, size_t size, int64_t * offset) {
ssize_t rd;
char buffer[BUF_SIZE];
assert(is_dispatch_thread());
if (size == 0) return 0;
if (size > BUF_SIZE) size = BUF_SIZE;
if (offset != NULL) {
rd = pread(fd, buffer, size, (off_t)*offset);
if (rd > 0) *offset += rd;
}
else {
rd = read(fd, buffer, size);
}
if (rd > 0) http_channel_write_block(out, buffer, rd);
return rd;
}
static void timer_event(void * arg) {
LINK * l = link_clients.next;
while (l != &link_clients) {
ClientData * x = all2client(l);
l = l->next;
x->timeout++;
if (x->timeout > 10) {
close_client(x);
}
}
if (list_is_empty(&link_clients)) return;
post_event_with_delay(timer_event, NULL, 60000000);
}
static void refresh_server_info(int sock, PeerServer * ps) {
unsigned i;
struct sockaddr_in sin;
#if defined(_WRS_KERNEL)
int sinlen;
#else
socklen_t sinlen;
#endif
const char * str_port = peer_server_getprop(ps, "Port", NULL);
struct in_addr src_addr;
ip_ifc_info if_list[MAX_IFC];
int if_cnt = build_ifclist(sock, MAX_IFC, if_list);
sinlen = sizeof(sin);
if (getsockname(sock, (struct sockaddr *)&sin, &sinlen) != 0) {
trace(LOG_ALWAYS, "refresh_server_info: getsockname error: %s", errno_to_str(errno));
return;
}
while (if_cnt-- > 0) {
char str_host[64];
PeerServer * ps2;
if (sin.sin_addr.s_addr != INADDR_ANY &&
(if_list[if_cnt].addr & if_list[if_cnt].mask) !=
(sin.sin_addr.s_addr & if_list[if_cnt].mask)) {
continue;
}
src_addr.s_addr = if_list[if_cnt].addr;
ps2 = peer_server_alloc();
ps2->flags = ps->flags | PS_FLAG_LOCAL | PS_FLAG_DISCOVERABLE;
for (i = 0; i < ps->ind; i++) {
peer_server_addprop(ps2, loc_strdup(ps->list[i].name), loc_strdup(ps->list[i].value));
}
inet_ntop(AF_INET, &src_addr, str_host, sizeof(str_host));
peer_server_addprop(ps2, loc_strdup("ID"), loc_printf("HTTP:%s:%s", str_host, str_port));
peer_server_addprop(ps2, loc_strdup("Host"), loc_strdup(str_host));
peer_server_addprop(ps2, loc_strdup("Port"), loc_strdup(str_port));
peer_server_add(ps2, PEER_DATA_RETENTION_PERIOD * 2);
}
}
static void refresh_server_info_event(void * x) {
refresh_server_info(server_sock, server.ps);
post_event_with_delay(refresh_server_info_event, NULL, PEER_DATA_REFRESH_PERIOD * 1000000);
}
static ClientData * find_client(void) {
ClientData * cd = NULL;
HttpParam * h = get_http_headers();
const char * id = "";
LINK * l = NULL;
while (h != NULL) {
if (strcmp(h->name, "X-Session-ID") == 0) {
id = h->value;
break;
}
h = h->next;
}
for (l = link_clients.next; l != &link_clients; l = l->next) {
ClientData * x = all2client(l);
if (strcmp(x->id, id) == 0) return x;
}
if (list_is_empty(&link_clients)) post_event_with_delay(timer_event, NULL, 60000000);
cd = (ClientData *)loc_alloc_zero(sizeof(ClientData));
cd->lock_cnt = 1;
cd->channel.state = ChannelStateStartWait;
cd->channel.lock = http_channel_lock;
cd->channel.unlock = http_channel_unlock;
cd->channel.start_comm = http_channel_start_comm;
cd->channel.inp.peek = http_channel_peek;
cd->channel.inp.read = http_channel_read;
cd->channel.out.write = http_channel_write;
cd->channel.out.write_block = http_channel_write_block;
cd->channel.out.splice_block = http_channel_splice_block;
cd->channel.peer_name = loc_printf("HTTP:%s", id);
create_byte_array_output_stream(&cd->out_buf);
list_init(&cd->link_messages);
cd->id = loc_strdup(id);
list_add_first(&cd->link_all, &link_clients);
if (cd->channel.state == ChannelStateStartWait) {
server.new_conn(&server, &cd->channel);
}
return cd;
}
static unsigned decode_digit(char * s, unsigned * i) {
unsigned d = 0;
char ch = s[*i];
if (ch >= '0' && ch <= '9') {
d = ch - '0';
(*i)++;
}
else if (ch >= 'A' && ch <= 'F') {
d = ch - 'A' + 10;
(*i)++;
}
else if (ch >= 'a' && ch <= 'f') {
d = ch - 'a' + 10;
(*i)++;
}
return d;
}
static void decode(char * s) {
unsigned i = 0;
unsigned j = 0;
for (;;) {
char ch = s[i++];
if (ch == '%') {
unsigned d1 = decode_digit(s, &i);
unsigned d2 = decode_digit(s, &i);
ch = (char)((d1 << 4) | d2);
}
s[j++] = ch;
if (ch == 0) break;
}
}
static int get_page(const char * uri) {
int no_token = 0;
const char * type = NULL;
if (strncmp(uri, "/tcf/s/", 7) == 0) {
type = "C"; no_token = 1;
}
else if (strncmp(uri, "/tcf/c/", 7) == 0) type = "C";
else if (strncmp(uri, "/tcf/e/", 7) == 0) type = "E";
if (type != NULL) {
unsigned i = 7;
unsigned j = i;
char * token = NULL;
char * service = NULL;
char * command = NULL;
char * query = NULL;
char ** attribute = NULL;
unsigned attribute_cnt = 0;
unsigned attribute_max = 0;
char sep = '/';
for (;;) {
char ch = uri[j];
switch (ch) {
case 0:
case '/':
case '?':
case '&':
if ((sep == '?' || sep == '&') && (ch == '?' || ch == '/')) {
/* '?' and '/' may appear unencoded as data within the query */
break;
}
else {
char * s = (char *)tmp_alloc_zero(j - i + 1);
memcpy(s, uri + i, j - i);
decode(s);
if (sep == '/') {
if (type[0] == 'C' && token == NULL && !no_token) token = s;
else if (service == NULL) service = s;
else if (command == NULL) command = s;
else return 0;
}
else if (sep == '?') {
if (query == NULL) query = s;
else return 0;
}
else if (sep == '&') {
if (attribute_cnt >= attribute_max) {
attribute_max += 8;
attribute = (char **)tmp_realloc(attribute, attribute_max * sizeof(char *));
}
attribute[attribute_cnt++] = s;
}
}
i = j + 1;
sep = ch;
break;
}
if (sep == 0) break;
j++;
}
if (service != NULL && command != NULL) {
Trap trap;
ClientData * cd = find_client();
Channel * ch = &cd->channel;
ch->inp.cur = NULL;
ch->inp.end = NULL;
loc_free(cd->cmd_data);
cd->cmd_data = NULL;
cd->cmd_size = 0;
cd->wait_done = 0;
cd->timeout = 0;
cd->http_out = get_http_stream();
http_content_type("application/json");
if (set_trap(&trap)) {
ByteArrayOutputStream buf;
OutputStream * out = create_byte_array_output_stream(&buf);
if (is_channel_closed(ch)) exception(ERR_CHANNEL_CLOSED);
if (ch->state != ChannelStateConnected) {
ch->state = ChannelStateConnected;
notify_channel_opened(ch);
if (ch->connected) ch->connected(ch);
}
assert(cd->wait_token == NULL);
write_stringz(out, type);
if (type[0] == 'C') {
if (token != NULL) {
write_stringz(out, token);
}
else {
cd->wait_token = loc_printf("%04X", token_cnt++ & 0xffff);
write_stringz(out, cd->wait_token);
}
}
write_stringz(out, service);
write_stringz(out, command);
if (query != NULL) {
write_stringz(out, query);
for (i = 0; i < attribute_cnt; i++) write_stringz(out, attribute[i]);
}
get_byte_array_output_stream_data(&buf, &cd->cmd_data, &cd->cmd_size);
ch->inp.cur = (unsigned char *)cd->cmd_data;
ch->inp.end = ch->inp.cur + cd->cmd_size;
handle_protocol_message(ch);
if (cd->wait_token != NULL && !cd->wait_done) {
assert(cd->http_out != NULL);
http_suspend();
}
else {
send_http_reply(cd);
loc_free(cd->wait_token);
cd->wait_token = NULL;
cd->wait_done = 0;
cd->http_out = NULL;
}
clear_trap(&trap);
}
else {
OutputStream * out = cd->http_out;
write_string(out, "[\n");
write_string(out, "{\n");
json_write_string(out, "Error");
write_string(out, " : ");
json_write_string(out, errno_to_str(trap.error));
write_string(out, "\n");
write_string(out, "}\n");
write_string(out, "]\n");
loc_free(cd->wait_token);
cd->wait_token = NULL;
cd->wait_done = 0;
cd->http_out = NULL;
}
return 1;
}
}
else if (strcmp(uri, "/tcf/sse") == 0) {
ClientData * cd = find_client();
cd->sse_out = get_http_stream();
http_content_type("text/event-stream");
return 1;
}
return 0;
}
void http_connection_closed(OutputStream * out) {
LINK * l;
for (l = link_clients.next; l != &link_clients; l = l->next) {
ClientData * cd = all2client(l);
if (cd->http_out == out) {
cd->http_out = NULL;
if (cd->wait_token != NULL) {
loc_free(cd->wait_token);
cd->wait_token = NULL;
cd->wait_done = 0;
}
}
if (cd->sse_out == out) {
cd->sse_out = NULL;
}
}
}
ChannelServer * ini_http_tcf(int sock, PeerServer * ps) {
static HttpListener l = { get_page };
assert(list_is_empty(&server.servlink));
add_http_listener(&l);
server_sock = sock;
server.ps = ps;
list_init(&link_clients);
list_add_last(&server.servlink, &channel_server_root);
post_event(refresh_server_info_event, NULL);
return &server;
}
#endif