blob: 4ff733ccc828374f6b8c481b9ae0983c9d0bc9d3 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - Initial API and implementation
*******************************************************************************/
/*
* Message transport implementation. This provides an implementation of
* SDM messages using TCP/IP.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netdb.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "compat.h"
#include "list.h"
#include "serdes.h"
#include "sdm.h"
#include "helloproto.h"
/* Width, in characters, of the total-length field written at the very start of a serialized message (see int_to_hex_str() in sdm_message_send()) */
#define MESSAGE_LENGTH_SIZE 8
/* Width, in characters, of the message ID field that immediately follows the length field */
#define MESSAGE_ID_SIZE 8
/*
 * In-memory representation of one SDM message.
 *
 * 'buf' is the buffer handed to sdm_message_new() and released by
 * sdm_message_free(). 'payload' starts out equal to 'buf'; on receive,
 * sdm_message_progress() moves it past the serialized header fields.
 */
struct sdm_message {
	unsigned int	id;		/* Message identifier */
	sdm_idset	dest;		/* Set of destination node IDs */
	sdm_idset	src;		/* Set of source node IDs */
	sdm_aggregate	aggregate;	/* Aggregation state carried with the message */
	char *		payload;	/* First byte of the payload */
	int		payload_len;	/* Number of payload bytes */
	char *		buf;		/* Underlying buffer (freed with the message) */
	int		buf_len;	/* Size of the underlying buffer */
	void		(*send_complete)(sdm_message msg);	/* Called by sdm_message_send() when done; may be NULL */
};
/**
 * Node of a singly linked list that pairs a node ID with the socket
 * descriptor used to communicate with that node.
 */
struct id_sockd_map {
	sdm_id			id;	/* Node ID */
	int			sockd;	/* Socket descriptor connected to that node */
	struct id_sockd_map *	next;	/* Next list element */
};
typedef struct id_sockd_map	sdm_id_sockd_map;
typedef struct id_sockd_map *	sdm_id_sockd_map_p;
/* Called by sdm_message_progress() for each received message (NULL = not set) */
static void (*sdm_recv_callback)(sdm_message msg) = NULL;
/* Called by sdm_message_deliver() (NULL = not set) */
static void (*deliver_callback)(const sdm_message msg) = NULL;

#define MAX_PORT_INCREMENT 1000 /* Max port increment before failing */
#define CHILD_CONNECT_TRIES 5 /* Number of times to try each port */
#define CHILD_CONNECT_SLEEP 1 /* Seconds to sleep between each connection attempt */

/* List of children of this node; NULL when the node is a leaf */
sdm_id_sockd_map_p children_sockd_map;
/* Single entry describing the parent of this node; NULL on the root */
sdm_id_sockd_map_p parent_sockd_map;
/** Begin of functions created by Richard **/
/**
 * Create the parent and children maps and fill in the node IDs.
 *
 * The parent entry is created for every node except SDM_MASTER. The
 * children are the IDs that sdm_route_get_route() returns for the set of
 * nodes built by sdm_set_add_all(), minus (on non-root nodes) the parent
 * and the master.
 *
 * Every entry is created with sockd = -1 so that code running before the
 * connections exist (e.g. sdm_clean_maps() after a failed initialization)
 * never operates on an indeterminate descriptor value.
 *
 * NOTE(review): this function returns void, so an allocation failure
 * cannot be reported to the caller; malloc results are used unchecked as
 * in the original code.
 */
