blob: 6c6800849da3a7ad52b6bc4bee43abfbab3ac251 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: None
Description: SCI APIs.
Author: Liu Wei, Nicole Nie, Tu HongJ
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 lwbjcdl Initial code (D153875)
01/16/12 ronglli Add codes to detect SOCKET_BROKEN
****************************************************************************/
#include <assert.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <vector>
#include <sys/socket.h>
#include "sci.h"
#include "tools.hpp"
#include "log.hpp"
#include "envvar.hpp"
#include "packer.hpp"
#include "general.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "topology.hpp"
#include "ctrlblock.hpp"
#include "routinglist.hpp"
#include "eventntf.hpp"
#include "initializer.hpp"
#include "observer.hpp"
#include "filter.hpp"
#include "listener.hpp"
#include "filterlist.hpp"
#include "filterproc.hpp"
#include "routerproc.hpp"
#include "allocator.hpp"
#include "sshfunc.hpp"
#include "exception.hpp"
const char * ErrRetMsg[] = {
"Succeeded.",
"Invalid host file.",
"Invalid end type, can only be SCI_FRONT_END or SCI_BACK_END.",
"Error occurred when doing the initialization.",
"The API is called by invalid end type.",
"Error occurred when searching the group.",
"Error occurred when searching the filter.",
"Invalid filter.",
"Error occurred when searching the backend.",
"Unknown information.",
"Uninitialized SCI execution environment.",
"Can't free predefined group.",
"The group is an empty group.",
"Invalid group operator specified.",
"Can't unload predefined filter.",
"A polling timeout occurred after timeout milliseconds elapsed",
"Invalid job key specified by SCI_JOB_KEY.",
"Can only be used in polling mode.",
"Invalid filter id",
"The successor id list contains non-existed successor id.",
"The back id already existed.",
"Out of memory.",
"Failed to launch client(s).",
"Invalid polling calls.",
"Invalid user.",
"Invalid mode.",
"Error occurred when searching the agent.",
"Invalid version number.",
"Error occurred when doing the SSH-based authentication.",
"Exception occurred",
"Invalid error message number.",
"The parent is broken.",
"The child is broken.",
"Error occurred when doing the recovery.",
"Recover failed.",
"Received incorrect data.",
"Error occurred in the threads.",
};
SCI_msg_hndlr *gHndlr = NULL;
void *gParam = NULL;
// Initialization & Termination
int SCI_Initialize(sci_info_t *info)
{
int rc;
if (gCtrlBlock->getMyRole() != CtrlBlock::INVALID) {
log_warn("Has already been initialized");
return SCI_SUCCESS;
}
rc = gCtrlBlock->init(info);
if (rc != SCI_SUCCESS)
return rc;
return gInitializer->init();
}
int SCI_Terminate()
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
try {
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
Message *msg = new Message();
msg->build(SCI_FILTER_NULL, SCI_GROUP_ALL, 0, NULL, NULL, Message::QUIT);
gCtrlBlock->getRouterInQueue()->produce(msg);
}
gCtrlBlock->term();
delete gNotifier;
delete gInitializer;
delete gCtrlBlock;
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
return SCI_SUCCESS;
}
int SCI_Release()
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID) {
return SCI_ERR_UNINTIALIZED;
}
try {
int role = gCtrlBlock->getMyRole();
int num_fds = gCtrlBlock->numOfChildrenFds();
log_debug("there are total %d children", num_fds);
if (num_fds > 0) {
int * fd_list = (int *)malloc(num_fds * sizeof(int));
gCtrlBlock->getChildrenSockfds(fd_list);
for(int i=0; i < num_fds; i++) {
log_debug("close child fd %d", fd_list[i]);
::shutdown(fd_list[i], SHUT_RDWR);
::close(fd_list[i]);
}
free(fd_list);
}
if (role != CtrlBlock::FRONT_END) {
if (gInitializer->getInStream() != NULL) {
int p_fd = gInitializer->getInStream()->getSocket();
log_debug("close parent fd %d", p_fd);
::shutdown(p_fd, SHUT_RDWR);
::close(p_fd);
}
}
if (role == CtrlBlock::FRONT_END) {
Message *msg = new Message();
msg->build(SCI_FILTER_NULL, SCI_GROUP_ALL, 0, NULL, NULL, Message::RELEASE);
gCtrlBlock->getRouterInQueue()->produce(msg);
}
gCtrlBlock->term();
delete gNotifier;
delete gInitializer;
delete gCtrlBlock;
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
return SCI_SUCCESS;
}
int SCI_Parentinfo_update(char * parentAddr, int port)
{
int rc = -1;
if ((NULL == parentAddr) || (port <= 0))
return SCI_ERR_UNKNOWN_INFO;
if ((gCtrlBlock->getTermState()) || (!gCtrlBlock->getRecoverMode())
|| (!gCtrlBlock->getParentInfoWaitState()))
return SCI_ERR_INVALID_CALLER;
log_debug("Parentinfo_update: addr = %s, port = %d", parentAddr, port);
rc = gInitializer->updateParentInfo(parentAddr, port);
if (rc != SCI_SUCCESS) {
log_debug("Parentinfo_update: failed to update info, rc = %d", rc);
return rc;
}
return SCI_SUCCESS;
}
int SCI_Parentinfo_wait()
{
if ((gCtrlBlock->getTermState()) || (!gCtrlBlock->getRecoverMode()))
return SCI_ERR_INVALID_CALLER;
gCtrlBlock->setParentInfoWaitState(true);
log_debug("Parentinfo_wait: set the parentInfoWait state to true");
return SCI_SUCCESS;
}
int SCI_Recover_setmode(int mode)
{
gCtrlBlock->setRecoverMode(mode);
log_debug("Recover_setmode: set the recover mode to %d", mode);
return SCI_SUCCESS;
}
// Information Query
int SCI_Query(sci_query_t query, void *ret_val)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if(NULL == ret_val)
return SCI_ERR_UNKNOWN_INFO;
int *p = (int *) ret_val;
switch (query)
{
case CURRENT_VERSION:
*p = gCtrlBlock->getVersion();
break;
case JOB_KEY:
*p = gCtrlBlock->getJobKey();
break;
case NUM_BACKENDS:
if(gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
*p = gCtrlBlock->getTopology()->getBENum();
break;
case BACKEND_ID:
if((gCtrlBlock->getMyRole() != CtrlBlock::BACK_END)
&& (gCtrlBlock->getMyRole() != CtrlBlock::BACK_AGENT))
return SCI_ERR_INVALID_CALLER;
*p = gCtrlBlock->getMyHandle();
break;
case POLLING_FD:
if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getObserver())
return SCI_ERR_MODE;
else
*p = gCtrlBlock->getObserver()->getPollFd();
break;
case PIPEWRITE_FD:
if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getObserver())
return SCI_ERR_MODE;
else
*p = gCtrlBlock->getObserver()->getPipeWriteFd();
break;
case NUM_FILTERS:
*p = gCtrlBlock->getFilterList()->numOfFilters();
break;
case FILTER_IDLIST:
gCtrlBlock->getFilterList()->retrieveFilterList(p);
break;
case AGENT_ID:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
*p = gCtrlBlock->getMyHandle();
break;
case NUM_SUCCESSORS:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
*p = gCtrlBlock->getRoutingList()->numOfSuccessor(SCI_GROUP_ALL);
break;
case SUCCESSOR_IDLIST:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
gCtrlBlock->getRoutingList()->retrieveSuccessorList(SCI_GROUP_ALL, p);
break;
case HEALTH_STATUS:
if (gCtrlBlock->isEnabled()) {
*p = 0; // 0 - normal
} else {
*p = 1; // 1 - exited
}
break;
case AGENT_LEVEL:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
*p = gCtrlBlock->getTopology()->getLevel();
break;
case LISTENER_PORT:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
if (gInitializer->getListener() == NULL)
return SCI_ERR_INVALID_CALLER;
*p = gInitializer->getListener()->getBindPort();
break;
case NUM_LISTENER_FDS:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
if (gInitializer->getListener() == NULL)
return SCI_ERR_INVALID_CALLER;
*p = gInitializer->getListener()->numOfSockFds();
break;
case LISTENER_FDLIST:
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
if (gInitializer->getListener() == NULL)
return SCI_ERR_INVALID_CALLER;
gInitializer->getListener()->getSockFds(p);
break;
case PARENT_SOCKFD:
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
*p = gInitializer->getInStream()->getSocket();
break;
case NUM_CHILDREN_FDS:
*p = gCtrlBlock->numOfChildrenFds();
break;
case CHILDREN_FDLIST:
gCtrlBlock->getChildrenSockfds(p);
break;
case RECOVER_STATUS:
if (gCtrlBlock->allActive()) {
*p = 1;
} else {
*p = 0;
}
break;
default:
return SCI_ERR_UNKNOWN_INFO;
}
return SCI_SUCCESS;
}
int SCI_Error(int err_code, char *err_msg, int msg_size)
{
if ((err_msg == NULL) || (msg_size <= 0)) {
return SCI_ERR_NO_MEM;
}
memset(err_msg, 0, msg_size);
if (err_code == 0) {
strncpy(err_msg, ErrRetMsg[err_code], msg_size);
return SCI_SUCCESS;
}
if ((err_code <= -2001) && (err_code >= -2030)){
int index = err_code * (-1) % 2000;
strncpy(err_msg, ErrRetMsg[index], msg_size);
return SCI_SUCCESS;
}
if ((err_code <= -5000) && (err_code >= -5005)){
int index = err_code * (-1) % 5000;
strncpy(err_msg, ErrRetMsg[index + 31], msg_size);
return SCI_SUCCESS;
}
return SCI_ERR_MSG;
}
int SCI_Query_errchildren(int *num, int **id_list)
{
return gCtrlBlock->getErrChildren(num, id_list);
}
// Communication
int SCI_Bcast(int filter_id, sci_group_t group, int num_bufs, void *bufs[], int sizes[])
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (group > SCI_GROUP_ALL) {
if (!gCtrlBlock->getTopology()->hasBE((int)group))
return SCI_ERR_GROUP_NOTFOUND;
} else {
if (!gCtrlBlock->getRoutingList()->isGroupExist(group))
return SCI_ERR_GROUP_NOTFOUND;
}
int rc = SCI_SUCCESS;
rc = gCtrlBlock->checkChildHealthState();
if (rc != SCI_SUCCESS)
return rc;
try {
Message *msg = new Message();
msg->build(filter_id, group, num_bufs, (char **)bufs, sizes, Message::COMMAND);
log_debug("Produced a message bcast command, message group=%d, message size=%d",
(int) group, msg->getContentLen());
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
rc = SCI_ERR_NO_MEM;
}
return rc;
}
int SCI_Upload(int filter_id, sci_group_t group, int num_bufs, void *bufs[], int sizes[])
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if ((gCtrlBlock->getMyRole() != CtrlBlock::BACK_END)
&& (gCtrlBlock->getMyRole() != CtrlBlock::BACK_AGENT))
return SCI_ERR_INVALID_CALLER;
try {
Message *msg = new Message();
msg->build(filter_id, group, num_bufs, (char **)bufs, sizes, Message::DATA);
gCtrlBlock->getUpQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
return SCI_SUCCESS;
}
int SCI_Poll(int timeout)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT)
return SCI_ERR_INVALID_CALLER;
sci_mode_t mode;
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END)
mode = gCtrlBlock->getEndInfo()->fe_info.mode;
else
mode = gCtrlBlock->getEndInfo()->be_info.mode;
if (mode != SCI_POLLING)
return SCI_ERR_MODE;
/*
if (gCtrlBlock->getPollQueue()->getSize() == 0) {
// no messages in the polling queue
// TODO
return SCI_ERR_POLL_INVALID;
}
*/
int rc = SCI_SUCCESS;
rc = gCtrlBlock->checkChildHealthState();
if (rc != SCI_SUCCESS)
return rc;
Message *msg = gCtrlBlock->getPollQueue()->consume(timeout);
if (msg) {
switch(msg->getType()) {
case Message::COMMAND:
case Message::DATA:
try {
gHndlr(gParam, msg->getGroup(), msg->getContentBuf(), msg->getContentLen());
} catch (...) {
// TODO
}
gCtrlBlock->getObserver()->unnotify();
break;
case Message::INVALID_POLL:
rc = SCI_ERR_POLL_INVALID;
gCtrlBlock->getObserver()->unnotify();
break;
case Message::SOCKET_BROKEN:
rc = SCI_ERR_CHILD_BROKEN;
log_debug("SCI_Poll: received msg SOCKET_BROKEN");
gCtrlBlock->getObserver()->unnotify();
break;
case Message::ERROR_DATA:
rc = SCI_ERR_DATA;
log_debug("SCI_Poll: received msg ERROR_DATA");
gCtrlBlock->getObserver()->unnotify();
break;
case Message::ERROR_THREAD:
rc = SCI_ERR_THREAD;
log_debug("SCI_Poll: received msg ERROR_THREAD");
gCtrlBlock->getObserver()->unnotify();
break;
default:
rc = SCI_ERR_UNKNOWN_INFO;
log_error("SCI_Poll: received unknown command");
gCtrlBlock->getObserver()->unnotify();
break;
}
gCtrlBlock->getPollQueue()->remove();
} else {
rc = SCI_ERR_POLL_TIMEOUT;
}
return rc;
}
// Group
int SCI_Group_create(int num_bes, int * be_list, sci_group_t * group)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
assert(be_list);
for (int i=0; i<num_bes; i++) {
if (!gCtrlBlock->getTopology()->hasBE(be_list[i]))
return SCI_ERR_BACKEND_NOTFOUND;
}
int msgID;
try {
Packer packer;
packer.packInt(num_bes);
for (int i=0; i<num_bes; i++) {
packer.packInt(be_list[i]);
}
char *bufs[1];
int sizes[1];
bufs[0] = packer.getPackedMsg();
sizes[0] = packer.getPackedMsgLen();
msgID = gNotifier->allocate();
Message *msg = new Message();
gAllocator->allocateGroup(group);
msg->build(SCI_FILTER_NULL, *group, 1, bufs, sizes, Message::GROUP_CREATE, msgID);
delete [] bufs[0];
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Group_free(sci_group_t group)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (group>=SCI_GROUP_ALL)
return SCI_ERR_GROUP_PREDEFINED;
if (!gCtrlBlock->getRoutingList()->isGroupExist(group))
return SCI_ERR_GROUP_NOTFOUND;
int msgID;
try {
Message *msg = new Message();
msgID = gNotifier->allocate();
msg->build(SCI_FILTER_NULL, group, 0, NULL, NULL, Message::GROUP_FREE, msgID);
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Group_operate(sci_group_t group1, sci_group_t group2,
sci_op_t op, sci_group_t *newgroup)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getRoutingList()->isGroupExist(group1))
return SCI_ERR_GROUP_NOTFOUND;
if (!gCtrlBlock->getRoutingList()->isGroupExist(group2))
return SCI_ERR_GROUP_NOTFOUND;
if ((op!=SCI_UNION) && (op!=SCI_INTERSECTION) && (op!=SCI_DIFFERENCE))
return SCI_ERR_INVALID_OPERATOR;
int msgID;
try {
Packer packer;
packer.packInt((int) op);
packer.packInt((int) group1);
packer.packInt((int) group2);
char *bufs[1];
int sizes[1];
bufs[0] = packer.getPackedMsg();
sizes[0] = packer.getPackedMsgLen();
msgID = gNotifier->allocate();
Message *msg = new Message();
gAllocator->allocateGroup(newgroup);
msg->build(SCI_FILTER_NULL, *newgroup, 1, bufs, sizes, Message::GROUP_OPERATE, msgID);
delete [] bufs[0];
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Group_operate_ext(sci_group_t group, int num_bes, int *be_list,
sci_op_t op, sci_group_t *newgroup)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getRoutingList()->isGroupExist(group))
return SCI_ERR_GROUP_NOTFOUND;
assert(be_list);
for (int i=0; i<num_bes; i++) {
if (!gCtrlBlock->getTopology()->hasBE(be_list[i]))
return SCI_ERR_BACKEND_NOTFOUND;
}
int msgID;
try {
Packer packer;
packer.packInt((int) op);
packer.packInt((int) group);
packer.packInt(num_bes);
for (int i=0; i<num_bes; i++) {
packer.packInt(be_list[i]);
}
char *bufs[1];
int sizes[1];
bufs[0] = packer.getPackedMsg();
sizes[0] = packer.getPackedMsgLen();
msgID = gNotifier->allocate();
Message *msg = new Message();
gAllocator->allocateGroup(newgroup);
msg->build(SCI_FILTER_NULL, *newgroup, 1, bufs, sizes, Message::GROUP_OPERATE_EXT, msgID);
delete [] bufs[0];
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Group_query(sci_group_t group, sci_group_query_t query, void *ret_val)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getRoutingList()->isGroupExist(group))
return SCI_ERR_GROUP_NOTFOUND;
switch(query)
{
case GROUP_MEMBER_NUM:
*((int *) ret_val) = gCtrlBlock->getRoutingList()->numOfBE(group);
break;
case GROUP_MEMBER:
gCtrlBlock->getRoutingList()->retrieveBEList(group, (int *) ret_val);
break;
case GROUP_SUCCESSOR_NUM:
*((int *) ret_val) = gCtrlBlock->getRoutingList()->numOfSuccessor(group);
break;
case GROUP_SUCCESSOR:
gCtrlBlock->getRoutingList()->retrieveSuccessorList(group, (int *) ret_val);
break;
default:
return SCI_ERR_UNKNOWN_INFO;
}
return SCI_SUCCESS;
}
// Filter
int SCI_Filter_load(sci_filter_info_t *filter_info)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
assert(filter_info);
if (filter_info->filter_id == SCI_FILTER_NULL)
return SCI_ERR_FILTER_PREDEFINED;
if (filter_info->filter_id < SCI_FILTER_NULL)
return SCI_ERR_FILTER_ID;
int msgID;
try {
Filter *filter = new Filter(*filter_info);
Message *msg = filter->packMsg();
msgID = gNotifier->allocate();
msg->setID(msgID);
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Filter_unload(int filter_id)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (filter_id == SCI_FILTER_NULL)
return SCI_ERR_FILTER_PREDEFINED;
if (filter_id < SCI_FILTER_NULL)
return SCI_ERR_FILTER_ID;
int msgID;
try {
Message *msg = new Message();
msgID = gNotifier->allocate();
msg->build(filter_id, SCI_GROUP_ALL, 0, NULL, NULL, Message::FILTER_UNLOAD, msgID);
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_Filter_bcast(int filter_id, int num_successors, int * successor_list, int num_bufs,
void *bufs[], int sizes[])
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END) {
return SCI_ERR_INVALID_CALLER;
}
for (int i=0; i<num_successors; i++) {
if (!gCtrlBlock->getRoutingList()->isSuccessorExist(successor_list[i]))
return SCI_ERR_INVALID_SUCCESSOR;
}
try {
Message *msg = new Message();
int nextFilterID = filter_id;
if (filter_id == SCI_FILTER_NULL) {
nextFilterID = gCtrlBlock->getRouterProcessor()->getCurFilterID();
}
sci_group_t curGroup = gCtrlBlock->getRouterProcessor()->getCurGroup();
msg->build(nextFilterID, curGroup, num_bufs, (char **)bufs, sizes, Message::COMMAND);
msg->setRefCount(num_successors);
gCtrlBlock->getRoutingList()->mcast(msg, successor_list, num_successors);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
return SCI_SUCCESS;
}
int SCI_Filter_upload(int filter_id, sci_group_t group, int num_bufs, void *bufs[], int sizes[])
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END) {
return SCI_ERR_INVALID_CALLER;
}
try {
Filter *filter = NULL;
if (filter_id != SCI_FILTER_NULL) {
filter = gCtrlBlock->getFilterList()->getFilter(filter_id);
}
int curFilterID = gCtrlBlock->getFilterProcessor()->getCurFilterID();
Message *msg = new Message();
msg->build(curFilterID, group, num_bufs, (char **)bufs, sizes, Message::DATA);
if (filter) {
filter->input(group, msg->getContentBuf(), msg->getContentLen());
delete msg;
} else {
gCtrlBlock->getFilterProcessor()->deliever(msg);
}
} catch (Exception &e) {
return SCI_ERR_EXCEPTION;
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
return SCI_SUCCESS;
}
// Dynamic Back End
int SCI_BE_add(sci_be_t *be)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (be->id >= 0) { // user-assigned back end id
if (gCtrlBlock->getTopology()->hasBE(be->id))
return SCI_ERR_BACKEND_EXISTED;
} else { // SCI allocated back end id
gAllocator->allocateBE(&(be->id));
}
int msgID;
try {
Packer packer;
packer.packStr(be->hostname);
packer.packInt(be->level);
char *bufs[1];
int sizes[1];
bufs[0] = packer.getPackedMsg();
sizes[0] = packer.getPackedMsgLen();
Message *msg = new Message();
msgID = gNotifier->allocate();
msg->build(SCI_FILTER_NULL, (sci_group_t) (be->id), 1, bufs, sizes, Message::BE_ADD, msgID);
delete [] bufs[0];
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}
int SCI_BE_remove(int be_id)
{
if (gCtrlBlock->getMyRole() == CtrlBlock::INVALID)
return SCI_ERR_UNINTIALIZED;
if (gCtrlBlock->getMyRole() != CtrlBlock::FRONT_END)
return SCI_ERR_INVALID_CALLER;
if (!gCtrlBlock->getTopology()->hasBE(be_id))
return SCI_ERR_BACKEND_NOTFOUND;
int msgID;
try {
Message *msg = new Message();
msgID = gNotifier->allocate();
msg->build(SCI_FILTER_NULL, (sci_group_t) be_id, 0, NULL, NULL, Message::BE_REMOVE, msgID);
gCtrlBlock->getRouterInQueue()->produce(msg);
} catch (std::bad_alloc) {
return SCI_ERR_NO_MEM;
}
int rc;
gNotifier->freeze(msgID, &rc);
return rc;
}