blob: a59abc215fdba37de05f7786ab642c13f9fcf84e [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: RoutingList
Description: Provide routing services for all threads.
Author: Nicole Nie, Liu Wei, Tu HongJ
History:
Date Who ID Description
-------- --- --- -----------
05/08/09 nieyy Initial code (D156654)
01/16/12 ronglli Add codes to retrieve BE list
****************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <vector>
#include "log.hpp"
#include "packer.hpp"
#include "group.hpp"
#include "tools.hpp"
#include "exception.hpp"
#include "stream.hpp"
#include "sshfunc.hpp"
#include "routinglist.hpp"
#include "eventntf.hpp"
#include "initializer.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "ctrlblock.hpp"
#include "filterproc.hpp"
#include "readerproc.hpp"
#include "topology.hpp"
#include "writerproc.hpp"
#include "eventntf.hpp"
#include "dgroup.hpp"
using namespace std;
const int MAX_SUCCESSOR_NUM = 1024;
const int MAX_SEGMENT_SIZE = 11680;
const int MIN_SEGMENT_SIZE = 1440; // 1500 - 40 - 20 (ethernet MTU - tcp/ip header - message header)
RoutingList::RoutingList(int hndl)
: handle(hndl), maxSegmentSize(MAX_SEGMENT_SIZE), filterProc(NULL), myDistriGroup(NULL), topology(NULL)
{
char *envp = ::getenv("SCI_SEGMENT_SIZE");
if (envp != NULL) {
maxSegmentSize = atoi(envp);
maxSegmentSize = maxSegmentSize > MIN_SEGMENT_SIZE ? maxSegmentSize : MIN_SEGMENT_SIZE;
}
if (handle == -1) {
// this is a front end, not parent
myDistriGroup = new DistributedGroup(0);
} else {
int pid = -1;
envp = ::getenv("SCI_PARENT_ID");
if (envp) {
pid = ::atoi(envp);
} else {
throw Exception(Exception::INVALID_LAUNCH);
}
myDistriGroup = new DistributedGroup(pid);
}
if (gCtrlBlock->getMyRole() != CtrlBlock::BACK_END) {
topology = new Topology(0); // 0 is an impossible agent ID so it will be changed when it receives a real topology
}
successorList = new int[MAX_SUCCESSOR_NUM];
queueInfo.clear();
routers.clear();
::pthread_mutex_init(&mtx, NULL);
}
RoutingList::~RoutingList()
{
delete myDistriGroup;
delete [] successorList;
delete topology;
::pthread_mutex_destroy(&mtx);
}
void RoutingList::parseCmd(Message *msg)
{
bool notify = false;
int rc = SCI_SUCCESS;
if (msg->getType() == Message::GROUP_CREATE) {
Packer packer(msg->getContentBuf());
int num_bes = packer.unpackInt();
int be_list[num_bes];
for (int i=0; i<num_bes; i++) {
be_list[i] = packer.unpackInt();
}
myDistriGroup->create(num_bes, be_list, msg->getGroup());
bcast(msg->getGroup(), msg);
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
notify = true;
}
} else if (msg->getType() == Message::GROUP_FREE) {
sci_group_t group = msg->getGroup();
bcast(group, msg);
myDistriGroup->remove(group);
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
notify = true;
}
} else if (msg->getType() == Message::GROUP_OPERATE) {
Packer packer(msg->getContentBuf());
sci_op_t op = (sci_op_t) packer.unpackInt();
sci_group_t group1 = (sci_group_t) packer.unpackInt();
sci_group_t group2 = (sci_group_t) packer.unpackInt();
rc = myDistriGroup->operate(group1, group2, op, msg->getGroup());
if (rc == SCI_SUCCESS) {
bcast(msg->getGroup(), msg);
}
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
notify = true;
}
} else if (msg->getType() == Message::GROUP_OPERATE_EXT) {
Packer packer(msg->getContentBuf());
sci_op_t op = (sci_op_t) packer.unpackInt();
sci_group_t group = (sci_group_t) packer.unpackInt();
int num_bes = packer.unpackInt();
int be_list[num_bes];
for (int i=0; i<num_bes; i++) {
be_list[i] = packer.unpackInt();
}
rc = myDistriGroup->operateExt(group, num_bes, be_list, op, msg->getGroup());
if (rc == SCI_SUCCESS) {
bcast(msg->getGroup(), msg);
}
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
notify = true;
}
} else if (msg->getType() == Message::GROUP_DROP) {
myDistriGroup->dropSuccessor(msg->getID());
} else if (msg->getType() == Message::GROUP_MERGE) {
DistributedGroup subDistriGroup(-1);
subDistriGroup.unpackMsg(*msg);
if (subDistriGroup.getPID() == handle) {
// if this message is from my son
myDistriGroup->merge(msg->getID(), subDistriGroup, false);
} else if (isSuccessorExist(subDistriGroup.getPID())){
// if this message is from my grandson
myDistriGroup->merge(msg->getID(), subDistriGroup, false);
} else {
// if this message is from my nephew
myDistriGroup->merge(msg->getID(), subDistriGroup, true);
// now update its parent id to me
subDistriGroup.setPID(handle);
// repack a message and send to my parent
Message *newmsg = subDistriGroup.packMsg();
filterProc->getOutQueue()->produce(newmsg);
}
} else {
assert(!"should never be here");
}
if (notify) {
void *ret = gNotifier->getRetVal(msg->getID());
*((int *) ret) = rc;
gNotifier->notify(msg->getID());
}
}
void RoutingList::propagateGroupInfo()
{
// propgate my group information to my parent
Message *msg = myDistriGroup->packMsg();
if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
filterProc->getOutQueue()->produce(msg);
} else if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END) {
gCtrlBlock->getUpQueue()->produce(msg);
} else {
assert(!"should not be here");
}
}
int RoutingList::getSegments(Message *msg, Message ***segments, int ref)
{
int i = 0;
int segnum = (msg->getContentLen() + maxSegmentSize - 1) / maxSegmentSize + 1;
int size = 0;
char *ptr = msg->getContentBuf();
sci_group_t gid = msg->getGroup();
Message::Type typ = msg->getType();
int mid = msg->getID();
int mfid = msg->getFilterID();
int hfid = mfid;
int mlen = msg->getContentLen();
*segments = (Message **)::malloc(segnum * sizeof(Message *));
Message **segs = *segments;
if ((mfid != SCI_FILTER_NULL) || (typ != Message::COMMAND)) {
hfid = SCI_JOIN_SEGMENT;
}
::memset(segs, 0, segnum * sizeof(Message *));
segs[0] = new Message();
segs[0]->build(hfid, gid, 0, NULL, NULL, Message::SEGMENT, segnum);
segs[0]->setRefCount(ref);
for (i = 1; i < segnum; i++) {
segs[i] = new Message();
size = (i < (segnum - 1)) ? maxSegmentSize : ((mlen - 1) % maxSegmentSize + 1);
segs[i]->build(mfid, gid, 1, &ptr, &size, typ, mid);
segs[i]->setRefCount(ref);
ptr += size;
}
return segnum;
}
int RoutingList::bcast(sci_group_t group, Message *msg)
{
if (group > SCI_GROUP_ALL) {
int hndl = querySuccessorId((int) group);
if (hndl == INVLIDSUCCESSORID) {
return SCI_ERR_GROUP_NOTFOUND;
} else if (hndl == VALIDBACKENDIDS) {
ucast((int)group, msg);
} else {
ucast(hndl, msg);
}
return SCI_SUCCESS;
}
if (!isGroupExist(group)) {
return SCI_ERR_GROUP_NOTFOUND;
}
splitBcast(group, msg);
return SCI_SUCCESS;
}
void RoutingList::splitBcast(sci_group_t group, Message *msg)
{
int numSor = numOfSuccessor(group);
retrieveSuccessorList(group, successorList);
mcast(msg, successorList, numSor);
}
void RoutingList::mcast(Message *msg, int *sorList, int num)
{
int i = 0;
if (msg->getContentLen() <= maxSegmentSize) {
msg->setRefCount(msg->getRefCount() + num);
for (i = 0; i < num; i++) {
queryQueue(sorList[i])->produce(msg);
}
return;
}
Message **segments;
int segnum = getSegments(msg, &segments, num);
for (i = 0; i < num; i++) {
queryQueue(sorList[i])->multiProduce(segments, segnum);
}
::free(segments);
}
void RoutingList::ucast(int successor_id, Message *msg, int refInc)
{
log_debug("Processor Router: send msg to successor %d", successor_id);
mcast(msg, &successor_id, refInc);
return;
}
void RoutingList::initSubGroup(int successor_id, int start_be_id, int end_be_id)
{
char qName[64] = {0};
MessageQueue *queue = NULL;
if (successor_id != VALIDBACKENDIDS) {
queue = new MessageQueue();
::sprintf(qName, "Agent%d_inQ", successor_id);
queue->setName(qName);
mapQueue(successor_id, queue);
} else {
int i = 0;
for (i = start_be_id; i <= end_be_id; i++) {
queue = new MessageQueue();
::sprintf(qName, "BE%d_inQ", i);
queue->setName(qName);
mapQueue(i, queue);
}
}
myDistriGroup->initSubGroup(successor_id, start_be_id, end_be_id);
}
int RoutingList::startReading(int hndl)
{
ROUTING_MAP::iterator it = routers.find(hndl);
assert(it != routers.end());
ReaderProcessor *reader = it->second.processor->getPeerProcessor();
reader->start();
return 0;
}
int RoutingList::startReaders()
{
ReaderProcessor *reader = NULL;
ROUTING_MAP::iterator pit;
for (pit = routers.begin(); pit != routers.end(); ++pit) {
while (pit->second.processor == NULL) {
SysUtil::sleep(WAIT_INTERVAL);
}
reader = pit->second.processor->getPeerProcessor();
while (reader == NULL) {
SysUtil::sleep(WAIT_INTERVAL);
reader = pit->second.processor->getPeerProcessor();
}
reader->start();
}
return 0;
}
int RoutingList::numOfStreams()
{
int size;
lock();
size = routers.size();
unlock();
return size;
}
int RoutingList::getStreamsSockfds(int *fds)
{
int i = 0;
ROUTING_MAP::iterator it;
lock();
for (it = routers.begin(); it != routers.end(); ++it) {
fds[i] = it->second.stream->getSocket();
i++;
}
unlock();
return i;
}
int RoutingList::isActiveSockfd(int fd)
{
int isSocket = 0;
ROUTING_MAP::iterator it;
lock();
for (it = routers.begin(); it != routers.end(); ++it) {
if (fd == it->second.stream->getSocket()) {
if ((it->second.stream->isReadActive()) || (it->second.stream->isWriteActive()) ){
isSocket = 1;
break;
}
}
}
unlock();
return isSocket;
}
bool RoutingList::allActive()
{
bool active = true;
ROUTING_MAP::iterator it;
lock();
for (it = routers.begin(); it != routers.end(); ++it) {
if ((!(it->second.stream->isReadActive())) || (!(it->second.stream->isWriteActive())) ){
active = false;
break;
}
}
unlock();
return active;
}
void RoutingList::mapRouters(int hndl, WriterProcessor *writer, Stream *stream)
{
lock();
if (writer != NULL) {
routers[hndl].processor = writer;
}
routers[hndl].stream = stream;
unlock();
}
int RoutingList::startRouting(int hndl, Stream *stream)
{
char name[64] = {0};
MessageQueue *inQ = queryQueue(hndl);
while (inQ == NULL) {
SysUtil::sleep(WAIT_INTERVAL);
inQ = queryQueue(hndl);
}
WriterProcessor *writer = NULL;
if ((routers.find(hndl) != routers.end()) && (routers[hndl].stream != NULL)) {
if (!gCtrlBlock->getRecoverMode()) {
log_error("Duplicated client are trying to connect!!!");
return -1;
}
writer = routers[hndl].processor;
while(!(writer->getRecoverState())) {
SysUtil::sleep(WAIT_INTERVAL);
}
writer->setOutStream(stream);
mapRouters(hndl, NULL, stream);
return 0;
}
log_debug("routers[%d].stream = %p", hndl, stream);
ReaderProcessor *reader = new ReaderProcessor(hndl);
reader->setInStream(stream);
reader->setOutQueue(filterProc->getInQueue());
::sprintf(name, "Reader%d", hndl);
reader->setName(name);
writer = new WriterProcessor(hndl);
writer->setInQueue(inQ);
writer->setOutStream(stream);
::sprintf(name, "Writer%d", hndl);
writer->setName(name);
// reader is a peer processor of writer
writer->setPeerProcessor(reader);
reader->setPeerProcessor(writer);
mapRouters(hndl, writer, stream);
log_debug("The Reader%d thread has been newed!", hndl);
writer->start();
reader->start();
return 0;
}
int RoutingList::stopRouting(int hndl)
{
ROUTING_MAP::iterator pit = routers.find(hndl);
if (pit != routers.end()) {
pit->second.processor->release();
delete pit->second.processor;
routers.erase(hndl);
queueInfo.erase(hndl);
}
return 0;
}
int RoutingList::stopRouting()
{
// waiting for all processor threads terminate
ROUTING_MAP::iterator pit;
for (pit = routers.begin(); pit != routers.end(); ++pit) {
pit->second.processor->release();
delete pit->second.processor;
}
routers.clear();
queueInfo.clear();
return 0;
}
bool RoutingList::allRouted()
{
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) {
char *envp = getenv("SCI_EMBED_AGENT");
if ((envp != NULL) && (strcasecmp(envp, "yes") == 0)) {
return (numOfQueues() == (numOfStreams() + 1)); // queueInfo contains itself
}
}
return (numOfQueues() == numOfStreams());
}
void RoutingList::addBE(sci_group_t group, int successor_id, int be_id, bool init)
{
if (init) {
char qName[64] = {0};
int qID = 0;
MessageQueue *queue = new MessageQueue();
if (successor_id == VALIDBACKENDIDS) {
qID = be_id;
::sprintf(qName, "BE%d_inQ", qID);
} else {
qID = successor_id;
::sprintf(qName, "Agent%d_inQ", qID);
}
queue->setName(qName);
mapQueue(qID, queue);
}
myDistriGroup->addBE(group, successor_id, be_id);
}
void RoutingList::removeBE(int be_id)
{
myDistriGroup->removeBE(be_id);
}
void RoutingList::removeGroup(sci_group_t group)
{
myDistriGroup->remove(group);
}
void RoutingList::updateParentId(int pid)
{
::setenv("SCI_PARENT_ID", SysUtil::itoa(pid).c_str(), 1);
myDistriGroup->setPID(pid);
}
bool RoutingList::isGroupExist(sci_group_t group)
{
return myDistriGroup->isGroupExist(group);
}
bool RoutingList::isSuccessorExist(int successor_id)
{
return myDistriGroup->isSuccessorExist(successor_id);
}
int RoutingList::numOfBE(sci_group_t group)
{
return myDistriGroup->numOfBE(group);
}
int RoutingList::numOfSuccessor(sci_group_t group)
{
return myDistriGroup->numOfSuccessor(group);
}
int RoutingList::numOfBEOfSuccessor(int successor_id)
{
if (successor_id >= 0) {
// if it is a back end
return 1;
}
return myDistriGroup->numOfBEOfSuccessor(successor_id);
}
int RoutingList::querySuccessorId(int be_id)
{
return myDistriGroup->querySuccessorId(be_id);
}
void RoutingList::retrieveBEList(sci_group_t group, int * ret_val)
{
assert(ret_val);
myDistriGroup->retrieveBEList(group, ret_val);
}
void RoutingList::retrieveSuccessorList(sci_group_t group, int * ret_val)
{
assert(ret_val);
myDistriGroup->retrieveSuccessorList(group, ret_val);
}
void RoutingList::retrieveBEListOfSuccessor(int successor_id, int * ret_val)
{
assert(ret_val);
myDistriGroup->retrieveBEListOfSuccessor(successor_id, ret_val);
}
void RoutingList::mapQueue(int hndl, MessageQueue *queue)
{
lock();
queueInfo[hndl] = queue;
unlock();
}
MessageQueue * RoutingList::queryQueue(int hndl)
{
MessageQueue *queue = NULL;
lock();
QUEUE_MAP::iterator qit = queueInfo.find(hndl);
if (qit != queueInfo.end()) {
queue = (*qit).second;
}
unlock();
return queue;
}
int RoutingList::numOfQueues()
{
int size;
lock();
size = queueInfo.size();
unlock();
return size;
}
void RoutingList::lock()
{
::pthread_mutex_lock(&mtx);
}
void RoutingList::unlock()
{
::pthread_mutex_unlock(&mtx);
}
Topology * RoutingList::getTopology()
{
return topology;
}
FilterProcessor * RoutingList::getFilterProcessor()
{
return filterProc;
}
void RoutingList::setFilterProcessor(FilterProcessor *proc)
{
filterProc = proc;
}