void
sdm_create_sockd_map()
{
	sdm_idset	all_nodes;
	sdm_idset	adjacent_nodes;

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] in sdm_create_sockd_map\n",
			sdm_route_get_id());

	// Set parent map
	if (sdm_route_get_id() == SDM_MASTER) {
		parent_sockd_map = NULL; // Root
	} else {
		parent_sockd_map = (sdm_id_sockd_map_p)malloc(sizeof(sdm_id_sockd_map));
		parent_sockd_map->id = sdm_route_get_parent();
		parent_sockd_map->sockd = -1;	// not connected yet
		parent_sockd_map->next = NULL;
	}

	// Get all adjacent nodes
	all_nodes = sdm_set_new();
	all_nodes = sdm_set_add_all(all_nodes, sdm_route_get_size()-1);

	// Filter the parent node and master
	if (parent_sockd_map != NULL) {
		sdm_set_remove_element(all_nodes, parent_sockd_map->id);
		sdm_set_remove_element(all_nodes, SDM_MASTER);
	}

	adjacent_nodes = sdm_route_get_route(all_nodes);
	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] adjacent nodes: %s\n",
			sdm_route_get_id(), _set_to_str(adjacent_nodes));

	children_sockd_map = NULL;
	if (!sdm_set_is_empty(adjacent_nodes)) {
		// Traverse adjacent nodes, pushing each one on the front of the children list
		sdm_id			child;
		sdm_id_sockd_map_p	p;

		for (child = sdm_set_first(adjacent_nodes); !sdm_set_done(adjacent_nodes);
				child = sdm_set_next(adjacent_nodes)) {
			DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] adding %d to my map\n",
					sdm_route_get_id(), child);
			p = (sdm_id_sockd_map_p)malloc(sizeof(sdm_id_sockd_map));
			p->id = child;
			p->sockd = -1;	// not connected yet
			p->next = children_sockd_map;
			children_sockd_map = p;
		}
	}

	sdm_set_free(all_nodes);
}
/**
 * Bind to a port, listen on it and wait for the parent to connect.
 *
 * Ports parentbaseport .. parentbaseport + MAX_PORT_INCREMENT - 1 are
 * tried in order until bind() succeeds; only EADDRINUSE causes the next
 * port to be tried, any other bind error aborts. Incoming connections are
 * accepted until is_client_peer_valid() approves one; rejected peers are
 * closed. The listening socket is always closed before returning.
 *
 * @param parentbaseport first port number to try
 * @return connected socket descriptor if successful, -1 if error
 */
int
sdm_parent_port_bind(int parentbaseport)
{
	int			sockfd;
	int			peersockfd;
	int			parentport;
	int			bound = 0;
	struct sockaddr_in	sockaddr_info;
	struct sockaddr_in	peersockaddr_info;
	socklen_t		peeraddr_len;

	// Create the listening socket
	sockfd = socket(PF_INET, SOCK_STREAM, 0);
	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] [ACCEPT] sockfd: %d\n", sdm_route_get_id(), sockfd);
	if (sockfd < 0) {
		perror("socket");
		return -1;
	}

	memset(&sockaddr_info, 0, sizeof(sockaddr_info));
	sockaddr_info.sin_family = AF_INET;
	sockaddr_info.sin_addr.s_addr = htonl(INADDR_ANY);

	// Try successive ports; stop at the valid port range or MAX_PORT_INCREMENT
	for (parentport = parentbaseport;
			parentport < parentbaseport + MAX_PORT_INCREMENT && parentport <= 65535;
			parentport++) {
		// The port must be refreshed on every attempt, otherwise the same port is retried
		sockaddr_info.sin_port = htons(parentport);
		if (bind(sockfd, (struct sockaddr *)&sockaddr_info, sizeof(sockaddr_info)) == 0) {
			bound = 1;
			break;
		}
		if (errno != EADDRINUSE) {
			DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error binding to the port! - errno %d\n",
					sdm_route_get_id(), errno);
			close(sockfd);
			return -1;
		}
	}

	if (!bound) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] could not find port to bind to\n", sdm_route_get_id());
		close(sockfd);
		return -1;
	}
	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] bound to port %d\n", sdm_route_get_id(), parentport);

	if (listen(sockfd, 5) < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error listening to the port %d!\n", sdm_route_get_id(), parentport);
		close(sockfd);
		return -1;
	}

	// Wait for a valid client to connect
	for (;;) {
		memset(&peersockaddr_info, 0, sizeof(peersockaddr_info));
		// accept() overwrites the length, so it has to be reset before each call
		peeraddr_len = sizeof(peersockaddr_info);
		peersockfd = accept(sockfd, (struct sockaddr *)&peersockaddr_info, &peeraddr_len);
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] peersockfd: %d\n", sdm_route_get_id(), peersockfd);
		if (peersockfd < 0) {
			int	saved_errno = errno;

			if (saved_errno == EINTR) {
				continue;	// interrupted by a signal, just wait again
			}
			perror("Status of accepting data from parent");
			DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Could not accept port connection on port %d - errno %d!\n",
					sdm_route_get_id(), parentport, saved_errno);
			close(sockfd);
			return -1;
		}

		// Check if it is connected to another sdm process before returning sockfd
		if (is_client_peer_valid(peersockfd)) {
			break;
		}
		close(peersockfd);
	}

	// We don't need to wait for connections anymore
	close(sockfd);
	return peersockfd;
}
/**
 * Connect to the child node running on 'hostname'.
 *
 * Ports childbaseport .. childbaseport + MAX_PORT_INCREMENT - 1 are probed
 * in order. Each port is tried up to CHILD_CONNECT_TRIES times, sleeping
 * CHILD_CONNECT_SLEEP seconds after every refused/unreachable/timed-out
 * attempt. A fresh socket is created for every attempt because the state
 * of a socket after a failed connect() is unspecified. A successful
 * connection is only returned if is_server_peer_valid() accepts the peer;
 * otherwise it is closed and the next port is probed.
 *
 * @param hostname      host name or address of the child
 * @param childbaseport first port number to try
 * @return connected socket descriptor on success, -1 on error or when no
 *         valid peer was found
 */
