blob: 163c20ca831e152f353596c8f071c69fbe26c291 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: RoutingList
Description: Provide routing services for all threads.
Author: Nicole Nie, Liu Wei
History:
Date Who ID Description
-------- --- --- -----------
05/08/09 nieyy Initial code (D156654)
****************************************************************************/
#include "routinglist.hpp"
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <vector>
using namespace std;
#include "log.hpp"
#include "packer.hpp"
#include "group.hpp"
#include "tools.hpp"
#include "exception.hpp"
#include "stream.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "ctrlblock.hpp"
#include "eventntf.hpp"
#include "dgroup.hpp"
const int MAX_SUCCESSOR_NUM = 1024;
const int TCP_ETHERNET_MTU = 1460;
RoutingList * RoutingList::instance = NULL;
/**
 * Return the process-wide RoutingList, constructing it on first use.
 *
 * The object is created with a plain `new` the first time this is called and
 * cached in the static `instance` pointer; no locking is performed here.
 */
RoutingList * RoutingList::getInstance()
{
    // Fast path: the singleton has already been built.
    if (instance != NULL) {
        return instance;
    }

    instance = new RoutingList();
    return instance;
}
/**
 * Build the routing list for this process.
 *
 * - The segment size defaults to TCP_ETHERNET_MTU * 32 and may be overridden
 *   through the SCI_SEGMENT_SIZE environment variable; values that are not
 *   greater than TCP_ETHERNET_MTU fall back to the default.
 * - The distributed group is created with parent id 0 when this process has
 *   handle -1 (a front end), otherwise with the id read from SCI_PARENT_ID.
 * - The successor scratch buffer is allocated with MAX_SUCCESSOR_NUM entries.
 *
 * Throws Exception(Exception::INVALID_LAUNCH) when a parent id is required
 * but SCI_PARENT_ID is not set.
 */
RoutingList::RoutingList()
    : maxSegmentSize(TCP_ETHERNET_MTU * 32)
{
    const int myHandle = gCtrlBlock->getMyHandle();

    // Optional override of the segment size.
    const char *segmentEnv = ::getenv("SCI_SEGMENT_SIZE");
    if (segmentEnv != NULL) {
        maxSegmentSize = atoi(segmentEnv);
        if (!(maxSegmentSize > TCP_ETHERNET_MTU)) {
            // Too small (or unparsable): revert to the built-in default.
            maxSegmentSize = TCP_ETHERNET_MTU * 32;
        }
    }

    if (myHandle == -1) {
        // this is a front end, not parent
        myDistriGroup = new DistributedGroup(0);
    } else {
        const char *parentEnv = ::getenv("SCI_PARENT_ID");
        if (parentEnv == NULL) {
            throw Exception(Exception::INVALID_LAUNCH);
        }
        const int parentId = ::atoi(parentEnv);
        myDistriGroup = new DistributedGroup(parentId);
    }

    successorList = new int[MAX_SUCCESSOR_NUM];
}
/**
 * Release the distributed group and the successor scratch buffer, and clear
 * the static singleton pointer so a later getInstance() builds a new object.
 */
RoutingList::~RoutingList()
{
    delete this->myDistriGroup;
    delete [] this->successorList;

    // The singleton no longer exists once it has been destroyed.
    RoutingList::instance = NULL;
}
/**
 * Apply a group-management command to the local distributed group and, where
 * applicable, forward it to the successors of the affected group.
 *
 * Handled message types:
 *  - GROUP_CREATE      : unpack a back-end list, create the group, broadcast.
 *  - GROUP_FREE        : broadcast, then remove the group.
 *  - GROUP_OPERATE     : combine two groups; broadcast only on success.
 *  - GROUP_OPERATE_EXT : combine a group with a back-end list; broadcast only
 *                        on success.
 *  - GROUP_DROP        : drop the successor identified by the message id.
 *  - GROUP_MERGE       : merge group information received from a descendant.
 * Any other type trips an assertion.
 *
 * For the create/free/operate commands, when this process is the front end,
 * the result code is written to the location returned by
 * gNotifier->getRetVal(msg->getID()) and the waiter is notified.
 *
 * @param msg  command message to process
 */
