blob: c0f28883d08aa9a042fd6ef8b24e388be37a7206 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2010-2018 Wind River Systems, Inc. and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
* You may elect to redistribute this code under either of these licenses.
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
/*
* Implements input and output stream over named pipe transport.
*/
#include <tcf/config.h>
#if defined(_WIN32) || defined(__CYGWIN__)
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <tcf/framework/mdep-fs.h>
#include <tcf/framework/channel_pipe.h>
#include <tcf/framework/myalloc.h>
#include <tcf/framework/protocol.h>
#include <tcf/framework/errors.h>
#include <tcf/framework/events.h>
#include <tcf/framework/exceptions.h>
#include <tcf/framework/trace.h>
#include <tcf/framework/json.h>
#include <tcf/framework/peer.h>
#include <tcf/framework/asyncreq.h>
#include <tcf/framework/inputbuf.h>
#include <tcf/framework/outputbuf.h>
#define BUF_SIZE (128 * MEM_USAGE_FACTOR)
#define CHANNEL_MAGIC 0x52376532
#define SERVER_INSTANCE_CNT 16
#define DEFAULT_PIPE_NAME "TCF-Agent"
#define DEFAULT_PIPE_DIR "//./pipe/"
static const char * def_pipe_name = DEFAULT_PIPE_DIR DEFAULT_PIPE_NAME;
static const char * attr_pipe_name = "PipeName";
typedef struct ChannelPIPE ChannelPIPE;
typedef struct ServerPIPE ServerPIPE;
typedef struct ServerInstance ServerInstance;
struct ChannelPIPE {
Channel * chan; /* Public channel information */
int magic; /* Magic number */
int lock_cnt; /* Stream lock count, when > 0 channel cannot be deleted */
int fd_inp;
int fd_out;
ServerInstance * server;
/* Input stream buffer */
InputBuf ibuf;
/* Output stream state */
int out_flush_cnt;
unsigned char obuf[BUF_SIZE];
OutputQueue out_queue;
AsyncReqInfo out_req;
/* Async read request */
AsyncReqInfo rd_req;
int read_pending;
};
struct ServerInstance {
ServerPIPE * server;
int index;
int fd_inp;
int fd_out;
#if defined(_WIN32) && !defined(__CYGWIN__)
HANDLE pipe;
AsyncReqInfo req;
#endif
};
struct ServerPIPE {
ChannelServer serv;
int closed;
LINK servlink;
ServerInstance arr[SERVER_INSTANCE_CNT];
};
static size_t channel_pipe_extension_offset = 0;
#define EXT(ctx) ((ChannelPIPE **)((char *)(ctx) + channel_pipe_extension_offset))
#define channel2pipe(A) (*EXT(A))
#define inp2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, inp)))
#define out2channel(A) ((Channel *)((char *)(A) - offsetof(Channel, out)))
#define server2pipe(A) ((ServerPIPE *)((char *)(A) - offsetof(ServerPIPE, serv)))
#define servlink2pipe(A) ((ServerPIPE *)((char *)(A) - offsetof(ServerPIPE, servlink)))
#define ibuf2pipe(A) ((ChannelPIPE *)((char *)(A) - offsetof(ChannelPIPE, ibuf)))
#define obuf2pipe(A) ((ChannelPIPE *)((char *)(A) - offsetof(ChannelPIPE, out_queue)))
static LINK server_list = TCF_LIST_INIT(server_list);
static void pipe_read_done(void * x);
static void handle_channel_msg(void * x);
static void close_input_pipe(ChannelPIPE * c);
static void close_output_pipe(ChannelPIPE * c);
static void delete_channel(ChannelPIPE * c) {
trace(LOG_PROTOCOL, "Deleting channel %#" PRIxPTR, (uintptr_t)c);
assert(c->lock_cnt == 0);
assert(c->out_flush_cnt == 0);
assert(c->magic == CHANNEL_MAGIC);
assert(c->read_pending == 0);
assert(c->ibuf.handling_msg != HandleMsgTriggered);
assert(output_queue_is_empty(&c->out_queue));
output_queue_clear(&c->out_queue);
channel_clear_broadcast_group(c->chan);
close_input_pipe(c);
list_remove(&c->chan->chanlink);
if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
shutdown_set_stopped(&channel_shutdown);
c->magic = 0;
loc_free(c->ibuf.buf);
loc_free(c->chan->peer_name);
channel_free(c->chan);
loc_free(c);
}
static void pipe_lock(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
c->lock_cnt++;
}
static void pipe_unlock(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->lock_cnt > 0);
c->lock_cnt--;
if (c->lock_cnt == 0) {
assert(!c->read_pending);
delete_channel(c);
}
}
static int pipe_is_closed(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->lock_cnt > 0);
return c->chan->state == ChannelStateDisconnected;
}
static void done_write_request(void * args) {
ChannelPIPE * c = (ChannelPIPE *)((AsyncReqInfo *)args)->client_data;
int size = 0;
int error = 0;
assert(args == &c->out_req);
if (c->out_req.u.fio.rval < 0) error = c->out_req.error;
else size = c->out_req.u.fio.rval;
output_queue_done(&c->out_queue, error, size);
if (output_queue_is_empty(&c->out_queue) &&
c->chan->state == ChannelStateDisconnected) close_output_pipe(c);
pipe_unlock(c->chan);
}
static void post_write_request(OutputBuffer * bf) {
ChannelPIPE * c = obuf2pipe(bf->queue);
c->out_req.client_data = c;
c->out_req.done = done_write_request;
c->out_req.type = AsyncReqWrite;
c->out_req.u.fio.fd = c->fd_out;
c->out_req.u.fio.bufp = bf->buf + bf->buf_pos;
c->out_req.u.fio.bufsz = bf->buf_len - bf->buf_pos;
async_req_post(&c->out_req);
pipe_lock(c->chan);
}
static void create_write_request(ChannelPIPE * c, const void * buf, size_t size) {
if (c->chan->state == ChannelStateDisconnected) return;
c->out_queue.post_io_request = post_write_request;
output_queue_add(&c->out_queue, buf, size);
}
static void pipe_flush(ChannelPIPE * c) {
unsigned char * p = c->obuf;
unsigned char * e = c->chan->out.cur;
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->chan->out.end == p + sizeof(c->obuf));
if (e == p) return;
assert(e >= p && e <= p + sizeof(c->obuf));
create_write_request(c, p, e - p);
c->chan->out.cur = p;
}
static void pipe_flush_event(void * x) {
ChannelPIPE * c = (ChannelPIPE *)x;
assert(c->magic == CHANNEL_MAGIC);
if (--c->out_flush_cnt == 0) {
int congestion_level = c->chan->congestion_level;
if (congestion_level > 0) usleep(congestion_level * 2500);
pipe_flush(c);
pipe_unlock(c->chan);
}
}
static void pipe_write_stream(OutputStream * out, int byte) {
ChannelPIPE * c = channel2pipe(out2channel(out));
assert(c->magic == CHANNEL_MAGIC);
if (c->chan->state == ChannelStateDisconnected) return;
if (c->chan->out.cur == c->chan->out.end) pipe_flush(c);
if (byte < 0 || byte == ESC) {
char esc = 0;
*c->chan->out.cur++ = ESC;
if (byte == ESC) esc = 0;
else if (byte == MARKER_EOM) esc = 1;
else if (byte == MARKER_EOS) esc = 2;
else assert(0);
if (c->chan->state == ChannelStateDisconnected) return;
if (c->chan->out.cur == c->chan->out.end) pipe_flush(c);
*c->chan->out.cur++ = esc;
if (byte == MARKER_EOM && c->out_flush_cnt < 2) {
if (c->out_flush_cnt++ == 0) pipe_lock(c->chan);
post_event_with_delay(pipe_flush_event, c, 0);
}
return;
}
*c->chan->out.cur++ = (char)byte;
}
static void pipe_write_block_stream(OutputStream * out, const char * bytes, size_t size) {
size_t cnt = 0;
ChannelPIPE * c = channel2pipe(out2channel(out));
if (out->supports_zero_copy && size > 32) {
/* Send the binary data escape seq */
size_t n = size;
if (c->chan->out.cur >= c->chan->out.end - 8) pipe_flush(c);
*c->chan->out.cur++ = ESC;
*c->chan->out.cur++ = 3;
for (;;) {
if (n <= 0x7fu) {
*c->chan->out.cur++ = (char)n;
break;
}
*c->chan->out.cur++ = (n & 0x7fu) | 0x80u;
n = n >> 7;
}
/* We need to flush the buffer then send our data */
pipe_flush(c);
create_write_request(c, bytes, size);
return;
}
while (cnt < size) write_stream(out, (unsigned char)bytes[cnt++]);
}
static ssize_t pipe_splice_block_stream(OutputStream * out, int fd, size_t size, int64_t * offset) {
ssize_t rd = 0;
char buffer[BUF_SIZE];
assert(is_dispatch_thread());
if (size == 0) return 0;
if (size > BUF_SIZE) size = BUF_SIZE;
if (offset != NULL) {
rd = pread(fd, buffer, size, (off_t)*offset);
if (rd > 0) *offset += rd;
}
else {
rd = read(fd, buffer, size);
}
if (rd > 0) pipe_write_block_stream(out, buffer, rd);
return rd;
}
static void pipe_post_read(InputBuf * ibuf, unsigned char * buf, size_t size) {
ChannelPIPE * c = ibuf2pipe(ibuf);
if (c->read_pending) return;
c->read_pending = 1;
c->rd_req.u.fio.bufp = buf;
c->rd_req.u.fio.bufsz = size;
async_req_post(&c->rd_req);
}
static void pipe_wait_read(InputBuf * ibuf) {
ChannelPIPE * c = ibuf2pipe(ibuf);
/* Wait for read to complete */
assert(c->lock_cnt > 0);
assert(c->read_pending != 0);
cancel_event(pipe_read_done, &c->rd_req, 1);
pipe_read_done(&c->rd_req);
}
static int pipe_read_stream(InputStream * inp) {
Channel * channel = inp2channel(inp);
ChannelPIPE * c = channel2pipe(channel);
assert(c->lock_cnt > 0);
if (inp->cur < inp->end) return *inp->cur++;
return ibuf_get_more(&c->ibuf, 0);
}
static int pipe_peek_stream(InputStream * inp) {
Channel * channel = inp2channel(inp);
ChannelPIPE * c = channel2pipe(channel);
assert(c->lock_cnt > 0);
if (inp->cur < inp->end) return *inp->cur;
return ibuf_get_more(&c->ibuf, 1);
}
static void send_eof_and_close(Channel * channel, int err) {
ChannelPIPE * c = channel2pipe(channel);
assert(c->magic == CHANNEL_MAGIC);
if (channel->state == ChannelStateDisconnected) return;
ibuf_flush(&c->ibuf);
if (c->ibuf.handling_msg == HandleMsgTriggered) {
/* Cancel pending message handling */
cancel_event(handle_channel_msg, c, 0);
c->ibuf.handling_msg = HandleMsgIdle;
}
write_stream(&c->chan->out, MARKER_EOS);
write_errno(&c->chan->out, err);
write_stream(&c->chan->out, MARKER_EOM);
pipe_flush(c);
pipe_post_read(&c->ibuf, c->obuf, sizeof(c->obuf));
c->chan->state = ChannelStateDisconnected;
if (output_queue_is_empty(&c->out_queue)) close_output_pipe(c);
notify_channel_closed(channel);
if (channel->disconnected) {
channel->disconnected(channel);
}
else {
trace(LOG_PROTOCOL, "channel %#" PRIxPTR " disconnected", (uintptr_t)c);
if (channel->protocol != NULL) protocol_release(channel->protocol);
}
channel->protocol = NULL;
}
static void handle_channel_msg(void * x) {
Trap trap;
ChannelPIPE * c = (ChannelPIPE *)x;
int has_msg;
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->ibuf.handling_msg == HandleMsgTriggered);
assert(c->ibuf.message_count);
has_msg = ibuf_start_message(&c->ibuf);
if (has_msg <= 0) {
if (has_msg < 0 && c->chan->state != ChannelStateDisconnected) {
trace(LOG_PROTOCOL, "Pipe is close by remote peer, channel %#" PRIxPTR " %s", (uintptr_t)c, c->chan->peer_name);
channel_close(c->chan);
}
}
else if (set_trap(&trap)) {
if (c->chan->receive) {
c->chan->receive(c->chan);
}
else {
handle_protocol_message(c->chan);
}
clear_trap(&trap);
}
else {
trace(LOG_ALWAYS, "Exception in message handler: %s", errno_to_str(trap.error));
send_eof_and_close(c->chan, trap.error);
}
}
static void channel_check_pending(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
if (c->ibuf.handling_msg == HandleMsgIdle && c->ibuf.message_count) {
post_event(handle_channel_msg, c);
c->ibuf.handling_msg = HandleMsgTriggered;
}
}
static void pipe_trigger_message(InputBuf * ibuf) {
ChannelPIPE * c = ibuf2pipe(ibuf);
assert(is_dispatch_thread());
assert(c->ibuf.message_count > 0);
if (c->ibuf.handling_msg == HandleMsgIdle) {
post_event(handle_channel_msg, c);
c->ibuf.handling_msg = HandleMsgTriggered;
}
}
static int channel_get_message_count(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
if (c->ibuf.handling_msg != HandleMsgTriggered) return 0;
return c->ibuf.message_count;
}
static void pipe_read_done(void * x) {
AsyncReqInfo * req = (AsyncReqInfo *)x;
ChannelPIPE * c = (ChannelPIPE *)req->client_data;
int len = 0;
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->read_pending != 0);
assert(c->lock_cnt > 0);
c->read_pending = 0;
len = c->rd_req.u.fio.rval;
if (req->error) {
if (c->chan->state != ChannelStateDisconnected) {
trace(LOG_ALWAYS, "Can't read from pipe: %s", errno_to_str(req->error));
}
len = 0; /* Treat error as eof */
}
if (c->chan->state != ChannelStateDisconnected) {
ibuf_read_done(&c->ibuf, len);
}
else if (len > 0) {
pipe_post_read(&c->ibuf, c->obuf, sizeof(c->obuf));
}
else {
pipe_unlock(c->chan);
}
}
static void start_channel(Channel * channel) {
ChannelPIPE * c = channel2pipe(channel);
assert(is_dispatch_thread());
assert(c->magic == CHANNEL_MAGIC);
assert(c->fd_inp >= 0);
assert(c->fd_out >= 0);
notify_channel_created(c->chan);
if (c->chan->connecting) {
c->chan->connecting(c->chan);
}
else {
trace(LOG_PROTOCOL, "channel server connecting");
send_hello_message(c->chan);
}
ibuf_trigger_read(&c->ibuf);
}
static ChannelPIPE * create_channel(int fd_inp, int fd_out, ServerInstance * server) {
ChannelPIPE * c = NULL;
assert(fd_inp >= 0);
assert(fd_out >= 0);
c = (ChannelPIPE *)loc_alloc_zero(sizeof *c);
c->chan = channel_alloc();
channel2pipe(c->chan) = c;
c->magic = CHANNEL_MAGIC;
c->chan->inp.read = pipe_read_stream;
c->chan->inp.peek = pipe_peek_stream;
c->chan->out.cur = c->obuf;
c->chan->out.end = c->obuf + sizeof(c->obuf);
c->chan->out.write = pipe_write_stream;
c->chan->out.write_block = pipe_write_block_stream;
c->chan->out.splice_block = pipe_splice_block_stream;
list_add_last(&c->chan->chanlink, &channel_root);
shutdown_set_normal(&channel_shutdown);
c->chan->state = ChannelStateStartWait;
c->chan->incoming = server != NULL;
c->chan->start_comm = start_channel;
c->chan->check_pending = channel_check_pending;
c->chan->message_count = channel_get_message_count;
c->chan->lock = pipe_lock;
c->chan->unlock = pipe_unlock;
c->chan->is_closed = pipe_is_closed;
c->chan->close = send_eof_and_close;
ibuf_init(&c->ibuf, &c->chan->inp);
c->ibuf.post_read = pipe_post_read;
c->ibuf.wait_read = pipe_wait_read;
c->ibuf.trigger_message = pipe_trigger_message;
c->fd_inp = fd_inp;
c->fd_out = fd_out;
c->lock_cnt = 1;
c->server = server;
c->rd_req.done = pipe_read_done;
c->rd_req.client_data = c;
c->rd_req.type = AsyncReqRead;
c->rd_req.u.fio.fd = fd_inp;
output_queue_ini(&c->out_queue);
return c;
}
static void set_peer_name(ChannelPIPE * c) {
/* Create a human readable channel name that uniquely identifies remote peer */
static int pipe_cnt = 0;
c->chan->peer_name = loc_printf("PIPE:%d", pipe_cnt++);
}
typedef struct ChannelConnectInfo {
ChannelConnectCallBack callback;
void * callback_args;
int error;
int fd_inp;
int fd_out;
} ChannelConnectInfo;
static void channel_pipe_connect_done(void * args) {
ChannelConnectInfo * info = (ChannelConnectInfo *)args;
if (info->error) {
info->callback(info->callback_args, info->error, NULL);
}
else {
ChannelPIPE * c = create_channel(info->fd_inp, info->fd_out, NULL);
set_peer_name(c);
info->callback(info->callback_args, 0, c->chan);
}
loc_free(info);
}
void channel_pipe_connect(PeerServer * ps, ChannelConnectCallBack callback, void * callback_args) {
const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
ChannelConnectInfo * info = (ChannelConnectInfo *)loc_alloc_zero(sizeof(ChannelConnectInfo));
char out_path[FILE_PATH_SIZE];
info->fd_out = -1;
info->fd_inp = open(path, O_BINARY | O_RDONLY, 0);
if (info->fd_inp < 0) info->error = errno;
if (!info->error) {
int l = read(info->fd_inp, out_path, sizeof(out_path) - 1);
if (l < 0) info->error = errno;
else out_path[l] = 0;
}
if (!info->error) {
info->fd_out = open(out_path, O_BINARY | O_WRONLY, 0);
if (info->fd_out < 0) info->error = errno;
}
if (info->error) {
if (info->fd_inp >= 0) close(info->fd_inp);
if (info->fd_out >= 0) close(info->fd_out);
info->fd_inp = -1;
info->fd_out = -1;
}
info->callback = callback;
info->callback_args = callback_args;
post_event(channel_pipe_connect_done, info);
}
#if defined(_WIN32) && !defined(__CYGWIN__)
#define check_error_win32(ok) { if (!(ok)) check_error(set_win32_errno(GetLastError())); }
static void pipe_client_connected(void * args) {
AsyncReqInfo * req = (AsyncReqInfo *)args;
ServerInstance * ins = (ServerInstance *)req->client_data;
int error = 0;
assert(req == &ins->req);
if (req->error) error = req->error;
if (!error) {
size_t l = 0;
HANDLE h = NULL;
OVERLAPPED overlap;
char inp_path[FILE_PATH_SIZE];
const char * path = peer_server_getprop(ins->server->serv.ps, attr_pipe_name, def_pipe_name);
static unsigned pipe_cnt = 0;
memset(&overlap, 0, sizeof(overlap));
snprintf(inp_path, sizeof(inp_path), "%s-%u", path, pipe_cnt++);
l = strlen(inp_path) + 1;
h = CreateNamedPipe(inp_path, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
1, BUF_SIZE, BUF_SIZE, 0, NULL);
if (h == INVALID_HANDLE_VALUE) error = set_win32_errno(GetLastError());
if (!error) {
ins->fd_inp = _open_osfhandle((intptr_t)h, O_BINARY | O_RDONLY);
if (ins->fd_inp < 0) error = errno;
}
if (!error) {
overlap.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (overlap.hEvent == NULL) error = set_win32_errno(GetLastError());
}
if (!error) {
if (ConnectNamedPipe(h, &overlap)) {
error = ERR_PROTOCOL;
}
else {
DWORD e = GetLastError();
if (e != ERROR_IO_PENDING) error = set_win32_errno(GetLastError());
}
}
if (!error) {
int wr = write(ins->fd_out, inp_path, l);
if (wr < 0) error = errno;
}
if (!error) {
switch (WaitForSingleObject(overlap.hEvent, 10000)) {
case WAIT_OBJECT_0:
break;
case WAIT_ABANDONED:
case WAIT_TIMEOUT:
error = ETIMEDOUT;
break;
default:
error = set_win32_errno(GetLastError());
break;
}
}
if (!CloseHandle(overlap.hEvent)) error = set_win32_errno(GetLastError());
}
if (error) {
trace(LOG_ALWAYS, "Cannot connect pipe: %s", errno_to_str(error));
if (ins->fd_inp >= 0) close(ins->fd_inp);
ins->fd_inp = -1;
DisconnectNamedPipe(ins->pipe);
async_req_post(&ins->req);
}
else {
ChannelPIPE * c = create_channel(ins->fd_inp, ins->fd_out, ins);
set_peer_name(c);
ins->server->serv.new_conn(&ins->server->serv, c->chan);
}
}
static void close_input_pipe(ChannelPIPE * c) {
assert(c->fd_out < 0);
assert(c->fd_inp > 0);
if (c->server != NULL) {
ServerInstance * ins = c->server;
close(ins->fd_inp);
ins->fd_inp = -1;
async_req_post(&ins->req);
c->server = NULL;
}
else {
close(c->fd_inp);
}
c->fd_inp = -1;
}
static void close_output_pipe(ChannelPIPE * c) {
assert(c->fd_out > 0);
assert(c->fd_inp > 0);
if (c->server != NULL) {
ServerInstance * ins = c->server;
check_error_win32(DisconnectNamedPipe(ins->pipe));
}
else {
close(c->fd_out);
}
c->fd_out = -1;
}
static void register_server(ServerPIPE * s) {
size_t i;
PeerServer * ps = s->serv.ps;
PeerServer * ps2 = peer_server_alloc();
const char * transport = peer_server_getprop(ps, "TransportName", NULL);
const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
char id[256];
ps2->flags = ps->flags | PS_FLAG_LOCAL | PS_FLAG_DISCOVERABLE;
for (i = 0; i < ps->ind; i++) {
peer_server_addprop(ps2, loc_strdup(ps->list[i].name), loc_strdup(ps->list[i].value));
}
i = strlen(DEFAULT_PIPE_DIR);
if (strncmp(path, DEFAULT_PIPE_DIR, i) != 0) i = 0;
snprintf(id, sizeof(id), "%s:%s", transport, path + i);
for (i = 0; id[i]; i++) {
/* Character '/' is prohibited in a peer ID string */
if (id[i] == '/') id[i] = '|';
}
peer_server_addprop(ps2, loc_strdup("ID"), loc_strdup(id));
peer_server_addprop(ps2, loc_strdup(attr_pipe_name), loc_strdup(path));
peer_server_add(ps2, ~0u);
}
#else
static void close_output_pipe(ChannelPIPE * c) {
}
static void close_input_pipe(ChannelPIPE * c) {
}
#endif
static void server_close(ChannelServer * serv) {
ServerPIPE * s = server2pipe(serv);
int i;
assert(is_dispatch_thread());
if (s->closed) return;
s->closed = 1;
list_remove(&s->serv.servlink);
if (list_is_empty(&channel_root) && list_is_empty(&channel_server_root))
shutdown_set_stopped(&channel_shutdown);
list_remove(&s->servlink);
peer_server_free(s->serv.ps);
for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
ServerInstance * ins = s->arr + i;
if (ins->fd_inp >= 0 && close(ins->fd_inp) < 0) check_error(errno);
if (ins->fd_out >= 0 && close(ins->fd_out) < 0) check_error(errno);
ins->fd_inp = ins->fd_out = -1;
#if defined(_WIN32) && !defined(__CYGWIN__)
ins->pipe = NULL;
#endif
}
/* TODO: free 's' when all pending reqs are done */
}
ChannelServer * channel_pipe_server(PeerServer * ps) {
ServerPIPE * s = (ServerPIPE *)loc_alloc_zero(sizeof(ServerPIPE));
#if defined(_WIN32) && !defined(__CYGWIN__)
{
int i;
const char * path = peer_server_getprop(ps, attr_pipe_name, def_pipe_name);
for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
ServerInstance * ins = s->arr + i;
ins->server = s;
ins->index = i;
ins->pipe = CreateNamedPipe(path, PIPE_ACCESS_OUTBOUND,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
SERVER_INSTANCE_CNT, BUF_SIZE, BUF_SIZE, 0, NULL);
if (ins->pipe == INVALID_HANDLE_VALUE) {
set_win32_errno(GetLastError());
return NULL;
}
ins->fd_inp = -1;
ins->fd_out = _open_osfhandle((intptr_t)ins->pipe, O_BINARY | O_WRONLY);
}
s->serv.ps = ps;
s->serv.close = server_close;
list_add_last(&s->serv.servlink, &channel_server_root);
shutdown_set_normal(&channel_shutdown);
list_add_last(&s->servlink, &server_list);
for (i = 0; i < SERVER_INSTANCE_CNT; i++) {
ServerInstance * ins = s->arr + i;
ins->req.type = AsyncReqConnectPipe;
ins->req.client_data = &s->arr[i];
ins->req.done = pipe_client_connected;
ins->req.u.cnp.pipe = ins->pipe;
async_req_post(&ins->req);
}
register_server(s);
return &s->serv;
}
#else
s->serv.close = server_close;
/* TODO: Unix pipe channel */
loc_free(s);
errno = ERR_UNSUPPORTED;
return NULL;
#endif
}
void ini_channel_pipe() {
channel_pipe_extension_offset = channel_extension(sizeof(ChannelPIPE *));
}
#else
/* Pipes are not supported */
#include <tcf/framework/errors.h>
#include <tcf/framework/channel_pipe.h>
void channel_pipe_connect(PeerServer * server, ChannelConnectCallBack callback, void * callback_args) {
callback(callback_args, ERR_UNSUPPORTED, NULL);
}
ChannelServer * channel_pipe_server(PeerServer * server) {
errno = ERR_UNSUPPORTED;
return NULL;
}
void ini_channel_pipe(void) {
}
#endif