int
sdm_connect_to_child(char *hostname, int childbaseport)
{
	struct addrinfo		hints;
	struct addrinfo *	result;
	int			childport;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_INET;
	hints.ai_socktype = SOCK_STREAM;

	for (childport = childbaseport; childport < childbaseport + MAX_PORT_INCREMENT; childport++) {
		char	port_str[16];
		int	sockfd = -1;
		int	connected = 0;
		int	num_tries;
		int	rc;

		// Resolve the address for this host/port pair
		snprintf(port_str, sizeof(port_str), "%d", childport);
		rc = getaddrinfo(hostname, port_str, &hints, &result);
		if (rc != 0) {
			// getaddrinfo reports errors through its return value, not errno
			fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
			DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] getaddrinfo error. hostname: %s, port: %s\n",
					sdm_route_get_id(), hostname, port_str);
			return -1;
		}

		for (num_tries = 0; num_tries < CHILD_CONNECT_TRIES; num_tries++) {
			int	saved_errno;

			sockfd = socket(PF_INET, SOCK_STREAM, 0);
			if (sockfd < 0) {
				perror("Socket");
				DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] socket syscall error\n", sdm_route_get_id());
				freeaddrinfo(result);
				return -1;
			}

			// Connect to the first address of the result list
			if (connect(sockfd, result->ai_addr, result->ai_addrlen) == 0) {
				connected = 1;
				break;
			}

			// close() may overwrite errno, so keep the connect() error
			saved_errno = errno;
			close(sockfd);
			sockfd = -1;

			if ((saved_errno != ECONNREFUSED) && (saved_errno != ENETUNREACH) &&
					(saved_errno != ETIMEDOUT)) {
				errno = saved_errno;
				perror("connect");
				freeaddrinfo(result);
				DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] connect error. hostname: %s, port: %s\n",
						sdm_route_get_id(), hostname, port_str);
				return -1;
			}

			sleep(CHILD_CONNECT_SLEEP);
		}

		freeaddrinfo(result);

		if (connected) {
			// Check if it is connected to another sdm process before returning sockfd
			if (is_server_peer_valid(sockfd)) {
				return sockfd;
			}
			close(sockfd);
		}
	}

	// No valid peer found!
	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] No port found for the sdm child. hostname: %s\n",
			sdm_route_get_id(), hostname);
	return -1;
}
/**
* Initializes the message layer using tcp/ip to transport data
*/
int
sdm_tcpip_init()
{
routing_table_entry * entry;
sdm_id_sockd_map_p mapp;
sdm_create_sockd_map();
// If it isn't the master node bind to the port provided
if(sdm_route_get_id() != SDM_MASTER) {
int parentsockd = -1;
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] This node is a server!\n", sdm_route_get_id());
for (sdm_routing_table_set(); (entry = sdm_routing_table_next()) != NULL; ) {
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] found entry for node %d\n", sdm_route_get_id(), entry->nodeID);
if(entry->nodeID == sdm_route_get_id()) {
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] found my entry\n", sdm_route_get_id());
parentsockd = sdm_parent_port_bind(entry->port);
break;
}
}
if(parentsockd < 0) {
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] failed to bind to port\n", sdm_route_get_id());
return -1;
}
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Parent %d successfully connected\n", sdm_route_get_id(), parent_sockd_map->id);
// Update the parent socket descriptor
parent_sockd_map->sockd = parentsockd;
}
// If node is leaf, initialization is done
if(children_sockd_map == NULL) {
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] This node is a leaf\n", sdm_route_get_id());
return 0;
}
// Otherwise, open connections to the children
for (sdm_routing_table_set(); (entry = sdm_routing_table_next()) != NULL; ) {
int childsockd;
mapp = children_sockd_map;
while(mapp != NULL) {
if(mapp->id == entry->nodeID) {
// ID found! Connect to the children and generate a socket descriptor
childsockd = sdm_connect_to_child(entry->hostname, entry->port);
if(childsockd < 0) {
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] failed to bind to connect to child %s:%d\n", sdm_route_get_id(), entry->hostname, entry->port);
return -1;
}
DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Connection to child %d successful\n", sdm_route_get_id(), mapp->id);
// Set the children socket descriptor
mapp->sockd = childsockd;
break;
}
mapp = mapp->next;
}
}
return 0;
}
/**
 * Close the parent and children sockets and free all map structures.
 *
 * Both global map pointers are NULL when this function returns, so
 * calling it again (or using the lookup functions afterwards) does not
 * touch freed memory.
 */
