blob: 03b1ecb3fb5b8375be7e0daabc98cc062ad455f6 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: BEMap, Topology, Launcher
Description: Runtime topology manipulation.
Author: Nicole Nie, Liu Wei, Tu HongJ
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 nieyy Initial code (D153875)
****************************************************************************/
#include "topology.hpp"
#include <stdlib.h>
#include <math.h>
#include <assert.h>
#include <ctype.h>
#include <string.h>
#include <unistd.h>
#include <pwd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "log.hpp"
#include "tools.hpp"
#include "packer.hpp"
#include "socket.hpp"
#include "exception.hpp"
#include "ipconverter.hpp"
#include "ctrlblock.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "routinglist.hpp"
#include "processor.hpp"
#include "eventntf.hpp"
#include "listener.hpp"
#include "parent.hpp"
// 1024 * 1024. Topology::init() uses this as the default upper bound on the
// number of host-file entries to load when SCI_BACKEND_NUM is not set.
const int ONE_KK = 1024 * 1024;
// NOTE(review): not referenced by any code visible in this file; presumably a
// file-descriptor limit used elsewhere or left over -- confirm before removing.
const int MAX_FD = 1024;
// Port handed to Stream::init() by Launcher::launchClient() when no remote
// shell is configured (the "daemon" launch path).
const int SCI_DAEMON_PORT = 6688;
/*
 * Load host names from a host file into this map.
 *
 * The file is read in one piece and split on '\n'. For every line, leading
 * blanks/tabs and trailing blanks/tabs/CR are stripped; empty lines and lines
 * whose first non-blank character is '#' are skipped. Each remaining line is
 * stored as (*this)[index] with index counting up from 0.
 *
 * filename  path of the host file
 * num       maximum number of entries to load; loading stops once reached
 *
 * Returns SCI_SUCCESS, or SCI_ERR_INVALID_HOSTFILE when the file cannot be
 * opened, sized or read, or is empty.
 */
int BEMap::input(const char * filename, int num)
{
    if (NULL == filename) {
        return SCI_ERR_INVALID_HOSTFILE;
    }
    FILE *fp = ::fopen(filename, "r");
    if (NULL == fp) {
        return SCI_ERR_INVALID_HOSTFILE;
    }

    // Determine the file length; every failure path below must close fp.
    long len = -1;
    if (::fseek(fp, 0, SEEK_END) == 0) {
        len = ::ftell(fp);
    }
    if ((len <= 0) || (::fseek(fp, 0, SEEK_SET) != 0)) {
        ::fclose(fp);
        return SCI_ERR_INVALID_HOSTFILE;
    }

    // Two spare bytes: a sentinel '\n' so the last line is always terminated
    // (without overwriting real data) plus the closing '\0'.
    char *text = new char[len + 2];
    size_t nread = ::fread(text, 1, (size_t) len, fp);
    ::fclose(fp);
    if (nread == 0) {
        delete [] text;
        return SCI_ERR_INVALID_HOSTFILE;
    }
    text[nread] = '\n';
    text[nread + 1] = '\0';

    // Repeated host names reuse the first string instance that was seen.
    map<string, string> hostCache;
    map<string, string>::iterator it;
    log_debug("Hostlist is: ");

    int index = 0;
    char *line = text;
    char *const textEnd = text + nread + 1;   // one past the sentinel '\n'
    while ((index < num) && (line < textEnd)) {
        // The sentinel guarantees that a newline is always found.
        char *eol = static_cast<char *>(::memchr(line, '\n', textEnd - line));
        char *nextLine = eol + 1;
        *eol = '\0';

        // strip trailing blanks, tabs and a possible CR
        while ((eol > line) &&
               ((eol[-1] == ' ') || (eol[-1] == '\t') || (eol[-1] == '\r'))) {
            *--eol = '\0';
        }
        // strip leading blanks and tabs
        while ((*line == ' ') || (*line == '\t')) {
            line++;
        }

        // ignore empty lines and lines with '#' as first character
        if ((*line != '\0') && (*line != '#')) {
            log_debug("%s", line);
            string key = line;
            it = hostCache.find(key);
            if (it == hostCache.end()) {
                hostCache[key] = key;
                (*this)[index++] = key;
            } else {
                (*this)[index++] = (*it).second;
            }
        }
        line = nextLine;
    }

    hostCache.clear();
    delete [] text;
    return SCI_SUCCESS;
}
/*
 * Create an empty topology for the agent with the given ID.
 *
 * All scalar members start from a defined value (0) instead of being left
 * indeterminate; init(), unpackMsg(), deploy() or the Launcher assign the
 * real values later.
 */