void RoutingList::parseCmd(Message *msg)
{
    bool notify = false;
    int rc = SCI_SUCCESS;

    if (msg->getType() == Message::GROUP_CREATE) {
        Packer packer(msg->getContentBuf());
        int num_bes = packer.unpackInt();
        // Heap-backed storage instead of a variable-length array: VLAs are not
        // standard C++, and the count comes straight from the message, so a
        // large value must not be able to overflow the stack. At least one
        // slot is kept so &be_list[0] is always a valid, non-null pointer.
        std::vector<int> be_list(num_bes > 0 ? num_bes : 1);
        for (int i = 0; i < num_bes; i++) {
            be_list[i] = packer.unpackInt();
        }

        myDistriGroup->create(num_bes, &be_list[0], msg->getGroup());
        bcast(msg->getGroup(), msg);
        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_FREE) {
        sci_group_t group = msg->getGroup();

        // Broadcast before removing: bcast() refuses groups that no longer exist.
        bcast(group, msg);
        myDistriGroup->remove(group);
        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_OPERATE) {
        Packer packer(msg->getContentBuf());
        sci_op_t op = (sci_op_t) packer.unpackInt();
        sci_group_t group1 = (sci_group_t) packer.unpackInt();
        sci_group_t group2 = (sci_group_t) packer.unpackInt();

        rc = myDistriGroup->operate(group1, group2, op, msg->getGroup());
        if (rc == SCI_SUCCESS) {
            bcast(msg->getGroup(), msg);
        }
        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_OPERATE_EXT) {
        Packer packer(msg->getContentBuf());
        sci_op_t op = (sci_op_t) packer.unpackInt();
        sci_group_t group = (sci_group_t) packer.unpackInt();
        int num_bes = packer.unpackInt();
        // Same reasoning as in GROUP_CREATE: no VLA, never an empty buffer.
        std::vector<int> be_list(num_bes > 0 ? num_bes : 1);
        for (int i = 0; i < num_bes; i++) {
            be_list[i] = packer.unpackInt();
        }

        rc = myDistriGroup->operateExt(group, num_bes, &be_list[0], op, msg->getGroup());
        if (rc == SCI_SUCCESS) {
            bcast(msg->getGroup(), msg);
        }
        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_DROP) {
        myDistriGroup->dropSuccessor(msg->getID());
    } else if (msg->getType() == Message::GROUP_MERGE) {
        DistributedGroup subDistriGroup(-1);
        subDistriGroup.unpackMsg(*msg);

        if (subDistriGroup.getPID() == gCtrlBlock->getMyHandle()) {
            // if this message is from my son
            myDistriGroup->merge(msg->getID(), subDistriGroup, false);
        } else if (isSuccessorExist(subDistriGroup.getPID())) {
            // if this message is from my grandson
            myDistriGroup->merge(msg->getID(), subDistriGroup, false);
        } else {
            // if this message is from my nephew
            myDistriGroup->merge(msg->getID(), subDistriGroup, true);
            // now update its parent id to me
            subDistriGroup.setPID(gCtrlBlock->getMyHandle());
            // repack a message and send to my parent
            Message *newmsg = subDistriGroup.packMsg();
            gCtrlBlock->getFilterOutQueue()->produce(newmsg);
        }
    } else {
        assert(!"should never be here");
    }

    if (notify) {
        // Publish the result code to whoever is waiting on this message id.
        void *ret = gNotifier->getRetVal(msg->getID());
        *((int *) ret) = rc;
        gNotifier->notify(msg->getID());
    }
}
void RoutingList::propagateGroupInfo()
{
// propgate my group information to my parent
Message *msg = myDistriGroup->packMsg();
if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
gCtrlBlock->getFilterOutQueue()->produce(msg);
} else if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END) {
gCtrlBlock->getUpQueue()->produce(msg);
} else {
assert(!"should not be here");
}
}
/**
 * Split the content of msg into a header message followed by data segments of
 * at most maxSegmentSize bytes each.
 *
 * Element 0 of the result is a Message::SEGMENT header built with the total
 * number of messages (header included) as its last argument; elements 1..n-1
 * carry consecutive slices of the original content buffer and keep the
 * original type, filter id and message id. The header's filter id is
 * SCI_JOIN_SEGMENT unless the original is a COMMAND with SCI_FILTER_NULL.
 *
 * @param msg       message whose content is to be segmented
 * @param segments  out: array allocated with malloc(); the caller releases the
 *                  array itself with free()
 * @param ref       reference count assigned to every produced message
 * @return number of entries stored in *segments (header + data segments)
 */