void
sdm_clean_maps()
{
	if (parent_sockd_map != NULL) {
		close(parent_sockd_map->sockd);
		free(parent_sockd_map);
		parent_sockd_map = NULL;	// avoid a dangling pointer / double free
	}

	// Pop and release the children one by one; the list head ends up NULL
	while (children_sockd_map != NULL) {
		sdm_id_sockd_map_p	next = children_sockd_map->next;

		close(children_sockd_map->sockd);
		free(children_sockd_map);
		children_sockd_map = next;
	}
}
/**
 * Given a node ID, return the socket descriptor associated with it.
 *
 * The children list is searched first, then the parent entry. On the
 * root node there is no parent entry (parent_sockd_map is NULL), which is
 * handled as "not found" instead of being dereferenced.
 *
 * @param nodeid ID of the node to look up
 * @return socket descriptor on success, -1 if the ID is not mapped
 */
int
sdm_fetch_sockd(int nodeid)
{
	sdm_id_sockd_map_p	p;

	// Look for nodeid on the children list first
	for (p = children_sockd_map; p != NULL; p = p->next) {
		if (p->id == nodeid) {
			return p->sockd;
		}
	}

	// Then it can only be the parent, if this node has one
	if (parent_sockd_map != NULL && parent_sockd_map->id == nodeid) {
		return parent_sockd_map->sockd;
	}

	// Not mapped! Return an error code!
	return -1;
}
/**
 * Given a socket descriptor, return the ID of the node associated with it.
 *
 * The children list is searched first, then the parent entry. On the
 * root node there is no parent entry (parent_sockd_map is NULL), which is
 * handled as "not found" instead of being dereferenced.
 *
 * @param sockd socket descriptor to look up
 * @return node ID on success, -1 if the descriptor is not mapped
 */
int
sdm_fetch_nodeid(int sockd)
{
	sdm_id_sockd_map_p	p;

	// Look for the descriptor on the children list first
	for (p = children_sockd_map; p != NULL; p = p->next) {
		if (p->sockd == sockd) {
			return p->id;
		}
	}

	// Then it can only be the parent, if this node has one
	if (parent_sockd_map != NULL && parent_sockd_map->sockd == sockd) {
		return parent_sockd_map->id;
	}

	// Not mapped! Return an error code!
	return -1;
}
/**
 * Poll the parent and children sockets without blocking and report one
 * descriptor that is ready for reading. Children are checked before the
 * parent, in list order.
 *
 * @return a readable socket descriptor, -2 if nothing is readable (or no
 *         socket is registered), -1 if select() failed
 */