Topology::Topology(int id)
    : agentID(id)
{
    fanOut = 0;
    level = 0;
    height = 0;
    nextAgentID = 0;
    beMap.clear();
    weightMap.clear();
}
// Empties both bookkeeping maps before the object goes away.
Topology::~Topology()
{
    weightMap.clear();
    beMap.clear();
}
/*
 * Serialize this topology into a newly allocated Message::CONFIG message.
 *
 * Packed layout, in order: agentID, fanOut, level, height, bePath, agentPath,
 * the number of back ends, then one (id, host) pair per back end.
 * The buffer obtained from the packer is released with delete[] once the
 * message has been built; the message itself is returned to the caller.
 */
Message * Topology::packMsg()
{
    Packer packer;

    // fixed part
    packer.packInt(agentID);
    packer.packInt(fanOut);
    packer.packInt(level);
    packer.packInt(height);
    packer.packStr(bePath);
    packer.packStr(agentPath);

    // variable part: back end table
    packer.packInt(beMap.size());
    BEMap::iterator entry = beMap.begin();
    while (entry != beMap.end()) {
        packer.packInt(entry->first);
        packer.packStr(entry->second);
        ++entry;
    }

    char *segments[1];
    int lengths[1];
    segments[0] = packer.getPackedMsg();
    lengths[0] = packer.getPackedMsgLen();

    Message *config = new Message(Message::CONFIG);
    config->build(SCI_FILTER_NULL, SCI_GROUP_ALL, 1, segments, lengths, Message::CONFIG);
    delete [] segments[0];

    return config;
}
/*
 * Fill this topology from the content buffer of a message, reading the fields
 * in the same order packMsg() writes them. Back end entries are stored into
 * beMap under their IDs. Returns *this.
 */
Topology & Topology::unpackMsg(Message &msg)
{
    Packer packer(msg.getContentBuf());

    agentID = packer.unpackInt();
    fanOut = packer.unpackInt();
    level = packer.unpackInt();
    height = packer.unpackInt();
    bePath = packer.unpackStr();
    agentPath = packer.unpackStr();

    const int beCount = packer.unpackInt();
    for (int n = 0; n < beCount; ++n) {
        int beID = packer.unpackInt();
        beMap[beID] = packer.unpackStr();
    }

    return *this;
}
int Topology::init()
{
int rc;
char *envp = NULL;
// check host file & num of be
char *hostfile = gCtrlBlock->getEndInfo()->fe_info.hostfile;
if ((envp = ::getenv("SCI_HOST_FILE")) != NULL) {
hostfile = envp;
}
if (hostfile == NULL) {
hostfile = "host.list";
}
int numItem = ONE_KK;
if ((envp = ::getenv("SCI_BACKEND_NUM")) != NULL) {
numItem = ::atoi(envp);
}
rc = beMap.input(hostfile, numItem);
if (rc != SCI_SUCCESS) {
return rc;
}
// check fanout
fanOut = 32;
if ((envp = ::getenv("SCI_DEBUG_FANOUT")) != NULL) {
fanOut = ::atoi(envp);
}
level = 0;
height = (int) ::ceil(::log((double)beMap.size()) / ::log((double)fanOut));
// check be path
if ((envp = ::getenv("SCI_BACKEND_PATH")) != NULL) {
bePath = envp;
} else {
if (gCtrlBlock->getEndInfo()->fe_info.bepath != NULL) {
bePath = gCtrlBlock->getEndInfo()->fe_info.bepath;
} else {
return SCI_ERR_UNKNOWN_INFO;
}
}
// check agent path
#ifdef __64BIT__
const char *agentName = "scia64";
#else
const char *agentName = "scia";
#endif
if ((envp = ::getenv("SCI_AGENT_PATH")) != NULL) {
agentPath = envp;
agentPath += "/";
agentPath += agentName;
} else {
agentPath = SysUtil::get_path_name(agentName);
}
return SCI_SUCCESS;
}
/*
 * Launch the children described by this topology.
 *
 * Returns the error reported by Launcher::launch() if launching fails; in
 * that case no synchronization wait is performed, because the clients that
 * would send the acknowledgements were not started. On success an agent
 * uploads its own info and the result of Launcher::syncWaiting() is returned.
 */