int RoutingList::getSegments(Message *msg, Message ***segments, int ref)
{
    char *ptr = msg->getContentBuf();
    sci_group_t gid = msg->getGroup();
    Message::Type typ = msg->getType();
    int mid = msg->getID();
    int mfid = msg->getFilterID();
    int mlen = msg->getContentLen();

    // One header plus ceil(mlen / maxSegmentSize) data segments.
    int segnum = (mlen + maxSegmentSize - 1) / maxSegmentSize + 1;

    int hfid = mfid;
    if ((mfid != SCI_FILTER_NULL) || (typ != Message::COMMAND)) {
        hfid = SCI_JOIN_SEGMENT;
    }

    *segments = (Message **)::malloc(segnum * sizeof(Message *));
    Message **segs = *segments;
    ::memset(segs, 0, segnum * sizeof(Message *));

    segs[0] = new Message();
    segs[0]->build(hfid, gid, 0, NULL, NULL, Message::SEGMENT, segnum);
    segs[0]->setRefCount(ref);

    // Track the bytes still to be emitted instead of using mlen % maxSegmentSize
    // for the final slice: the modulo is 0 when mlen is an exact multiple of
    // maxSegmentSize, which used to produce an empty last segment and drop the
    // trailing maxSegmentSize bytes of the payload.
    int remaining = mlen;
    for (int i = 1; i < segnum; i++) {
        int size = (remaining < maxSegmentSize) ? remaining : maxSegmentSize;

        segs[i] = new Message();
        segs[i]->build(mfid, gid, 1, &ptr, &size, typ, mid);
        segs[i]->setRefCount(ref);

        ptr += size;
        remaining -= size;
    }

    return segnum;
}
/**
 * Send msg to the destination described by group.
 *
 * Values greater than SCI_GROUP_ALL are looked up through querySuccessorId():
 * INVLIDSUCCESSORID yields SCI_ERR_GROUP_NOTFOUND, VALIDBACKENDIDS sends to
 * the value of group itself, and any other result is used as the successor to
 * send to. All other values are treated as group ids: the group must exist,
 * and the message is distributed through splitBcast().
 *
 * @param group  destination group (or single id, see above)
 * @param msg    message to send
 * @return SCI_SUCCESS, or SCI_ERR_GROUP_NOTFOUND when the target is unknown
 */
int RoutingList::bcast(sci_group_t group, Message *msg)
{
    if (!(group > SCI_GROUP_ALL)) {
        // Regular group: fan out to every successor of the group.
        if (!isGroupExist(group)) {
            return SCI_ERR_GROUP_NOTFOUND;
        }
        splitBcast(group, msg);
        return SCI_SUCCESS;
    }

    // Single destination addressed through the group parameter.
    const int hndl = querySuccessorId((int) group);
    if (hndl == INVLIDSUCCESSORID) {
        return SCI_ERR_GROUP_NOTFOUND;
    }

    const int target = (hndl == VALIDBACKENDIDS) ? (int) group : hndl;
    ucast(target, msg);
    return SCI_SUCCESS;
}
/**
 * Deliver msg to every successor of group.
 *
 * Messages whose content is at most 1.5 * maxSegmentSize are queued as-is to
 * each successor via ucast(); larger ones are segmented once and distributed
 * via mcast().
 *
 * NOTE(review): successorList holds MAX_SUCCESSOR_NUM entries and nothing here
 * checks that the group's successor count stays within that bound — confirm
 * the callee enforces it.
 * NOTE(review): every ucast() call below passes the full successor count as
 * the reference-count increment — confirm this matches the Message ref-count
 * contract.
 *
 * @param group  group whose successors receive the message
 * @param msg    message to distribute
 */
void RoutingList::splitBcast(sci_group_t group, Message *msg)
{
    const int successorCount = numOfSuccessor(group);
    retrieveSuccessorList(group, successorList);

    const bool needsSegmentation = msg->getContentLen() > (maxSegmentSize * 3 / 2);
    if (needsSegmentation) {
        mcast(msg, successorList, successorCount);
        return;
    }

    // include the original queue
    for (int idx = 0; idx < successorCount; ++idx) {
        ucast(successorList[idx], msg, successorCount);
    }
}
/**
 * Segment msg and queue the same set of segments to each listed successor.
 *
 * The segments are created with a reference count of num (one per receiving
 * queue). The temporary pointer array is freed here; the segment messages
 * themselves are handed to the queues. Finally the original message's
 * reference count is decremented and the message is deleted when it reaches 0.
 *
 * @param msg      message to segment and send
 * @param sorList  successor ids to send to
 * @param num      number of entries in sorList
 */
void RoutingList::mcast(Message *msg, int *sorList, int num)
{
    // With no receivers the segments would never be queued and so never be
    // released; skip segmentation entirely in that case.
    if (num > 0) {
        Message **segments = NULL;
        int segnum = getSegments(msg, &segments, num);

        for (int i = 0; i < num; i++) {
            // Use the caller-supplied list, not the member successorList:
            // ucast() passes a one-element list that is unrelated to it.
            gCtrlBlock->queryQueue(sorList[i])->multiProduce(segments, segnum);
        }

        ::free(segments);
    }

    if (msg->decRefCount() == 0) {
        delete msg;
    }
}
/**
 * Send msg to a single successor.
 *
 * Content of at most 1.5 * maxSegmentSize is queued directly after raising the
 * message's reference count by refInc. Larger content is handed to mcast()
 * with a one-element successor list; refInc is not applied on that path.
 *
 * @param successor_id  id used to look up the destination queue
 * @param msg           message to send
 * @param refInc        amount added to the reference count before queuing
 */