int
sdm_get_active_sock_desc()
{
	fd_set			readable;
	struct timeval		no_wait = {0, 0};	/* zero timeout: select() returns at once */
	sdm_id_sockd_map_p	node;
	int			highest = -1;
	int			nready;

	/* Collect every known descriptor and track the largest one */
	FD_ZERO(&readable);
	if (parent_sockd_map != NULL) {
		FD_SET(parent_sockd_map->sockd, &readable);
		highest = parent_sockd_map->sockd;
	}
	for (node = children_sockd_map; node != NULL; node = node->next) {
		FD_SET(node->sockd, &readable);
		if (node->sockd > highest) {
			highest = node->sockd;
		}
	}

	/* Nothing to watch */
	if (highest < 0) {
		return -2;
	}

	nready = select(highest + 1, &readable, NULL, NULL, &no_wait);
	if (nready < 0) {
		perror("select syscall status");
		DEBUG_PRINTS(DEBUG_LEVEL_CLIENT, "select syscall failed!\n");
		return -1;
	}
	if (nready == 0) {
		/* No descriptor has pending data */
		return -2;
	}

	/* Hand back the first descriptor flagged by select() */
	for (node = children_sockd_map; node != NULL; node = node->next) {
		if (FD_ISSET(node->sockd, &readable)) {
			return node->sockd;
		}
	}
	if (parent_sockd_map != NULL && FD_ISSET(parent_sockd_map->sockd, &readable)) {
		return parent_sockd_map->sockd;
	}

	/* select() reported activity but no descriptor is set: not expected */
	return -1;
}
/**
 * Read the length field that starts every message and report how many
 * bytes of the message are still to be read (the size of the length
 * field itself is already subtracted).
 *
 * A header whose decoded length is smaller than the length field itself
 * would yield a negative remainder; such a header is rejected instead of
 * being handed to the caller as a buffer size.
 *
 * @param sockfd socket to read from
 * @param length out: number of bytes that follow the length field;
 *               only written on success
 * @return 0 on success, -1 on read error, end of stream or invalid length
 */
int
sdm_tcpip_msgheader_receive(int sockfd, int *length)
{
	char	length_str[MESSAGE_LENGTH_SIZE];
	char *	p = length_str;
	int	remaining = MESSAGE_LENGTH_SIZE;
	int	body_len;

	// Keep reading until the whole length field has arrived
	while (remaining > 0) {
		int	n = read(sockfd, p, remaining);

		if (n < 0) {
			if (errno == EINTR) {
				continue;	// interrupted by a signal, retry
			}
			perror("Status on read syscall");
			return -1;
		}
		if (n == 0) {
			return -1;	// peer closed the connection
		}
		p += n;
		remaining -= n;
	}

	body_len = hex_str_to_int(length_str, MESSAGE_LENGTH_SIZE, NULL) - MESSAGE_LENGTH_SIZE;
	if (body_len < 0) {
		return -1;	// malformed header
	}

	*length = body_len;
	return 0;
}
/**
 * Read exactly 'length' bytes from 'sockfd' into 'buf', looping over
 * short reads and retrying reads interrupted by a signal.
 *
 * @return 0 once all bytes were read, -1 on read error or end of stream
 */
int
sdm_tcpip_msgbody_receive(int sockfd, char *buf, int length)
{
	for (;;) {
		int	got = read(sockfd, buf, length);

		/* Everything that was still missing has arrived */
		if (got >= length) {
			return 0;
		}

		if (got < 0) {
			if (errno == EINTR) {
				continue;
			}
			perror("Status on read syscall");
			return -1;
		}

		/* End of stream before the message was complete */
		if (got == 0) {
			return -1;
		}

		/* Short read: advance and ask for the rest */
		buf += got;
		length -= got;
	}
}
/**
 * Write 'length' bytes from 'buf' to the socket 'sockd', looping over
 * short writes. A write interrupted by a signal (EINTR) is retried, the
 * same way the receive functions retry interrupted reads. A request to
 * send zero bytes succeeds without touching the socket.
 *
 * @return 0 once all bytes were written, -1 if write() failed or made no
 *         progress
 */
int
sdm_tcpip_send(int sockd, char *buf, int length)
{
	// Write all message to the socket
	while (length > 0) {
		int	wcount = write(sockd, (void *)buf, length);

		if (wcount < 0 && errno == EINTR) {
			continue;	// interrupted before anything was written, retry
		}
		if (wcount <= 0) {
			if (wcount < 0) {
				perror("write syscall status");
			}
			DEBUG_PRINTS(DEBUG_LEVEL_CLIENT, "Could not send message data - write syscall failed!\n");
			return -1;
		}
		buf += wcount;
		length -= wcount;
	}

	return 0;
}
/** End of functions created by Richard **/
/**
 * Initialize the message abstraction. For this implementation that means
 * bringing up the TCP/IP communication layer via sdm_tcpip_init().
 * The argc/argv arguments are not used here.
 *
 * @return 0 on success, -1 on failure
 */