int Topology::deploy()
{
    Launcher launcher(*this);
    nextAgentID = (agentID + 1) * fanOut - 2; // A formula to calculate the agentID of the first child

    int rc = launcher.launch();
    if (rc != SCI_SUCCESS) {
        // do not let the wait below overwrite the launch error
        return rc;
    }

    // upload my hostname & port info to my parent for uncle collection purpose
    if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
        gCtrlBlock->genSelfInfo(gCtrlBlock->getFilterOutQueue(), true);
    }

    return launcher.syncWaiting(getBENum());
}
/*
 * Add one back end at runtime.
 *
 * The host name is unpacked from the message content and the back end ID is
 * taken from the message group. The request is delegated to the first child
 * agent whose weight is not a full tree (see isFullTree()). If there is no
 * such child, a back end is launched directly when this node has no child
 * agents, otherwise a new agent is launched for it.
 *
 * The synchronization wait is only entered after a successful launch; a
 * failed launch produces no client that could acknowledge.
 *
 * Returns SCI_SUCCESS (and records the host in beMap) or the launch error.
 */
int Topology::addBE(Message *msg)
{
    assert(msg);

    Packer packer(msg->getContentBuf());
    char *host = packer.unpackStr();
    int id = (int) msg->getGroup();

    // find the first child agent with weight < fanOut
    int aID = INVLIDSUCCESSORID;
    map<int, int>::iterator it = weightMap.begin();
    for (; it != weightMap.end(); ++it) {
        int weight = (*it).second;
        if (!isFullTree(weight)) {
            aID = (*it).first;
            break;
        }
    }

    int rc = SCI_SUCCESS;
    if (aID == INVLIDSUCCESSORID) {
        // if do not find
        Launcher launcher(*this);
        if (weightMap.size() == 0) { // if this agent does not have any child agents, launch a back end
            rc = launcher.launchBE(id, host);
        } else { // if this agent has child agent(s), launch an agent
            rc = launcher.launchAgent(id, host);
        }
        if (rc == SCI_SUCCESS) {
            launcher.syncWaiting(1);
        }
    } else {
        // otherwise delegate this command
        gRoutingList->addBE(SCI_GROUP_ALL, aID, id);
        gRoutingList->ucast(aID, msg);
        incWeight(aID);
    }

    if (rc == SCI_SUCCESS) {
        beMap[id] = host;
    }

    return rc;
}
/*
 * Remove the back end whose ID is carried in the message group.
 *
 * The back end is dropped from the routing list and the message is forwarded
 * either to the back end itself (when its successor is VALIDBACKENDIDS) or to
 * the child agent that owns it, whose weight is then decremented.
 *
 * Returns SCI_ERR_BACKEND_NOTFOUND for an unknown ID, else SCI_SUCCESS.
 */