void RoutingList::ucast(int successor_id, Message *msg, int refInc)
{
    log_debug("Processor Router: send msg to successor %d", successor_id);

    const bool oversized = msg->getContentLen() > (maxSegmentSize * 3 / 2);
    if (oversized) {
        mcast(msg, &successor_id, 1);
        return;
    }

    msg->setRefCount(refInc + msg->getRefCount());
    gCtrlBlock->queryQueue(successor_id)->produce(msg);
}
void RoutingList::initSubGroup(int successor_id, int start_be_id, int end_be_id)
{
myDistriGroup->initSubGroup(successor_id, start_be_id, end_be_id);
}
/**
 * Delegate to the distributed group's addBE() with the same arguments.
 *
 * @param group         group to update
 * @param successor_id  successor associated with the back end
 * @param be_id         back-end id to add
 */
void RoutingList::addBE(sci_group_t group, int successor_id, int be_id)
{
    this->myDistriGroup->addBE(group,
                               successor_id,
                               be_id);
}
void RoutingList::removeBE(int be_id)
{
myDistriGroup->removeBE(be_id);
}
/**
 * Delegate to the distributed group's remove().
 *
 * @param group  group to remove
 */
void RoutingList::removeGroup(sci_group_t group)
{
    this->myDistriGroup->remove(group);
}
void RoutingList::updateParentId(int pid)
{
::setenv("SCI_PARENT_ID", SysUtil::itoa(pid).c_str(), 1);
myDistriGroup->setPID(pid);
}
/**
 * Report the distributed group's isGroupExist() answer for group.
 *
 * @param group  group to look up
 * @return the result of the delegated query
 */
bool RoutingList::isGroupExist(sci_group_t group)
{
    const bool known = myDistriGroup->isGroupExist(group);
    return known;
}
/**
 * Report the distributed group's isSuccessorExist() answer for successor_id.
 *
 * @param successor_id  successor to look up
 * @return the result of the delegated query
 */
bool RoutingList::isSuccessorExist(int successor_id)
{
    const bool known = myDistriGroup->isSuccessorExist(successor_id);
    return known;
}
/**
 * Return the distributed group's numOfBE() value for group.
 *
 * @param group  group to query
 * @return the delegated count
 */
int RoutingList::numOfBE(sci_group_t group)
{
    const int backEndCount = myDistriGroup->numOfBE(group);
    return backEndCount;
}
/**
 * Return the distributed group's numOfSuccessor() value for group.
 *
 * @param group  group to query
 * @return the delegated count
 */
int RoutingList::numOfSuccessor(sci_group_t group)
{
    const int successorCount = myDistriGroup->numOfSuccessor(group);
    return successorCount;
}
/**
 * Return how many back ends are reachable through successor_id.
 *
 * A non-negative id denotes a back end and therefore counts as exactly one;
 * negative ids are resolved through the distributed group.
 *
 * @param successor_id  successor to query
 * @return 1 for a back end, otherwise the delegated count
 */
int RoutingList::numOfBEOfSuccessor(int successor_id)
{
    if (successor_id < 0) {
        return myDistriGroup->numOfBEOfSuccessor(successor_id);
    }

    // if it is a back end
    return 1;
}
/**
 * Return the distributed group's querySuccessorId() result for be_id.
 *
 * @param be_id  back-end id to look up
 * @return the delegated result
 */
int RoutingList::querySuccessorId(int be_id)
{
    const int successorId = myDistriGroup->querySuccessorId(be_id);
    return successorId;
}
/**
 * Delegate to the distributed group's retrieveBEList() using the
 * caller-supplied output buffer.
 *
 * @param group    group to query
 * @param ret_val  output buffer; must not be NULL (asserted)
 */
void RoutingList::retrieveBEList(sci_group_t group, int * ret_val)
{
    assert(ret_val != NULL);
    this->myDistriGroup->retrieveBEList(group, ret_val);
}
/**
 * Delegate to the distributed group's retrieveSuccessorList() using the
 * caller-supplied output buffer.
 *
 * @param group    group to query
 * @param ret_val  output buffer; must not be NULL (asserted)
 */
void RoutingList::retrieveSuccessorList(sci_group_t group, int * ret_val)
{
    assert(ret_val != NULL);
    this->myDistriGroup->retrieveSuccessorList(group, ret_val);
}