int
sdm_message_init(int argc, char *argv[])
{
	return (sdm_tcpip_init() < 0) ? -1 : 0;
}
/**
 * Finalize the message abstraction: the sockets are closed and the
 * parent/children maps are released by sdm_clean_maps().
 */
void
sdm_message_finalize()
{
	sdm_clean_maps();
}
/**
 * Send a message to its destinations.
 *
 * This node's own ID is removed from the destination set of 'msg' (the
 * message is modified), then the message is serialized once and written
 * to the socket of every immediate destination returned by
 * sdm_route_get_route(). The wire format is: length field
 * (MESSAGE_LENGTH_SIZE chars, counts the whole message including itself),
 * message ID (MESSAGE_ID_SIZE chars), aggregate, source set, destination
 * set, payload.
 *
 * The send_complete callback of the message, if any, is invoked after a
 * successful send and also when there was nothing to send; it is not
 * invoked when -1 is returned. The serialization buffer is released on
 * every path.
 *
 * @return 0 on success, -1 on failure
 */
int
sdm_message_send(const sdm_message msg)
{
	int		len;		// actual length of the serialized message
	int		maxlen;		// upper bound used to size the buffer
	int		rc = 0;
	char *		p;
	char *		buf;
	sdm_id		dest_id;
	sdm_idset	route;

	/*
	 * Remove our id from the destination of the message before forwarding.
	 */
	if (sdm_set_contains(msg->dest, sdm_route_get_id())) {
		sdm_set_remove_element(msg->dest, sdm_route_get_id());
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] sdm_message_send removing me from dest\n", sdm_route_get_id());
	}

	/*
	 * Compute the immediate destinations for the message
	 */
	route = sdm_route_get_route(msg->dest);

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] sdm_message_send src %s dest %s route %s\n", sdm_route_get_id(),
			_set_to_str(msg->src),
			_set_to_str(msg->dest),
			_set_to_str(route));

	if (!sdm_set_is_empty(route)) {
		/*
		 * Create a serialized version of the message
		 * This includes the length of the message at the beginning
		 */
		maxlen = MESSAGE_LENGTH_SIZE
				+ MESSAGE_ID_SIZE
				+ sdm_aggregate_serialized_length(msg->aggregate)
				+ sdm_set_serialized_length(msg->src)
				+ sdm_set_serialized_length(msg->dest)
				+ msg->payload_len;

		buf = (char *)malloc(maxlen);
		if (buf == NULL) {
			DEBUG_PRINTS(DEBUG_LEVEL_CLIENT, "Could not allocate send buffer!\n");
			return -1;
		}

		// The length field at the start of buf is filled in last
		p = buf + MESSAGE_LENGTH_SIZE;

		/*
		 * Note: maxlen was the maximum length of the serialized buffer, we
		 * now calculate the actual length for the send.
		 */
		len = MESSAGE_LENGTH_SIZE + MESSAGE_ID_SIZE;
		int_to_hex_str(msg->id, p, MESSAGE_ID_SIZE, &p);
		len += sdm_aggregate_serialize(msg->aggregate, p, &p);
		len += sdm_set_serialize(msg->src, p, &p);
		len += sdm_set_serialize(msg->dest, p, &p);
		if (msg->payload_len > 0) {
			// memcpy must not be handed a NULL payload, even for zero bytes
			memcpy(p, msg->payload, msg->payload_len);
		}
		len += msg->payload_len;

		// Copies the actual length to the header of the message
		int_to_hex_str(len, buf, MESSAGE_LENGTH_SIZE, NULL);

		/*
		 * Send the message to each destination, converting the
		 * node ID to a proper address before the transmission
		 */
		for (dest_id = sdm_set_first(route); !sdm_set_done(route); dest_id = sdm_set_next(route)) {
			int	sockd;

			DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Sending len %d to %d\n", sdm_route_get_id(), len, dest_id);

			// Get socket descriptor corresponding to the node where the message will be sent
			sockd = sdm_fetch_sockd(dest_id);
			if (sockd < 0) {
				DEBUG_PRINTS(DEBUG_LEVEL_CLIENT, "Socket descriptor not found!\n");
				rc = -1;
				break;
			}

			// Write all message to the socket
			if (sdm_tcpip_send(sockd, buf, len) < 0) {
				DEBUG_PRINTS(DEBUG_LEVEL_CLIENT, "Error sending message!\n");
				rc = -1;
				break;
			}
		}

		/*
		 * Free resources, on success and on failure alike.
		 */
		free(buf);

		if (rc < 0) {
			return -1;
		}
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] About to call send_complete\n", sdm_route_get_id());

	/*
	 * Notify that the send is complete.
	 */
	if (msg->send_complete != NULL) {
		msg->send_complete(msg);
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Leaving sdm_message_send\n", sdm_route_get_id());

	return 0;
}
/**
 * Message progress: receive at most one pending message.
 *
 * One readable socket is picked with sdm_get_active_sock_desc(). If there
 * is one, the length field and then the rest of the message are read,
 * the message ID, aggregate, source set and destination set are
 * deserialized, and the remaining bytes become the payload. The receive
 * callback registered with sdm_message_set_recv_callback(), if any, is
 * then called with the new message.
 *
 * On success the message is not freed here; the caller (through the
 * receive callback) is responsible for freeing allocated message
 * resources. On failure everything allocated by this call is released.
 *
 * @return 0 on success or when no socket has data, -1 on failure
 */