int Topology::removeBE(Message *msg)
{
    assert(msg);

    const int beID = (int) msg->getGroup();
    if (!hasBE(beID)) {
        return SCI_ERR_BACKEND_NOTFOUND;
    }

    const int successor = gRoutingList->querySuccessorId(beID);
    assert(successor != INVLIDSUCCESSORID);
    gRoutingList->removeBE(beID);

    const bool directChild = (successor == VALIDBACKENDIDS);
    if (directChild) {
        gRoutingList->ucast(beID, msg);
    } else {
        gRoutingList->ucast(successor, msg);
        decWeight(successor);
    }

    beMap.erase(beID);
    return SCI_SUCCESS;
}
// Number of back ends currently recorded in beMap.
int Topology::getBENum()
{
    return static_cast<int>(beMap.size());
}
int Topology::getLevel()
{
return level;
}
// True when a back end with the given ID is recorded in beMap.
bool Topology::hasBE(int beID)
{
    return beMap.find(beID) != beMap.end();
}
void Topology::incWeight(int id)
{
if (weightMap.find(id) == weightMap.end()) {
weightMap[id] = 1;
} else {
weightMap[id] = weightMap[id] + 1;
}
}
// Decrease the weight recorded for child agent `id` by one and drop the entry
// once it reaches zero. The agent is expected to have an entry (asserted).
void Topology::decWeight(int id)
{
    assert(weightMap.find(id) != weightMap.end());

    int &weight = weightMap[id];
    weight = weight - 1;
    if (weight == 0) {
        weightMap.erase(id);
    }
}
/*
 * Tell whether beNum equals fanOut^h for some h >= 1.
 *
 * Integer arithmetic is used so that exact powers are recognised reliably;
 * a log()/pow() based check can be off by one ulp and miss them. Values below
 * fanOut (including zero and negative numbers) and a fan-out below 2 yield
 * false.
 */
bool Topology::isFullTree(int beNum)
{
    if ((fanOut < 2) || (beNum < fanOut)) {
        return false;
    }

    // strip factors of fanOut; only an exact power ends at 1
    int rest = beNum;
    while ((rest % fanOut) == 0) {
        rest /= fanOut;
    }
    return rest == 1;
}
/*
 * Build the launcher for a topology and collect the environment that is
 * handed to every launched client.
 *
 * - localName: IP of the interface named by SCI_DEVICE_NAME, else the local
 *   host name.
 * - SCI_WORK_DIRECTORY: $PWD on the front end (falling back to getcwd() when
 *   PWD is unset), $SCI_WORK_DIRECTORY otherwise; it is not exported when no
 *   value can be determined, so NULL is never passed to env.set().
 * - The library search path variable is SCI_LIB_PATH followed by the current
 *   value of that variable, joined with ':'.
 * - SCI_USE_EXTLAUNCHER, SCI_SYNC_INIT and SCI_ENABLE_FAILOVER are exported
 *   as "no" unless the environment says "yes" (case-insensitive).
 * - Tool specific variables (fe_info.beenvp on the front end, environ on an
 *   agent) are forwarded, except names starting with "SCI_" or with the
 *   library path variable name.
 */
Launcher::Launcher(Topology &topo)
    : topology(topo), shell(""), mode(INTERNAL), sync(false), ackID(-1)
{
    char *envp = NULL;
    string envStr;

    envp = ::getenv("SCI_DEVICE_NAME");
    if (envp) {
        IPConverter converter;
        string ifname = envp;
        converter.getIP(ifname, true, localName);
        env.set("SCI_DEVICE_NAME", envp);
    } else {
        // the buffer is zero-filled and its last byte is never handed to
        // gethostname(), so the name is terminated even if it was truncated
        char tmp[256] = {0};
        ::gethostname(tmp, sizeof(tmp) - 1);
        localName = SysUtil::get_hostname(tmp);
    }

    int jobKey = gCtrlBlock->getJobKey();
    env.set("SCI_JOB_KEY", jobKey);

    // working directory; getenv() may return NULL
    char cwdBuf[4096] = {0};
    char *workDir = NULL;
    if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
        workDir = ::getenv("PWD");
        if (workDir == NULL) {
            workDir = ::getcwd(cwdBuf, sizeof(cwdBuf));
        }
    } else {
        workDir = ::getenv("SCI_WORK_DIRECTORY");
    }
    if (workDir != NULL) {
        env.set("SCI_WORK_DIRECTORY", workDir);
    }

    envp = ::getenv("SCI_AGENT_PATH");
    if (envp) {
        env.set("SCI_AGENT_PATH", envp);
    }

    envp = ::getenv("SCI_LIB_PATH");
    if (envp) {
        env.set("SCI_LIB_PATH", envp);
        envStr = envp;
    }

    // a local array instead of a char* bound to a string literal
