blob: 1eb6b20fe0cc2d6ed4d7fed88078e537c8ef3846 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2008 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - Initial API and implementation
*******************************************************************************/
#include "config.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "sdm.h"
static int shutting_down = 0;
static void recv_callback(sdm_message msg);
/*
* Initialize the abstraction layers. ORDER IS IMPORTANT!
*/
int
sdm_init(int argc, char *argv[])
{
if (sdm_routing_table_init(argc, argv) < 0) {
DEBUG_PRINTF(DEBUG_LEVEL_STARTUP, "[%d] sdm_routing_table_init failed\n", sdm_route_get_id());
return -1;
}
if (sdm_message_init(argc, argv) < 0) {
DEBUG_PRINTF(DEBUG_LEVEL_STARTUP, "[%d] sdm_message_init failed\n", sdm_route_get_id());
return -1;
}
if (sdm_aggregate_init(argc, argv) < 0) {
DEBUG_PRINTF(DEBUG_LEVEL_STARTUP, "[%d] sdm_aggregate_init failed\n", sdm_route_get_id());
return -1;
}
DEBUG_PRINTF(DEBUG_LEVEL_STARTUP, "[%d] Initialization successful\n", sdm_route_get_id());
sdm_message_set_recv_callback(recv_callback);
return 0;
}
/**
* Finalize the abstraction layers
*/
void
sdm_finalize(void)
{
shutting_down = 1;
sdm_aggregate_finalize();
sdm_route_finalize();
sdm_message_finalize();
}
/*
* Progress messages and the aggregation layer
*/
int
sdm_progress(void)
{
if (sdm_message_progress() < 0) {
return -1;
}
sdm_aggregate_progress();
return 0;
}
/*
* Process a received message. This implements the main communication engine
* message processing algorithm.
*/
static void
recv_callback(sdm_message msg)
{
DEBUG_PRINTF(DEBUG_LEVEL_MESSAGES, "[%d] Enter recv_callback\n", sdm_route_get_id());
if (sdm_set_contains(sdm_message_get_source(msg), SDM_MASTER)) {
DEBUG_PRINTF(DEBUG_LEVEL_MESSAGES, "[%d] got downstream message src=%s, dest=%s\n",
sdm_route_get_id(),
_set_to_str(sdm_message_get_source(msg)),
_set_to_str(sdm_message_get_destination(msg)));
if (shutting_down) {
/*
* Stop processing downstream messages
*/
return;
}
/*
* Start aggregating messages
*/
sdm_aggregate_message(msg, SDM_AGGREGATE_DOWNSTREAM);
/*
* If we are the destination, then deliver the payload
*/
if (sdm_set_contains(sdm_message_get_destination(msg), sdm_route_get_id())) {
sdm_message_deliver(msg);
}
/*
* Free the message once it's been forwarded
*/
sdm_message_set_send_callback(msg, sdm_message_free);
/*
* Now forward the message
*/
sdm_message_send(msg);
} else {
DEBUG_PRINTF(DEBUG_LEVEL_MESSAGES, "[%d] got upstream message src=%s, dest=%s\n",
sdm_route_get_id(),
_set_to_str(sdm_message_get_source(msg)),
_set_to_str(sdm_message_get_destination(msg)));
/*
* Upstream messages are always aggregated
*/
sdm_aggregate_message(msg, SDM_AGGREGATE_UPSTREAM);
}
DEBUG_PRINTF(DEBUG_LEVEL_MESSAGES, "[%d] Leaving recv_callback\n", sdm_route_get_id());
}