int
sdm_message_progress(void)
{
	int		n;
	int		len;
	int		sockfd;
	char *		buf;
	char *		cursor;		// read position inside buf while deserializing
	sdm_message	msg;

	// Retrieve the active socket descriptor
	sockfd = sdm_get_active_sock_desc();
	if (sockfd == -1) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error retrieving socket descriptor\n", sdm_route_get_id());
		return -1;
	}
	if (sockfd < 0) {
		// No socket has data: nothing to do
		return 0;
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] receiving header\n", sdm_route_get_id());

	if (sdm_tcpip_msgheader_receive(sockfd, &len) != 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error retrieving message size!\n", sdm_route_get_id());
		return -1;
	}

	// The body must at least hold the message ID that is parsed below
	if (len < MESSAGE_ID_SIZE) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] invalid header\n", sdm_route_get_id());
		return -1;
	}

	buf = (char *)malloc(len);
	if (buf == NULL) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error retrieving message!\n", sdm_route_get_id());
		return -1;
	}

	// From here on the message owns buf; sdm_message_free() releases both
	msg = sdm_message_new(buf, len);
	if (msg == NULL) {
		free(buf);
		return -1;
	}

	if (sdm_tcpip_msgbody_receive(sockfd, buf, len) < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Error retrieving message!\n", sdm_route_get_id());
		sdm_message_free(msg);
		return -1;
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] sdm_message_progress received len %d from %d\n",
			sdm_route_get_id(), len, sdm_fetch_nodeid(sockfd));

	cursor = buf;
	msg->id = hex_str_to_int(cursor, MESSAGE_ID_SIZE, &cursor);
	len -= MESSAGE_ID_SIZE;

	if ((n = sdm_aggregate_deserialize(msg->aggregate, cursor, &cursor)) < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] invalid header\n", sdm_route_get_id());
		sdm_message_free(msg);
		return -1;
	}
	len -= n;

	if ((n = sdm_set_deserialize(msg->src, cursor, &cursor)) < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] invalid header\n", sdm_route_get_id());
		sdm_message_free(msg);
		return -1;
	}
	len -= n;

	if ((n = sdm_set_deserialize(msg->dest, cursor, &cursor)) < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] invalid header\n", sdm_route_get_id());
		sdm_message_free(msg);
		return -1;
	}
	len -= n;

	// The header fields claimed more bytes than the message contains
	if (len < 0) {
		DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] invalid header\n", sdm_route_get_id());
		sdm_message_free(msg);
		return -1;
	}

	// Whatever follows the header fields is the payload
	msg->payload = cursor;
	msg->payload_len = len;

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] sdm_message_progress agg=%s src=%s dest=%s\n",
			sdm_route_get_id(),
			_aggregate_to_str(msg->aggregate),
			_set_to_str(msg->src),
			_set_to_str(msg->dest));

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] About to call recv_callback\n", sdm_route_get_id());

	if (sdm_recv_callback != NULL) {
		sdm_recv_callback(msg);
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] Finished recv_callback\n", sdm_route_get_id());

	return 0;
}
/**
 * Register the function that sdm_message_send() calls for this message
 * once the send has completed. Passing NULL removes the callback.
 */