#ifdef _SCI_LINUX
    char library_path[] = "LD_LIBRARY_PATH";
#else
    char library_path[] = "LIBPATH";
#endif
    envp = ::getenv(library_path);
    if (envp) {
        if (envStr.length()) {
            envStr += ":";
            envStr += envp;
        } else {
            envStr = envp;
        }
    }
    if (envStr.length()) {
        env.set(library_path, envStr);
    }

    env.set("SCI_LOG_DIRECTORY", Log::getInstance()->getLogDir());
    env.set("SCI_LOG_LEVEL", Log::getInstance()->getLogLevel());

    envp = ::getenv("SCI_REMOTE_SHELL");
    if (envp) {
        shell = envp;
    }

    env.set("SCI_USE_EXTLAUNCHER", "no");
    envp = ::getenv("SCI_USE_EXTLAUNCHER");
    if (envp && (::strcasecmp(envp, "yes") == 0)) {
        mode = REGISTER;
        env.set("SCI_USE_EXTLAUNCHER", "yes");
    }

    env.set("SCI_SYNC_INIT", "no");
    envp = ::getenv("SCI_SYNC_INIT");
    if (envp && (::strcasecmp(envp, "yes") == 0)) {
        char *ev = ::getenv("SCI_INIT_ACKID");
        sync = true;
        env.set("SCI_SYNC_INIT", "yes");
        if (ev != NULL) {
            // reuse the ack ID that was handed down to this process
            ackID = atoi(ev);
            env.set("SCI_INIT_ACKID", ev);
        } else {
            ackID = gNotifier->allocate();
            env.set("SCI_INIT_ACKID", SysUtil::itoa(ackID).c_str());
        }
    }

    env.set("SCI_ENABLE_FAILOVER", "no");
    envp = ::getenv("SCI_ENABLE_FAILOVER");
    if (envp && (::strcasecmp(envp, "yes") == 0)) {
        env.set("SCI_ENABLE_FAILOVER", "yes");
    }

    env.set("SCI_REMOTE_SHELL", shell);

    envp = ::getenv("SCI_DEBUG_TREE");
    if (envp) {
        env.set("SCI_DEBUG_TREE", envp);
    }
    envp = ::getenv("SCI_SEGMENT_SIZE");
    if (envp) {
        env.set("SCI_SEGMENT_SIZE", envp);
    }

    // add any tool specific environment variables
    char **tool_envp = NULL;
    if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
        tool_envp = gCtrlBlock->getEndInfo()->fe_info.beenvp;
    } else if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
        // In <unistd.h>, the following variable:
        //     extern char **environ;
        // is initialized as a pointer to an array of character pointers
        // to the environment strings
        tool_envp = environ;
    }
    if (tool_envp) {
        while (*tool_envp) {
            // filter out SCI_ and library path.
            if (::strncmp(*tool_envp, "SCI_", 4) &&
                ::strncmp(*tool_envp, library_path, ::strlen(library_path)))
            {
                char *value = ::strchr(*tool_envp, '=');
                if (value) {
                    // split "NAME=VALUE" in place for the call, then restore it
                    *value = '\0';
                    env.set(*tool_envp, value+1);
                    *value = '=';
                }
            }
            tool_envp++;
        }
    }

    log_debug("Launcher: env(%s)", env.getEnvString().c_str());
}
// Drops every variable that was collected in `env`.
Launcher::~Launcher()
{
    this->env.unsetAll();
}
/*
 * Launch all children using the tree layout selected by SCI_DEBUG_TREE:
 * the value 2 selects launch_tree2(); anything else, or an unset variable,
 * selects launch_tree1(). Returns the result of the chosen routine.
 */
