| #ifndef _PRAGMA_COPYRIGHT_ |
| #define _PRAGMA_COPYRIGHT_ |
| #pragma comment(copyright, "%Z% %I% %W% %D% %T%\0") |
| #endif /* _PRAGMA_COPYRIGHT_ */ |
| /**************************************************************************** |
| |
| * Copyright (c) 2008, 2010 IBM Corporation. |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0s |
| * which accompanies this distribution, and is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| |
| Classes: Launcher |
| |
| Description: Runtime Launch the clients. |
| |
| Author: Tu HongJ |
| |
| History: |
| Date Who ID Description |
| -------- --- --- ----------- |
| 06/21/10 tuhongj Initial code (D153875) |
| 07/19/12 ronglli Fix tree4 external launching mode support |
| 07/19/12 ronglli Optimize tree launching |
| |
| ****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| #include <stdlib.h> |
| #include <stdio.h> |
| #include <assert.h> |
| #include <math.h> |
| #include <ctype.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <errno.h> |
| #include <sys/types.h> |
| #include <pwd.h> |
| |
| #include "log.hpp" |
| #include "tools.hpp" |
| #include "packer.hpp" |
| #include "exception.hpp" |
| #include "sshfunc.hpp" |
| #include "ipconverter.hpp" |
| |
| #include "atomic.hpp" |
| #include "launcher.hpp" |
| #include "topology.hpp" |
| #include "ctrlblock.hpp" |
| #include "initializer.hpp" |
| #include "message.hpp" |
| #include "queue.hpp" |
| #include "routinglist.hpp" |
| #include "filterlist.hpp" |
| #include "processor.hpp" |
| #include "eventntf.hpp" |
| #include "listener.hpp" |
| #include "sshfunc.hpp" |
| #include "purifierproc.hpp" |
| #include "embedagent.hpp" |
| |
| #ifdef __APPLE__ |
| extern char **environ; |
| #endif |
| |
| #define WAIT_TIMES 600 // 600s |
| |
| Launcher::Launcher(Topology &topo) |
| : topology(topo), shell(""), localName(""), scidPort(SCID_PORT), mode(INTERNAL), embedMode(false), waitTimes(WAIT_TIMES) |
| { |
| } |
| |
| Launcher::~Launcher() |
| { |
| env.unsetAll(); |
| childMap.clear(); |
| } |
| |
| int Launcher::initEnv() |
| { |
| char *envp = NULL; |
| string envStr; |
| struct servent *serv = NULL; |
| childMap.clear(); |
| |
| envp = getenv("SCI_DAEMON_NAME"); |
| if (envp != NULL) { |
| serv = getservbyname(envp, "tcp"); |
| } else { |
| serv = getservbyname(SCID_NAME, "tcp"); |
| } |
| if (serv != NULL) { |
| scidPort = ntohs(serv->s_port); |
| } |
| envp = ::getenv("SCI_DEVICE_NAME"); |
| if (envp) { |
| IPConverter converter; |
| string ifname = envp; |
| if (converter.getIP(ifname, true, localName) != 0) { |
| localName = ""; |
| log_error("Launcher: invalid device name(%s). Will use the localhost", ifname.c_str()); |
| } else { |
| env.set("SCI_DEVICE_NAME", envp); |
| } |
| } |
| if (localName == "") { |
| char tmp[256] = {0}; |
| ::gethostname(tmp, sizeof(tmp)); |
| localName = SysUtil::get_hostname(tmp); |
| if (localName == "") { |
| localName = tmp; |
| } |
| } |
| int jobKey = gCtrlBlock->getJobKey(); |
| |
| env.set("SCI_JOB_KEY", jobKey); |
| if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) { |
| env.set("SCI_WORK_DIRECTORY", ::getenv("PWD")); |
| } else { |
| env.set("SCI_WORK_DIRECTORY", ::getenv("SCI_WORK_DIRECTORY")); |
| } |
| envp = ::getenv("SCI_EMBED_AGENT"); |
| if ((envp != NULL) && (strcasecmp(envp, "yes") == 0)) { |
| embedMode = true; |
| env.set("SCI_EMBED_AGENT", envp); |
| } |
| env.set("SCI_AGENT_PATH", topology.agentPath); |
| envp = ::getenv("SCI_LIB_PATH"); |
| if (envp) { |
| env.set("SCI_LIB_PATH", envp); |
| envStr = envp; |
| } |
| #if defined(_SCI_LINUX) |
| char *library_path = "LD_LIBRARY_PATH"; |
| #elif defined(__APPLE__) |
| char *library_path = "DYLD_LIBRARY_PATH"; |
| #else |
| char *library_path = "LIBPATH"; |
| #endif |
| envp = ::getenv(library_path); |
| if (envp) { |
| if (envStr.length()) { |
| envStr += ":"; |
| envStr += envp; |
| } else { |
| envStr = envp; |
| } |
| } |
| if (envStr.length()) { |
| env.set(library_path, envStr); |
| } |
| |
| env.set("SCI_LOG_DIRECTORY", ::getenv("SCI_LOG_DIRECTORY")); |
| env.set("SCI_LOG_LEVEL", ::getenv("SCI_LOG_LEVEL")); |
| env.set("SCI_LOG_ENABLE", ::getenv("SCI_LOG_ENABLE")); |
| envp = ::getenv("SCI_WAIT_TIMES"); |
| if (envp) { |
| waitTimes = atoi(envp); |
| env.set("SCI_WAIT_TIMES", waitTimes); |
| } |
| envp = ::getenv("SCI_REMOTE_SHELL"); |
| if (envp) { |
| shell = envp; |
| env.set("SCI_REMOTE_SHELL", shell); |
| } |
| env.set("SCI_USE_EXTLAUNCHER", "no"); |
| envp = ::getenv("SCI_USE_EXTLAUNCHER"); |
| if (envp && (::strcasecmp(envp, "yes") == 0)) { |
| mode = REGISTER; |
| env.set("SCI_USE_EXTLAUNCHER", "yes"); |
| } |
| env.set("SCI_ENABLE_FAILOVER", "no"); |
| envp = ::getenv("SCI_ENABLE_FAILOVER"); |
| if (envp && (::strcasecmp(envp, "yes") == 0)) { |
| env.set("SCI_ENABLE_FAILOVER", "yes"); |
| } |
| envp = ::getenv("SCI_DEBUG_TREE"); |
| if (envp) { |
| env.set("SCI_DEBUG_TREE", envp); |
| } |
| envp = ::getenv("SCI_SEGMENT_SIZE"); |
| if (envp) { |
| env.set("SCI_SEGMENT_SIZE", envp); |
| } |
| |
| // add any tool specific environment variables |
| if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) { |
| char **tool_envp = NULL; |
| tool_envp = gCtrlBlock->getEndInfo()->fe_info.beenvp; |
| if (tool_envp) { |
| while (*tool_envp) { |
| // filter out SCI_ and library path. |
| if (::strncmp(*tool_envp, "SCI_", 4) && |
| ::strncmp(*tool_envp, library_path, ::strlen(library_path))) { |
| char *envstr = strdup(*tool_envp); |
| char *value = ::strchr(envstr, '='); |
| if (value) { |
| *value = '\0'; |
| env.set(envstr, value+1); |
| } |
| free(envstr); |
| } |
| tool_envp++; |
| } |
| } |
| } else { |
| string savedEnv = gInitializer->getEnvStr(); |
| int size = savedEnv.size(); |
| char *st = strdup(savedEnv.c_str()); |
| char *key = NULL; |
| char *value = NULL; |
| const char *delim = ";"; |
| char *savePtr1 = NULL; |
| key=strtok_r(st,delim,&savePtr1); |
| if((key!= NULL) && (key < (st + size))) { |
| if (::strncmp(key, "SCI_", 4) && |
| ::strncmp(key, library_path, ::strlen(library_path))) { |
| value = strchr(key,'='); |
| if(value != NULL) { |
| (*value) = '\0'; |
| if((value!= key)&&((value + 1) != NULL) && ((value+1) < (st + size))) { |
| if((*(value+1)) == '\0') |
| env.set(key,""); |
| else |
| env.set(key,value+1); |
| } |
| } else { |
| env.set(key,""); |
| } |
| } |
| while(key = strtok_r(NULL,delim,&savePtr1)) { |
| if (::strncmp(key, "SCI_", 4) && |
| ::strncmp(key, library_path, ::strlen(library_path))) { |
| value = strchr(key,'='); |
| if(value != NULL) { |
| (*value) = '\0'; |
| if((value != key)&&((value + 1) != NULL) && ((value+1) < (st + size))) { |
| if((*(value+1)) == '\0') |
| env.set(key,""); |
| else |
| env.set(key,value+1); |
| } |
| } else { |
| env.set(key,""); |
| } |
| } |
| } |
| } |
| free(st); |
| } |
| env.set("SCI_PARENT_HOSTNAME", localName); |
| env.set("SCI_ENABLE_LISTENER", ::getenv("SCI_ENABLE_LISTENER")); |
| env.set("SCI_PARENT_ID", topology.agentID); |
| // flow control threshold |
| env.set("SCI_FLOWCTL_THRESHOLD", gCtrlBlock->getFlowctlThreshold()); |
| |
| log_debug("Launcher: env(%s)", env.getEnvString().c_str()); |
| return SCI_SUCCESS; |
| } |
| |
| int Launcher::launch() |
| { |
| int tree = 2; |
| int rc = 0; |
| |
| char *envp = ::getenv("SCI_DEBUG_TREE"); |
| if (envp) { |
| tree = ::atoi(envp); |
| } |
| |
| try { |
| initEnv(); |
| switch (tree) { |
| case 1: |
| rc = launch_tree1(); |
| break; |
| case 2: |
| rc = launch_tree2(); |
| break; |
| case 3: |
| rc = launch_tree3(); |
| break; |
| case 4: |
| rc = launch_tree4(); |
| break; |
| default: |
| return -1; |
| } |
| if (rc != SCI_SUCCESS) { |
| return rc; |
| } |
| |
| if ((shell.empty()) && (childMap.size() > 0)) { |
| rc = startAll(); |
| if (rc != SCI_SUCCESS) { |
| return rc; |
| } |
| } |
| envp = getenv("SCI_ENABLE_LISTENER"); |
| if ((envp != NULL) && (strcasecmp(envp, "yes") == 0)) { |
| gInitializer->initListener(); |
| } |
| } catch (std::bad_alloc) { |
| log_error("Launcher: out of memory"); |
| return SCI_ERR_NO_MEM; |
| } catch (SocketException &e) { |
| log_error("Launcher: socket exception: %s", e.getErrMsg().c_str()); |
| return SCI_ERR_LAUNCH_FAILED; |
| } catch (ThreadException &e) { |
| log_error("Launcher: thread exception %d", e.getErrCode()); |
| return SCI_ERR_LAUNCH_FAILED; |
| } catch (...) { |
| log_error("Launcher: unknown exception"); |
| return SCI_ERR_LAUNCH_FAILED; |
| } |
| if ((mode == REGISTER) || !shell.empty()) { |
| int times = 0; |
| while (!topology.routingList->allRouted()) { |
| if (times >= (waitTimes * 1000000 / WAIT_INTERVAL)) |
| return SCI_ERR_LAUNCH_FAILED; |
| times++; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| } |
| |
| /*Moved the reader start into startRouting*/ |
| /* if (rc == SCI_SUCCESS) |
| rc = topology.routingList->startReaders(); |
| }*/ |
| |
| return rc; |
| } |
| |
| int Launcher::startAll() |
| { |
| int rc = SCI_SUCCESS; |
| CHILD_MAP::iterator it; |
| int hndl; |
| Stream *stream; |
| |
| try { |
| for (it = childMap.begin(); it != childMap.end(); ++it) { |
| struct iovec sign = {0}; |
| psec_idbuf_desc &usertok = SSHFUNC->get_token(); |
| hndl = it->first; |
| stream = it->second; |
| |
| env.set("SCI_CLIENT_ID", hndl); |
| log_debug("Launcher: start client(%d)", hndl); |
| psec_sign_data(&sign, "%s", env.getEnvString().c_str()); |
| *stream << usertok << env.getEnvString() << sign << endl; |
| psec_free_signature(&sign); |
| rc = topology.routingList->startRouting(hndl, stream); |
| if (rc != SCI_SUCCESS) { |
| childMap.clear(); |
| return rc; |
| } |
| } |
| } catch (SocketException &e) { |
| rc = SCI_ERR_LAUNCH_FAILED; |
| log_warn("Launcher: socket exception: %s", e.getErrMsg().c_str()); |
| } |
| childMap.clear(); |
| |
| return rc; |
| } |
| |
| int Launcher::launchBE(int beID, const char * hostname) |
| { |
| int rc; |
| char queueName[32]; |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| |
| initEnv(); |
| topology.routingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, beID, true); |
| topology.routingList->queryQueue(beID)->produce(flistMsg); |
| |
| rc = launchClient(beID, topology.bePath, hostname, mode); |
| if (rc != SCI_SUCCESS) { |
| topology.routingList->removeBE(beID); |
| } else { |
| if (mode == REGISTER) { |
| int times = 0; |
| while (!topology.routingList->allRouted()) { |
| if (times >= (waitTimes * 1000000 / WAIT_INTERVAL)) |
| return SCI_ERR_LAUNCH_FAILED; |
| times++; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| } |
| /*Moved the reader start into startRouting*/ |
| //topology.routingList->startReading(beID); |
| } |
| |
| return rc; |
| } |
| |
| int Launcher::launchAgent(int beID, const char * hostname) |
| { |
| int rc; |
| |
| initEnv(); |
| Topology *childTopo = new Topology(topology.nextAgentID--); |
| childTopo->fanOut = topology.fanOut; |
| childTopo->level = topology.level + 1; |
| childTopo->height = topology.height + 1; |
| childTopo->bePath = topology.bePath; |
| childTopo->agentPath = topology.agentPath; |
| childTopo->beMap[beID] = hostname; |
| |
| topology.routingList->addBE(SCI_GROUP_ALL, childTopo->agentID, beID, true); |
| MessageQueue *queue = topology.routingList->queryQueue(childTopo->agentID); |
| |
| rc = launchClient(childTopo->agentID, childTopo->agentPath, hostname); |
| if (rc == SCI_SUCCESS) { |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| Message *topoMsg = childTopo->packMsg(); |
| if (flistMsg != NULL) { |
| incRefCount(flistMsg->getRefCount()); |
| queue->produce(flistMsg); |
| } |
| queue->produce(topoMsg); |
| topology.incWeight(childTopo->agentID); |
| /*Moved the reader start into startRouting*/ |
| //topology.routingList->startReading(childTopo->agentID); |
| } else { |
| topology.routingList->removeBE(beID); |
| } |
| delete childTopo; |
| |
| return rc; |
| } |
| |
| int Launcher::launchClient(int ID, string &path, string host, Launcher::MODE m, bool batch, int beID) |
| { |
| int rc = 0; |
| Listener *listener = NULL; |
| assert(!path.empty()); |
| if ((m == REGISTER) || !shell.empty()) { |
| listener = gInitializer->initListener(); |
| } |
| if (listener != NULL) { |
| int port = 0; |
| while (1) { |
| port = listener->getBindPort(); |
| if (port > 0) |
| break; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| |
| env.set("SCI_PARENT_PORT", port); |
| } |
| env.set("SCI_CLIENT_ID", ID); |
| log_debug("Launch client: %s: %s", host.c_str(), path.c_str()); |
| |
| if (shell.empty()) { |
| string username = gCtrlBlock->getUsername(); |
| try { |
| int hndl = ID; |
| struct iovec sign = {0}; |
| int jobKey = gCtrlBlock->getJobKey(); |
| psec_idbuf_desc &usertok = SSHFUNC->get_token(); |
| Stream *stream = new Stream(); |
| SCI_connect_hndlr *conn = NULL; |
| |
| if (gCtrlBlock->getMyRole() != CtrlBlock::AGENT) |
| conn = gCtrlBlock->getEndInfo()->connect_hndlr; |
| if (conn == NULL) { |
| int cID = ID; |
| if (embedMode && (beID >= 0)) { |
| cID = beID; |
| } |
| rc = psec_sign_data(&sign, "%d%d%d%s%s", m, jobKey, cID, path.c_str(), env.getEnvString().c_str()); |
| stream->init(host.c_str(), scidPort); |
| *stream << username << usertok << sign << (int)m << jobKey << cID << path << env.getEnvString() << endl; |
| psec_free_signature(&sign); |
| } else { |
| int sockfd = conn(host.c_str()); |
| if (sockfd < 0) { |
| log_error("Launcher: invalid return sockfd(%d) of the connection handler.", sockfd); |
| return SCI_ERR_LAUNCH_FAILED; |
| } |
| stream->init(sockfd); |
| } |
| if (m == REGISTER) { |
| stream->stop(); |
| delete stream; |
| } else { |
| if (batch) { |
| childMap[hndl] = stream; |
| } else { |
| psec_sign_data(&sign, "%s", env.getEnvString().c_str()); |
| *stream << usertok << env.getEnvString() << sign << endl; |
| psec_free_signature(&sign); |
| rc = topology.routingList->startRouting(hndl, stream); |
| } |
| } |
| } catch (SocketException &e) { |
| rc = SCI_ERR_LAUNCH_FAILED; |
| log_error("Launcher: socket exception: %s", e.getErrMsg().c_str()); |
| } |
| } else { |
| if ((strcasecmp(shell.c_str(), "rsh") == 0) || (strcasecmp(shell.c_str(), "ssh") == 0)) { |
| string cmd = shell + " " + host + " -n '" + env.getExportcmd() + path + " >&- 2>&- <&- &'"; |
| rc = system(cmd.c_str()); |
| } |
| } |
| |
| return rc; |
| } |
| |
| int Launcher::launch_tree1() |
| { |
| int rc; |
| |
| // this tree will have minimum agents |
| int totalSize = (int) topology.beMap.size(); |
| char queueName[32]; |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| |
| if (totalSize <= topology.fanOut) { |
| // no need to generate agent |
| BEMap::iterator it = topology.beMap.begin(); |
| |
| int initID = (*it).first; |
| int startID = initID; |
| int endID = initID + totalSize - 1; |
| topology.routingList->initSubGroup(VALIDBACKENDIDS, startID, endID); |
| if (flistMsg != NULL) |
| flistMsg->setRefCount(totalSize + 1); |
| |
| for ( ; it != topology.beMap.end(); ++it) { |
| MessageQueue *queue = topology.routingList->queryQueue((*it).first); |
| queue->produce(flistMsg); |
| |
| rc = launchClient((*it).first, topology.bePath, (*it).second, mode, true); |
| if (rc != SCI_SUCCESS) { |
| return rc; |
| } |
| } |
| |
| return SCI_SUCCESS; |
| } |
| |
| int stride = (int) ::ceil (::pow(double(topology.fanOut), topology.height - topology.level - 1)); |
| int divf; |
| if ((totalSize % stride) == 0) { |
| divf = totalSize / stride; |
| } else { |
| divf = (totalSize - totalSize%stride) / stride + 1; |
| } |
| int step; |
| if ((totalSize % divf) == 0) { |
| step = totalSize / divf; |
| } else { |
| step = (totalSize - (totalSize % divf)) / divf + 1; |
| } |
| if (flistMsg != NULL) |
| flistMsg->setRefCount((totalSize + step - 1) / step + 1); |
| ::srand((unsigned int) ::time(NULL)); |
| BEMap::iterator it = topology.beMap.begin(); |
| int initID = (*it).first; |
| for (int i = 0; i < totalSize; i += step) { |
| it = topology.beMap.begin(); |
| for (int j = 0; j < i; j++) { |
| ++it; |
| } |
| |
| // generate an agent |
| Topology *childTopo = new Topology(topology.nextAgentID--); |
| childTopo->fanOut = topology.fanOut; |
| childTopo->level = topology.level + 1; |
| childTopo->height = topology.height; |
| childTopo->bePath = topology.bePath; |
| childTopo->agentPath = topology.agentPath; |
| |
| int min = (totalSize - i) < step ? (totalSize - i) : step; |
| |
| int startID = initID + i; |
| int endID = initID + i + min - 1; |
| topology.routingList->initSubGroup(childTopo->agentID, startID, endID); |
| |
| string hostname; |
| |
| int pos = ::rand() % min; |
| for (int j = 0; j < min; j++) { |
| if (pos == j) { |
| hostname = (*it).second; |
| } |
| childTopo->beMap[(*it).first] = (*it).second; |
| topology.incWeight(childTopo->agentID); |
| ++it; |
| } |
| |
| rc = launchClient(childTopo->agentID, topology.agentPath, hostname, INTERNAL, true); |
| if (rc == SCI_SUCCESS) { |
| Message *msg = childTopo->packMsg(); |
| MessageQueue *queue = topology.routingList->queryQueue(childTopo->agentID); |
| queue->produce(flistMsg); // before topology message |
| queue->produce(msg); |
| delete childTopo; |
| } else { |
| delete childTopo; |
| return rc; |
| } |
| } |
| |
| return SCI_SUCCESS; |
| } |
| |
| int Launcher::launch_tree2() |
| { |
| // this tree will have maximum agents but supposed to have better performance |
| // after evaluated by HongJun |
| int i, rc = SCI_SUCCESS; |
| int left = 0; |
| int totalSize = topology.beMap.size(); |
| int step; |
| int size = 0; |
| int out = topology.fanOut; |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| int ref = 0; |
| bool bottom = false; |
| |
| if (totalSize == 0) |
| return SCI_SUCCESS; |
| |
| bottom = ((totalSize + out - 1) / out) > 1 ? false : true; |
| ref = (totalSize > out) ? out : totalSize; |
| if (flistMsg != NULL) |
| flistMsg->setRefCount(ref + totalSize); // Keep it undeleted |
| // launch all of my back ends |
| BEMap::iterator it = topology.beMap.begin(); |
| int initID = it->first; |
| while (size < totalSize) { |
| BEMap::iterator fEnt = it; // first entry of each step |
| left = totalSize - size; |
| step = (left + out - 1) / out; |
| out--; |
| if (step == 1) { |
| topology.routingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, it->first, true); |
| MessageQueue *queue = topology.routingList->queryQueue(it->first); |
| queue->produce(flistMsg); |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| gCtrlBlock->getPurifierProcessor()->setInQueue(queue); |
| gCtrlBlock->setMyHandle(it->first); |
| gCtrlBlock->getPurifierProcessor()->start(); |
| } else { |
| rc = launchClient(it->first, topology.bePath, it->second, mode, true); |
| if (rc != SCI_SUCCESS) { |
| topology.routingList->removeBE(it->first); |
| return rc; |
| } |
| } |
| it++; |
| // do the notify when it is back agent. |
| if ((size == (totalSize-1)) && bottom && (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT)) { |
| int tmp_hndl = gCtrlBlock->getMyEmbedHandle(); |
| int tmp_initID = gCtrlBlock->getAgent(tmp_hndl)->getRoutingList()->getTopology()->getInitID(); |
| if ((mode == REGISTER) || !shell.empty()) { |
| int times = 0; |
| while (!gCtrlBlock->allRouted()) { |
| if (times >= (waitTimes * 1000000 / WAIT_INTERVAL)) { |
| rc = SCI_ERR_LAUNCH_FAILED; |
| *(int *)gNotifier->getRetVal(tmp_initID) = rc; |
| break; |
| } |
| times++; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| } |
| gNotifier->notify(tmp_initID); |
| } |
| } else { |
| int auxID = it->first; |
| string &hostname = it->second; |
| Topology *childTopo = new Topology(topology.nextAgentID--); |
| childTopo->fanOut = topology.fanOut; |
| childTopo->level = topology.level + 1; |
| childTopo->height = topology.height; |
| childTopo->bePath = topology.bePath; |
| childTopo->agentPath = topology.agentPath; |
| |
| int startID = initID + size; |
| int endID = initID + size + step - 1; |
| topology.routingList->initSubGroup(childTopo->agentID, startID, endID); |
| MessageQueue *queue = topology.routingList->queryQueue(childTopo->agentID); |
| |
| for (i = 0; i < step; i++) { |
| childTopo->beMap[it->first] = it->second; |
| topology.incWeight(childTopo->agentID); |
| it++; |
| } |
| |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| EmbedAgent *beAgent = new EmbedAgent(); |
| beAgent->init(childTopo->agentID, NULL, queue, gCtrlBlock->getUpQueue()); |
| rc = beAgent->work(); |
| } else { |
| MODE m = INTERNAL; |
| if (embedMode) |
| m = mode; |
| rc = launchClient(childTopo->agentID, topology.agentPath, hostname, m, true, auxID); |
| } |
| if (rc == SCI_SUCCESS) { |
| Message *msg = childTopo->packMsg(); |
| queue->produce(flistMsg); // make the filter list loaded before topology |
| queue->produce(msg); |
| delete childTopo; |
| } else { |
| delete childTopo; |
| return rc; |
| } |
| } |
| size += step; |
| } |
| |
| return rc; |
| } |
| |
| int Launcher::launch_tree3() |
| { |
| // this tree will have maximum agents but supposed to have better performance |
| // after evaluated by HongJun |
| int i, rc = SCI_SUCCESS; |
| int left = 0; |
| int totalSize = topology.beMap.size(); |
| int step; |
| int size = 0; |
| int out = topology.fanOut; |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| int ref = 0; |
| bool shift = true; |
| bool bottom = false; |
| |
| if (totalSize == 0) |
| return SCI_SUCCESS; |
| |
| bottom = ((totalSize + out - 1) / out) > 1 ? false : true; |
| ref = (totalSize > out) ? out : totalSize; |
| if (flistMsg != NULL) |
| flistMsg->setRefCount(ref + totalSize); // Keep it undeleted |
| // launch all of my back ends |
| BEMap::iterator it = topology.beMap.begin(); |
| int initID = it->first; |
| while (size < totalSize) { |
| BEMap::iterator fEnt = it; // first entry of each step |
| left = totalSize - size; |
| step = (left + out - 1) / out; |
| out--; |
| if (step == 1) { |
| topology.routingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, it->first, true); |
| MessageQueue *queue = topology.routingList->queryQueue(it->first); |
| queue->produce(flistMsg); |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| gCtrlBlock->getPurifierProcessor()->setInQueue(queue); |
| gCtrlBlock->setMyHandle(it->first); |
| gCtrlBlock->getPurifierProcessor()->start(); |
| } else { |
| rc = launchClient(it->first, topology.bePath, it->second, mode, true); |
| if (rc != SCI_SUCCESS) { |
| topology.routingList->removeBE(it->first); |
| return rc; |
| } |
| } |
| it++; |
| // do the notify when it is back agent. |
| if ((size == (totalSize-1)) && bottom && (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT)) { |
| int tmp_hndl = gCtrlBlock->getMyEmbedHandle(); |
| int tmp_initID = gCtrlBlock->getAgent(tmp_hndl)->getRoutingList()->getTopology()->getInitID(); |
| if ((mode == REGISTER) || !shell.empty()) { |
| int times = 0; |
| while (!gCtrlBlock->allRouted()) { |
| if (times >= (waitTimes * 1000000 / WAIT_INTERVAL)) { |
| rc = SCI_ERR_LAUNCH_FAILED; |
| *(int *)gNotifier->getRetVal(tmp_initID) = rc; |
| break; |
| } |
| times++; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| } |
| gNotifier->notify(tmp_initID); |
| } |
| } else { |
| string &hostname = it->second; |
| if (shift) { |
| int aID = topology.agentID; |
| int tmp = aID; |
| BEMap::iterator fh = it; |
| do { |
| tmp = aID / topology.fanOut; |
| if (tmp * topology.fanOut == aID) { |
| fh++; |
| } else { |
| break; |
| } |
| aID = tmp; |
| } while (aID > 0); |
| hostname = fh->second; |
| shift = false; |
| } |
| Topology *childTopo = new Topology(topology.nextAgentID--); |
| childTopo->fanOut = topology.fanOut; |
| childTopo->level = topology.level + 1; |
| childTopo->height = topology.height; |
| childTopo->bePath = topology.bePath; |
| childTopo->agentPath = topology.agentPath; |
| |
| int startID = initID + size; |
| int endID = initID + size + step - 1; |
| topology.routingList->initSubGroup(childTopo->agentID, startID, endID); |
| MessageQueue *queue = topology.routingList->queryQueue(childTopo->agentID); |
| |
| for (i = 0; i < step; i++) { |
| childTopo->beMap[it->first] = it->second; |
| topology.incWeight(childTopo->agentID); |
| it++; |
| } |
| |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| EmbedAgent *beAgent = new EmbedAgent(); |
| beAgent->init(childTopo->agentID, NULL, queue, gCtrlBlock->getUpQueue()); |
| rc = beAgent->work(); |
| } else { |
| rc = launchClient(childTopo->agentID, topology.agentPath, hostname, INTERNAL, true); |
| } |
| if (rc == SCI_SUCCESS) { |
| Message *msg = childTopo->packMsg(); |
| queue->produce(flistMsg); // make the filter list loaded before topology |
| queue->produce(msg); |
| delete childTopo; |
| } else { |
| delete childTopo; |
| return rc; |
| } |
| } |
| size += step; |
| } |
| |
| return rc; |
| } |
| |
| int Launcher::launch_tree4() |
| { |
| // this tree will have maximum agents but supposed to have better performance |
| // after evaluated by HongJun |
| int i, rc = SCI_SUCCESS; |
| int left = 0; |
| int totalSize = topology.beMap.size(); |
| int step = 1; |
| int size = 0; |
| int out = topology.fanOut; |
| Message *flistMsg = topology.filterList->getFlistMsg(); |
| int ref = 0; |
| bool shift = true; |
| |
| if (totalSize == 0) |
| return SCI_SUCCESS; |
| |
| ref = (totalSize > out) ? out : totalSize; |
| if (flistMsg != NULL) |
| flistMsg->setRefCount(ref + totalSize); // Keep it undeleted |
| // launch all of my back ends |
| BEMap::iterator it = topology.beMap.begin(); |
| int initID = it->first; |
| while (1) { |
| BEMap::iterator fEnt = it; // first entry of each step |
| if (step == 1) { |
| topology.routingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, it->first, true); |
| MessageQueue *queue = topology.routingList->queryQueue(it->first); |
| queue->produce(flistMsg); |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| gCtrlBlock->getPurifierProcessor()->setInQueue(queue); |
| gCtrlBlock->setMyHandle(it->first); |
| gCtrlBlock->getPurifierProcessor()->start(); |
| } else { |
| rc = launchClient(it->first, topology.bePath, it->second, mode, true); |
| if (rc != SCI_SUCCESS) { |
| topology.routingList->removeBE(it->first); |
| return rc; |
| } |
| } |
| it++; |
| } else { |
| int auxID = it->first; |
| string &hostname = it->second; |
| Topology *childTopo = new Topology(topology.nextAgentID--); |
| childTopo->fanOut = topology.fanOut; |
| childTopo->level = topology.level + 1; |
| childTopo->height = topology.height; |
| childTopo->bePath = topology.bePath; |
| childTopo->agentPath = topology.agentPath; |
| |
| int startID = initID + size; |
| int endID = initID + size + step - 1; |
| topology.routingList->initSubGroup(childTopo->agentID, startID, endID); |
| MessageQueue *queue = topology.routingList->queryQueue(childTopo->agentID); |
| |
| for (i = 0; i < step; i++) { |
| childTopo->beMap[it->first] = it->second; |
| topology.incWeight(childTopo->agentID); |
| it++; |
| } |
| |
| if ((gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) |
| && (fEnt == topology.beMap.begin())) { |
| assert(!"should not come here"); |
| } else { |
| MODE m = INTERNAL; |
| if (embedMode) |
| m = mode; |
| rc = launchClient(childTopo->agentID, topology.agentPath, hostname, m, true, auxID); |
| } |
| if (rc == SCI_SUCCESS) { |
| Message *msg = childTopo->packMsg(); |
| queue->produce(flistMsg); // make the filter list loaded before topology |
| queue->produce(msg); |
| delete childTopo; |
| } else { |
| delete childTopo; |
| return rc; |
| } |
| } |
| size += step; |
| if (size >= totalSize) { |
| // do the notify when it is back agent. |
| if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) { |
| int tmp_hndl = gCtrlBlock->getMyEmbedHandle(); |
| int tmp_initID = gCtrlBlock->getAgent(tmp_hndl)->getRoutingList()->getTopology()->getInitID(); |
| if ((mode == REGISTER) || !shell.empty()) { |
| int times = 0; |
| while (!gCtrlBlock->allRouted()) { |
| if (times >= (waitTimes * 1000000 / WAIT_INTERVAL)) { |
| rc = SCI_ERR_LAUNCH_FAILED; |
| *(int *)gNotifier->getRetVal(tmp_initID) = rc; |
| break; |
| } |
| times++; |
| SysUtil::sleep(WAIT_INTERVAL); |
| } |
| } |
| gNotifier->notify(tmp_initID); |
| } |
| break; |
| } |
| left = totalSize - size; |
| step = (left + out - 1) / out; |
| out--; |
| } |
| |
| return rc; |
| } |
| |