void
sdm_message_set_send_callback(sdm_message msg, void (*callback)(sdm_message msg))
{
	msg->send_complete = callback;
}
/**
 * Register the function that sdm_message_progress() calls for every
 * message it receives. Passing NULL removes the callback.
 */
void
sdm_message_set_recv_callback(void (*callback)(sdm_message msg))
{
	sdm_recv_callback = callback;
}
sdm_message
sdm_message_new(char *buf, int len)
{
static unsigned int ids = 0;
sdm_message msg = (sdm_message)malloc(sizeof(struct sdm_message));
msg->id = ids++;
msg->dest = sdm_set_new();
msg->src = sdm_set_new();
sdm_set_add_element(msg->src, sdm_route_get_id());
msg->buf = buf;
msg->buf_len = len;
msg->payload = msg->buf;
msg->payload_len = msg->buf_len;
msg->aggregate = sdm_aggregate_new();
msg->send_complete = NULL;
return msg;
}
/**
 * Release a message: its buffer, source and destination sets, aggregate
 * and the message structure itself. Like free(), passing NULL is a no-op.
 */
void
sdm_message_free(sdm_message msg)
{
	if (msg == NULL) {
		return;
	}

	free(msg->buf);
	sdm_set_free(msg->src);
	sdm_set_free(msg->dest);
	sdm_aggregate_free(msg->aggregate);
	free(msg);
}
/**
 * @return the ID stored in the message
 */
unsigned int
sdm_message_get_id(const sdm_message msg)
{
	return msg->id;
}
/**
 * Overwrite the ID stored in the message with 'id'.
 */
void
sdm_message_set_id(const sdm_message msg, unsigned int id)
{
	msg->id = id;
}
/**
 * Combine 'dest_ids' into the destination set of the message using
 * sdm_set_union().
 */
void
sdm_message_set_destination(const sdm_message msg, const sdm_idset dest_ids)
{
	sdm_set_union(msg->dest, dest_ids);
}
/**
 * @return the destination set held by the message (the message's own
 *         set, not a copy)
 */
sdm_idset
sdm_message_get_destination(const sdm_message msg)
{
	return msg->dest;
}
/**
 * Combine 'source' into the source set of the message using
 * sdm_set_union().
 */
void
sdm_message_set_source(const sdm_message msg, const sdm_idset source)
{
	sdm_set_union(msg->src, source);
}
/**
 * @return the source set held by the message (the message's own set,
 *         not a copy)
 */
sdm_idset
sdm_message_get_source(const sdm_message msg)
{
	return msg->src;
}
/**
 * Report where the payload of the message starts and how long it is.
 * The returned pointer refers to memory owned by the message.
 *
 * @param buf out: start of the payload
 * @param len out: payload length
 */
void
sdm_message_get_payload(const sdm_message msg, char **buf, int *len)
{
	*buf = msg->payload;
	*len = msg->payload_len;
}
/**
 * @return the aggregate held by the message (the message's own object,
 *         not a copy)
 */
sdm_aggregate
sdm_message_get_aggregate(const sdm_message msg)
{
	return msg->aggregate;
}
/**
 * Copy the aggregate 'a' into the aggregate of the message using
 * sdm_aggregate_copy().
 */
void
sdm_message_set_aggregate(const sdm_message msg, const sdm_aggregate a)
{
	sdm_aggregate_copy(msg->aggregate, a);
}
/**
 * Register the function that sdm_message_deliver() calls. Passing NULL
 * removes the callback.
 */
void
sdm_message_set_deliver_callback(void (*callback)(const sdm_message msg))
{
	deliver_callback = callback;
}
/**
 * Hand the message to the registered deliver callback. Nothing happens
 * when no callback has been registered.
 */
void
sdm_message_deliver(const sdm_message msg)
{
	if (deliver_callback == NULL) {
		return;
	}

	DEBUG_PRINTF(DEBUG_LEVEL_CLIENT, "[%d] sdm_message_deliver \n", sdm_route_get_id());
	deliver_callback(msg);
}