int Launcher::launch()
{
    const char *selection = ::getenv("SCI_DEBUG_TREE");
    const int tree = (selection != NULL) ? ::atoi(selection) : 1;

    if (tree == 2) {
        return launch_tree2();
    }
    // use tree 1 when default
    return launch_tree1();
}
/*
 * When synchronous initialization is enabled and this process is the front
 * end, call gNotifier->freeze() on the init ack ID with a {num, count} record
 * (num = beNum, count = 0). Does nothing otherwise. Always returns 0.
 */
int Launcher::syncWaiting(int beNum)
{
    const bool mustWait = sync && (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END);
    if (mustWait) {
        struct {
            int num;
            int count;
        } pending;
        pending.num = beNum;
        pending.count = 0;
        gNotifier->freeze(ackID, &pending);
    }
    return 0;
}
int Launcher::launchBE(int beID, const char * hostname)
{
int rc;
char queueName[32];
gRoutingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, beID);
MessageQueue *queue = new MessageQueue();
::sprintf(queueName, "BE%d_inQ", beID);
queue->setName(string(queueName));
gCtrlBlock->registerQueue(queue);
gCtrlBlock->mapQueue(beID, queue);
rc = launchClient(beID, topology.bePath, hostname, mode);
if (rc == SCI_SUCCESS) {
gCtrlBlock->genSelfInfo(queue, false);
} else {
gRoutingList->removeBE(beID);
}
return rc;
}
int Launcher::launchAgent(int beID, const char * hostname)
{
int rc;
char queueName[32];
Topology *childTopo = new Topology(topology.nextAgentID--);
childTopo->fanOut = topology.fanOut;
childTopo->level = topology.level + 1;
childTopo->height = topology.height + 1;
childTopo->bePath = topology.bePath;
childTopo->agentPath = topology.agentPath;
childTopo->beMap[beID] = hostname;
gRoutingList->addBE(SCI_GROUP_ALL, childTopo->agentID, beID);
MessageQueue *queue = new MessageQueue();
::sprintf(queueName, "Agent%d_inQ", childTopo->agentID);
queue->setName(string(queueName));
gCtrlBlock->registerQueue(queue);
gCtrlBlock->mapQueue(childTopo->agentID, queue);
rc = launchClient(childTopo->agentID, childTopo->agentPath, hostname);
if (rc == SCI_SUCCESS) {
Message *topoMsg = childTopo->packMsg();
queue->produce(topoMsg);
gCtrlBlock->genSelfInfo(queue, false);
topology.incWeight(childTopo->agentID);
} else {
gRoutingList->removeBE(beID);
}
delete childTopo;
return rc;
}
/*
 * Start one client (back end or agent) on `host`.
 *
 * ID    client ID exported as SCI_CLIENT_ID
 * path  executable to start; must not be empty
 * host  target host name
 * m     launch mode sent to the daemon
 *
 * Without a remote shell the request (user name, mode, job key, ID, path and
 * environment string) is sent to the daemon at SCI_DAEMON_PORT on `host`.
 * With a remote shell the command
 *   <shell> <host> -n '<exports><path> >&- 2>&- <&- &'
 * is run through system().
 *
 * Returns SCI_SUCCESS, or SCI_ERR_LAUNCH_FAILED when the current user cannot
 * be looked up, the daemon connection raises a SocketException, or system()
 * returns non-zero.
 */
int Launcher::launchClient(int ID, string &path, string host, Launcher::MODE m)
{
    int rc = 0;

    assert(!path.empty());

    env.set("SCI_PARENT_HOSTNAME", localName);
    env.set("SCI_PARENT_PORT", gCtrlBlock->getListener()->getBindPort());
    env.set("SCI_CLIENT_ID", ID);
    env.set("SCI_PARENT_ID", topology.agentID);

    log_debug("Launch client: %s: %s", host.c_str(), path.c_str());
    if (shell.empty()) {
        // getpwuid() returns NULL when the uid has no passwd entry
        struct passwd *pwd = ::getpwuid(::getuid());
        if ((pwd == NULL) || (pwd->pw_name == NULL)) {
            log_error("Launcher: cannot determine user name for uid %d", (int) ::getuid());
            return SCI_ERR_LAUNCH_FAILED;
        }
        string usernam = pwd->pw_name;
        try {
            Stream stream;
            stream.init(host.c_str(), SCI_DAEMON_PORT);
            int jobKey = gCtrlBlock->getJobKey();
            stream << usernam << (int)m << jobKey << ID << path << env.getEnvString() << endl;
            stream.stop();
        } catch (SocketException &e) {
            rc = -1;
            log_error("Launcher: socket exception: %s", e.getErrMsg().c_str());
        }
    } else {
        string cmd = shell + " " + host + " -n '" + env.getExportcmd() + path + " >&- 2>&- <&- &'";
        rc = system(cmd.c_str());
    }

    if (rc != SCI_SUCCESS)
        rc = SCI_ERR_LAUNCH_FAILED;

    return rc;
}
/*
 * Launch the children using the layout with the minimum number of agents.
 *
 * With no back ends nothing is launched. When all back ends fit within the
 * fan-out they are launched directly. Otherwise the back ends are cut into
 * consecutive groups of `step` entries (the last group may be smaller); each
 * group gets one agent, started on a randomly chosen host of that group, and
 * the agent receives its sub-topology through its input queue.
 *
 * The map iterator is advanced once across all groups rather than being
 * re-walked from the beginning for every agent.
 *
 * Returns SCI_SUCCESS or the first error reported by launchClient().
 */
int Launcher::launch_tree1()
{
    int rc;
    // this tree will have minimum agents
    const int totalSize = (int) topology.beMap.size();
    char queueName[32];

    if (totalSize == 0) {
        // nothing to launch; begin() must not be dereferenced
        return SCI_SUCCESS;
    }

    BEMap::iterator it = topology.beMap.begin();
    const int initID = (*it).first;

    if (totalSize <= topology.fanOut) {
        // no need to generate agent
        gRoutingList->initSubGroup(VALIDBACKENDIDS, initID, initID + totalSize - 1);
        for ( ; it != topology.beMap.end(); ++it) {
            MessageQueue *queue = new MessageQueue();
            ::sprintf(queueName, "BE%d_inQ", (*it).first);
            queue->setName(string(queueName));
            gCtrlBlock->registerQueue(queue);
            gCtrlBlock->mapQueue((*it).first, queue);

            rc = launchClient((*it).first, topology.bePath, (*it).second, mode);
            if (rc != SCI_SUCCESS) {
                return rc;
            }
            gCtrlBlock->genSelfInfo(queue, false);
        }
        return SCI_SUCCESS;
    }

    // capacity of one sub tree below this level
    int stride = (int) ::ceil (::pow(double(topology.fanOut), topology.height - topology.level - 1));
    if (stride < 1) {
        stride = 1;     // guards the divisions below
    }
    const int divf = (totalSize + stride - 1) / stride;     // number of agents
    const int step = (totalSize + divf - 1) / divf;         // back ends per agent

    ::srand((unsigned int) ::time(NULL));
    for (int i = 0; i < totalSize; i += step) {
        // generate an agent; `it` already points at entry i
        Topology childTopo(topology.nextAgentID--);
        childTopo.fanOut = topology.fanOut;
        childTopo.level = topology.level + 1;
        childTopo.height = topology.height;
        childTopo.bePath = topology.bePath;
        childTopo.agentPath = topology.agentPath;

        const int count = (totalSize - i) < step ? (totalSize - i) : step;
        gRoutingList->initSubGroup(childTopo.agentID, initID + i, initID + i + count - 1);

        // the agent runs on a random host of its own group
        string hostname;
        const int pos = ::rand() % count;
        for (int j = 0; j < count; j++) {
            if (pos == j) {
                hostname = (*it).second;
            }
            childTopo.beMap[(*it).first] = (*it).second;
            topology.incWeight(childTopo.agentID);
            ++it;
        }

        MessageQueue *queue = new MessageQueue();
        ::sprintf(queueName, "Agent%d_inQ", childTopo.agentID);
        queue->setName(string(queueName));
        gCtrlBlock->registerQueue(queue);
        gCtrlBlock->mapQueue(childTopo.agentID, queue);

        rc = launchClient(childTopo.agentID, topology.agentPath, hostname);
        if (rc != SCI_SUCCESS) {
            return rc;
        }
        Message *msg = childTopo.packMsg();
        queue->produce(msg);
        gCtrlBlock->genSelfInfo(queue, false);
    }

    return SCI_SUCCESS;
}
int Launcher::launch_tree2()
{
// this tree will have maximum agents but supposed to have better performance
// after evaluated by HongJun
int i, rc;
int left = 0;
int totalSize = topology.beMap.size();
int step;
int size = 0;
int out = topology.fanOut;
char queueName[32];
if (totalSize == 0)
return SCI_SUCCESS;
// launch all of my back ends
BEMap::iterator it = topology.beMap.begin();
int initID = (*it).first;
while (size < totalSize) {
left = totalSize - size;
step = (left + out - 1) / out;
out--;
if (step == 1) {
MessageQueue *queue = new MessageQueue();
::sprintf(queueName, "BE%d_inQ", (*it).first);
queue->setName(string(queueName));
gCtrlBlock->registerQueue(queue);
gCtrlBlock->mapQueue((*it).first, queue);
gRoutingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, (*it).first);
rc = launchClient((*it).first, topology.bePath, (*it).second, mode);
if (rc == SCI_SUCCESS) {
gCtrlBlock->genSelfInfo(queue, false);
} else {
return rc;
}
it++;
} else {
string hostname;
BEMap::iterator fh = it;
Topology *childTopo = new Topology(topology.nextAgentID--);
childTopo->fanOut = topology.fanOut;
childTopo->level = topology.level + 1;
childTopo->height = topology.height;
childTopo->bePath = topology.bePath;
childTopo->agentPath = topology.agentPath;
int startID = initID + size;
int endID = initID + size + step - 1;
gRoutingList->initSubGroup(childTopo->agentID, startID, endID);
for (i = 0; i < step; i++) {
childTopo->beMap[(*it).first] = (*it).second;
topology.incWeight(childTopo->agentID);
it++;
}
MessageQueue *queue = new MessageQueue();
::sprintf(queueName, "Agent%d_inQ", childTopo->agentID);
queue->setName(string(queueName));
gCtrlBlock->registerQueue(queue);
gCtrlBlock->mapQueue(childTopo->agentID, queue);
int pos = ::rand() % step;
for (i = 0; i < step; i++) {
if (pos == i) {
hostname = fh->second;
break;
}
fh++;
}
rc = launchClient(childTopo->agentID, topology.agentPath, hostname);
if (rc == SCI_SUCCESS) {
Message *msg = childTopo->packMsg();
queue->produce(msg);
gCtrlBlock->genSelfInfo(queue, false);
delete childTopo;
} else {
delete childTopo;
return rc;
}
}
size += step;
}
return SCI_SUCCESS;
}