blob: 9bc89d1443219434d35f78ed83f56c81b7b147b6 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: CtrlBlock
Description: Internal running information management (Note: STL does not
guarantee safety when several readers and one writer work
concurrently, and user threads can query group information at
runtime, so it is necessary to add a lock to protect these
read & write operations).
Author: Tu HongJ, Nicole Nie, Liu Wei
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 tuhongj Initial code (D153875)
11/27/10 ronglli Add SCI Version
01/16/12 ronglli Add codes to detect SOCKET_BROKEN
07/19/12 ronglli Optimize the user query
****************************************************************************/
#include "ctrlblock.hpp"
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <pwd.h>
#include <errno.h>
#include "stream.hpp"
#include "exception.hpp"
#include "group.hpp"
#include "log.hpp"
#include "tools.hpp"
#include "packer.hpp"
#include "eventntf.hpp"
#include "handlerproc.hpp"
#include "embedagent.hpp"
#include "purifierproc.hpp"
#include "topology.hpp"
#include "routinglist.hpp"
#include "privatedata.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "listener.hpp"
#include "processor.hpp"
#include "routerproc.hpp"
#include "filterproc.hpp"
#include "observer.hpp"
#include "initializer.hpp"
// Default flow-control threshold (128 * 1024 * 1024). The constructor uses it
// unless the SCI_FLOWCTL_THRESHOLD environment variable supplies another value.
const long long FLOWCTL_THRESHOLD = 128LL * 1024 * 1024;
// Static instance pointer; the destructor resets it to NULL.
CtrlBlock * CtrlBlock::instance = NULL;
// Defined in another translation unit. CtrlBlock::init() assigns them from
// the be_info.hndlr / be_info.param fields of the caller-supplied sci_info_t.
extern SCI_msg_hndlr *gHndlr;
extern void *gParam;
/**
 * Builds the control block in its idle state: no processors, queues, observer
 * or end info attached, role INVALID, flow control on, children healthy.
 *
 * Environment variables consulted:
 *   SCI_FLOWCTL_THRESHOLD - overrides the default flow-control threshold
 *   SCI_DISABLE_IPV6      - "yes" (case-insensitive) disables IPv6 on Socket
 *   SCI_CONNECT_TIMES     - positive integer passed to Socket::setConnTimes()
 */
CtrlBlock::CtrlBlock()
    : role(INVALID)
{
    char *envp = NULL;

    version = SCI_VERSION;
    userName = "";
    flowctlState = true;
    childHealthState = HEALTH;
    errChildren.clear();
    cnt_disable = 0;

    // These three members were previously left indeterminate until init() or a
    // setter ran, yet getMyHandle()/getMyEmbedHandle()/getJobKey() and
    // getPrivateData() read them. Give them defined placeholder values.
    // NOTE(review): the placeholders are arbitrary; confirm that -1 / 0 do
    // not collide with a meaningful handle or job key before init() runs.
    handle = -1;
    embed_handle = -1;
    jobKey = 0;

    endInfo = NULL;
    routerProc = NULL;
    filterProc = NULL;
    purifierProc = NULL;
    handlerProc = NULL;
    observer = NULL;
    routerInQueue = NULL;
    filterInQueue = NULL;
    filterOutQueue = NULL;
    purifierOutQueue = NULL;
    upQueue = NULL;
    pollQueue = NULL;
    monitorInQueue = NULL;
    errorQueue = NULL;
    termState = false;       // not in term state yet
    recoverMode = 0;
    waitParentInfo = false;  // whether to wait for parent info updating
    parentStream = NULL;
    embedAgents.clear();
    enableID = gNotifier->allocate();

    // flow control threshold
    thresHold = FLOWCTL_THRESHOLD;
    envp = ::getenv("SCI_FLOWCTL_THRESHOLD");
    if (envp != NULL) {
        thresHold = ::atoll(envp);
    }

    envp = ::getenv("SCI_DISABLE_IPV6");
    if (envp && (::strcasecmp(envp, "yes") == 0)) {
        Socket::setDisableIPv6(1);
    }

    envp = ::getenv("SCI_CONNECT_TIMES");
    if (envp != NULL) {
        int cnt = ::atoi(envp);
        if (cnt > 0) {
            Socket::setConnTimes(cnt);
        }
    }

    ::pthread_mutex_init(&mtx, NULL);
}
// Destroys the mutex created by the constructor and clears the static
// instance pointer.
CtrlBlock::~CtrlBlock()
{
    ::pthread_mutex_destroy(&this->mtx);
    instance = NULL;
}
void CtrlBlock::setRecoverMode(int mo)
{
recoverMode = mo;
}
int CtrlBlock::getRecoverMode()
{
return recoverMode;
}
void CtrlBlock::setTermState(bool mo)
{
termState = mo;
}
bool CtrlBlock::getTermState()
{
return termState;
}
void CtrlBlock::setParentInfoWaitState(bool mo)
{
waitParentInfo = mo;
}
bool CtrlBlock::getParentInfoWaitState()
{
return waitParentInfo;
}
// Returns the current role; INVALID after construction and after clean().
CtrlBlock::ROLE CtrlBlock::getMyRole()
{
    return this->role;
}
// Overrides the role of this control block.
void CtrlBlock::setMyRole(CtrlBlock::ROLE ro)
{
    this->role = ro;
}
int CtrlBlock::getMyHandle()
{
return handle;
}
void CtrlBlock::setMyHandle(int hndl)
{
handle = hndl;
}
int CtrlBlock::getMyEmbedHandle()
{
return embed_handle;
}
void CtrlBlock::setMyEmbedHandle(int hndl)
{
embed_handle = hndl;
}
// Returns the private sci_info_t copy made by init(); NULL before init() and
// after clean() has released it.
sci_info_t * CtrlBlock::getEndInfo()
{
    return this->endInfo;
}
int CtrlBlock::getJobKey()
{
return jobKey;
}
void CtrlBlock::setJobKey(int key)
{
jobKey = key;
}
/**
 * Initialises a non-front-end node from its environment.
 *
 * SCI_JOB_KEY, when set, becomes the job key and SCI_CLIENT_ID, when set,
 * becomes the handle; a missing variable leaves the member untouched.
 *
 * @param ro role to assume
 * @return always 0
 */
int CtrlBlock::initClient(ROLE ro)
{
    const char *jobKeyText = ::getenv("SCI_JOB_KEY");
    if (jobKeyText != NULL) {
        jobKey = ::atoi(jobKeyText);
    }

    const char *clientIdText = ::getenv("SCI_CLIENT_ID");
    if (clientIdText != NULL) {
        handle = ::atoi(clientIdText);
    }

    role = ro;
    return 0;
}
/**
 * Initialises the control block from the caller-supplied end description.
 *
 * A NULL info initialises the node as an AGENT from its environment.
 * Otherwise the structure is copied (the copy is later available through
 * getEndInfo()) and the node becomes a front end or a back end.
 *
 * @param info end description, or NULL for an agent
 * @return SCI_SUCCESS, SCI_ERR_VERSION when info requests a different
 *         non-zero version, SCI_ERR_INVALID_ENDTYPE for an unknown
 *         info->type, or SCI_ERR_NO_MEM when the copy cannot be allocated
 */
int CtrlBlock::init(sci_info_t * info)
{
    if (info == NULL) {
        initClient(AGENT);
        return SCI_SUCCESS;
    }

    if ((info->sci_version != 0) && (info->sci_version != version)) {
        return SCI_ERR_VERSION;
    }

    // Reject an unknown end type up front. Previously the error was returned
    // only after the copy had been allocated and gHndlr/gParam overwritten,
    // leaving half-initialised state behind a failed call.
    if ((info->type != SCI_FRONT_END) && (info->type != SCI_BACK_END)) {
        return SCI_ERR_INVALID_ENDTYPE;
    }

    if (info->disable_sshauth == 1) {
        ::setenv("SCI_ENABLE_SSHAUTH", "no", 1);
    }
    if (info->enable_recover == 1) {
        recoverMode = 1;
    }

    // memcpy overwrites the whole structure, so no prior memset is needed.
    sci_info_t *copy = (sci_info_t *) ::malloc(sizeof(sci_info_t));
    if (NULL == copy) {
        return SCI_ERR_NO_MEM;
    }
    ::memcpy(copy, info, sizeof(sci_info_t));

    // Release the copy of an earlier init() (free(NULL) is a no-op). Done
    // after the memcpy so a caller passing getEndInfo() back in stays valid.
    ::free(endInfo);
    endInfo = copy;

    gHndlr = info->be_info.hndlr;
    gParam = info->be_info.param;

    if (info->type == SCI_FRONT_END) {
        handle = -1;
        role = FRONT_END;
        char *envp = ::getenv("SCI_JOB_KEY");
        if (envp) {
            // use user's job key
            jobKey = ::atoi(envp);
        } else {
            // generate a random job key
            ::srand((unsigned int) ::time(NULL));
            jobKey = ::rand();
        }
    } else {
        // SCI_BACK_END
        initClient(BACK_END);
    }

    return SCI_SUCCESS;
}
int CtrlBlock::numOfChildrenFds()
{
int num = 0;
RoutingList *rtList = NULL;
/*
if (purifierProc) {
while (!purifierProc->isLaunched()) {
// before join, this thread should have been launched
SysUtil::sleep(WAIT_INTERVAL);
}
} */
lock();
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
num += rtList->numOfStreams();
}
unlock();
return num;
}
int CtrlBlock::getChildrenSockfds(int *fds)
{
int pos = 0;
RoutingList *rtList = NULL;
/*
if (purifierProc) {
while (!purifierProc->isLaunched()) {
// before join, this thread should have been launched
SysUtil::sleep(WAIT_INTERVAL);
}
} */
lock();
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
pos += rtList->getStreamsSockfds(&fds[pos]);
}
unlock();
return pos;
}
bool CtrlBlock::allRouted()
{
bool flag = false;
int streams = 0;
int queues = 0;
RoutingList *rtList = NULL;
lock();
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
streams += rtList->numOfStreams();
queues += rtList->numOfQueues();
}
if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) {
flag = (queues == (streams + embedAgents.size())); // queueInfo contains the embed agent itself
} else {
flag = (queues == streams);
}
unlock();
return flag;
}
int CtrlBlock::isActiveSockfd(int fd)
{
int isSocket = 0;
RoutingList *rtList = NULL;
lock();
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
isSocket = rtList->isActiveSockfd(fd);
if (isSocket)
break;
}
Stream * s = gInitializer->getInStream();
if ((s != NULL) && (s->getSocket() == fd)) {
if ((s->isReadActive()) || (s->isWriteActive())) {
isSocket = 1;
}
}
unlock();
return isSocket;
}
bool CtrlBlock::allActive()
{
bool active = true;
RoutingList *rtList = NULL;
lock();
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
active = rtList->allActive();
if (!active)
break;
}
unlock();
return active;
}
/**
 * Shuts the control block down: freezes the enable notification, raises the
 * term flag, releases and destroys the purifier and handler processors,
 * destroys every embedded agent, forgets the recorded error children and
 * finally resets the remaining members through clean().
 */
void CtrlBlock::term()
{
    gNotifier->freeze(enableID, NULL);
    termState = true;

    if (purifierProc) {
        purifierProc->release();
        delete purifierProc;
        // Reset immediately so no dangling pointer is visible (e.g. through
        // getPrivateData()) while the rest of the teardown runs.
        purifierProc = NULL;
    }

    lock();
    for (AGENT_MAP::iterator it = embedAgents.begin(); it != embedAgents.end(); ++it) {
        delete it->second;
    }
    embedAgents.clear();
    errChildren.clear();
    unlock();

    if (handlerProc) {
        handlerProc->release();
        delete handlerProc;
        // clean() does not reset handlerProc; without this a second term()
        // would release and delete the same object again.
        handlerProc = NULL;
    }

    clean();
}
/**
 * Looks up the embedded agent registered under hndl.
 *
 * An unknown handle is a programming error and trips the assert in debug
 * builds. In builds with NDEBUG the function returns NULL instead of letting
 * operator[] insert a NULL entry into the map, which the methods that walk
 * embedAgents would later dereference.
 *
 * @param hndl agent handle
 * @return the agent, or NULL when hndl is not registered (NDEBUG builds only)
 */
EmbedAgent * CtrlBlock::getAgent(int hndl)
{
    EmbedAgent *agent = NULL;

    lock();
    AGENT_MAP::iterator pos = embedAgents.find(hndl);
    assert(pos != embedAgents.end());
    if (pos != embedAgents.end()) {
        agent = pos->second;
    }
    unlock();

    return agent;
}
/**
 * Returns the control block to its unconfigured state.
 *
 * Processor, queue and parent-stream pointers are only forgotten, not
 * destroyed. The observer is deleted and the end-info copy is freed here.
 * The role goes back to INVALID.
 */
void CtrlBlock::clean()
{
    // processors (not destroyed by this function)
    routerProc = NULL;
    filterProc = NULL;
    purifierProc = NULL;

    // queues (not destroyed by this function)
    routerInQueue = NULL;
    filterInQueue = NULL;
    filterOutQueue = NULL;
    purifierOutQueue = NULL;
    upQueue = NULL;
    pollQueue = NULL;
    monitorInQueue = NULL;
    errorQueue = NULL;

    parentStream = NULL;

    // deleting a NULL pointer is a no-op, so no guard is required
    delete observer;
    observer = NULL;

    role = INVALID;

    // likewise free(NULL) does nothing
    ::free(endInfo);
    endInfo = NULL;
}
// No-op: the body is empty, so calling it changes no state.
void CtrlBlock::enable()
{
}
/**
 * Fires the enable notification once this node may be disabled.
 *
 * Does nothing when isEnabled() is already false. A BACK_AGENT counts the
 * calls and only notifies once the count reaches the number of embedded
 * agents plus one; every other role notifies on the first call.
 */
void CtrlBlock::disable()
{
    if (!isEnabled())
        return;

    bool stillWaiting = false;

    lock();
    if (getMyRole() == BACK_AGENT) {
        cnt_disable++;
        stillWaiting = (cnt_disable < (embedAgents.size() + 1));
    }
    unlock();

    if (stillWaiting)
        return;

    gNotifier->notify(enableID);
}
bool CtrlBlock::isEnabled()
{
return gNotifier->getState(enableID);
}
/**
 * Wakes up a poller: notifies the observer and enqueues an INVALID_POLL
 * message on the poll queue. A missing observer or queue is logged and
 * skipped. No exception leaves this function; failures are logged.
 */
void CtrlBlock::releasePollQueue()
{
    // so far, valid for polling mode only
    assert(role != AGENT);

    try {
        if (observer != NULL) {
            observer->notify();
        } else {
            log_error("CtrlBlock: releasePollQueue: observer is NULL");
        }

        if (pollQueue != NULL) {
            Message *msg = new Message(Message::INVALID_POLL);
            pollQueue->produce(msg);
        } else {
            log_error("CtrlBlock: releasePollQueue: pollQueue is NULL");
        }
    } catch (Exception &e) {
        log_error("releasePollQueue: exception %s", e.getErrMsg());
    } catch (const std::bad_alloc &) {
        // caught by const reference: no copy of the exception object
        log_error("releasePollQueue: out of memory");
    } catch (...) {
        log_error("releasePollQueue: unknown exception");
    }
}
void CtrlBlock::notifyChildHealthState(int hndl, int hState)
{
int num = 0;
int *cList = NULL;
bool found = false;
Message::Type typ = getErrMsgType(hState);
if (typ == Message::UNKNOWN)
return;
lock();
RoutingList *rtList = NULL;
AGENT_MAP::iterator it;
for (it = embedAgents.begin(); it != embedAgents.end(); it++) {
rtList = it->second->getRoutingList();
if (rtList->isSuccessorExist(hndl)) {
if (hndl < 0) {
num = rtList->numOfBEOfSuccessor(hndl);
assert(num);
cList = (int *) malloc(num * sizeof(int));
rtList->retrieveBEListOfSuccessor(hndl, cList);
} else {
num = 1;
cList = (int *) malloc(sizeof(int));
cList[0] = hndl;
}
found = true;
break;
}
}
if (!found) {
unlock();
return;
}
assert(cList != NULL);
try {
for (int i = 0; i < num; i++) {
errChildren.insert(cList[i]);
}
// if not fe, it should forward the broken msg to its parent
if (getMyRole() != FRONT_END) {
Message *msg = new Message();
Packer packer;
packer.packInt(num);
for (int i = 0; i < num; i++) {
packer.packInt(cList[i]);
}
char *bufs[1];
int sizes[1];
bufs[0] = packer.getPackedMsg();
sizes[0] = packer.getPackedMsgLen();
msg->build(SCI_FILTER_NULL, SCI_GROUP_ALL, 1, bufs, sizes, typ);
delete [] bufs[0];
getUpQueue()->produce(msg);
}
// so far, valid for polling mode only
if (getMyRole() != AGENT) {
sci_mode_t mode;
if (getMyRole() == FRONT_END)
mode = getEndInfo()->fe_info.mode;
else
mode = getEndInfo()->be_info.mode;
if (mode == SCI_POLLING) {
observer->notify();
Message *msg = new Message(typ);
pollQueue->produce(msg);
}
}
} catch (Exception &e) {
log_error("notifyChildHealthState: exception %s", e.getErrMsg());
} catch (std::bad_alloc) {
log_error("notifyChildHealthState: out of memory");
} catch (...) {
log_error("notifyChildHealthState: unknown exception");
}
unlock();
setChildHealthState(hState);
free(cList);
}
/**
 * Handles an error notification received from a child.
 *
 * The message carries a count followed by that many handles; each is added
 * to errChildren. Unless this node is the front end, the same message is
 * forwarded to the parent through the up queue; otherwise it is deleted.
 * For a front end or back end in polling mode, the observer is notified and
 * a message of the same type is put on the poll queue. Finally the node-wide
 * child health state is set.
 *
 * The message is deleted and ignored when its type maps to no error state or
 * when the control block is terminating.
 *
 * @param msg notification message; this function takes ownership
 */
void CtrlBlock::notifyChildHealthState(Message * msg)
{
    Message::Type typ = msg->getType();
    int hState = getErrState(typ);
    if (hState == UNKNOWN) {
        delete msg;
        return;
    }

    lock();
    // upqueue can be deleted when it is terminating
    if (getTermState()) {
        delete msg;
        unlock();
        return;
    }

    // Cleared right before msg is forwarded or deleted, so the cleanup below
    // only runs when unpacking failed before either happened.
    bool msgPending = true;
    try {
        // Unpacking now happens inside the try block, so an exception from it
        // can no longer escape with the lock held. The handles go straight
        // into errChildren, which removes a temporary buffer whose size came
        // unchecked from the wire.
        Packer packer(msg->getContentBuf());
        int num = packer.unpackInt();
        for (int i = 0; i < num; i++) {
            errChildren.insert(packer.unpackInt());
        }

        // if not fe, it should forward the broken msg to its parent
        msgPending = false;
        MessageQueue *parentQueue = (getMyRole() != FRONT_END) ? getUpQueue() : NULL;
        if (parentQueue != NULL) {
            parentQueue->produce(msg);
        } else {
            if (getMyRole() != FRONT_END) {
                log_error("notifyChildHealthState: upQueue is NULL");
            }
            delete msg;
        }

        // so far, valid for polling mode only
        sci_info_t *info = getEndInfo();
        if ((getMyRole() != AGENT) && (info != NULL)) {
            sci_mode_t mode;
            if (getMyRole() == FRONT_END)
                mode = info->fe_info.mode;
            else
                mode = info->be_info.mode;
            if (mode == SCI_POLLING) {
                if (observer != NULL) {
                    observer->notify();
                } else {
                    log_error("notifyChildHealthState: observer is NULL");
                }
                if (pollQueue != NULL) {
                    Message *tmpmsg = new Message(typ);
                    pollQueue->produce(tmpmsg);
                } else {
                    log_error("notifyChildHealthState: pollQueue is NULL");
                }
            }
        }
    } catch (Exception &e) {
        log_error("notifyChildHealthState: exception %s", e.getErrMsg());
    } catch (const std::bad_alloc &) {
        log_error("notifyChildHealthState: out of memory");
    } catch (...) {
        log_error("notifyChildHealthState: unknown exception");
    }
    if (msgPending) {
        delete msg;
    }
    unlock();

    setChildHealthState(hState);
}
int CtrlBlock::getErrChildren(int * num, int **list)
{
lock();
ERRORCHILDREN_LIST tmpErrChildren = errChildren;
unlock();
*num = tmpErrChildren.size();
*list = (int *) malloc(sizeof(int) * (*num));
::memset(*list, 0, sizeof(int) * (*num));
log_debug("getErrChildren: err Children: size = %d", *num);
ERRORCHILDREN_LIST::iterator it;
int i = 0;
for (it = tmpErrChildren.begin(); it != tmpErrChildren.end(); it++) {
(*list)[i] = *it;
log_debug("getErrChildren: err Children: list[%d] = %d", i, (*list)[i]);
i++;
}
return 0;
}
// Installs the observer; clean() deletes whatever observer is installed.
void CtrlBlock::setObserver(Observer *ob)
{
    this->observer = ob;
}
extern void makeKey();

/**
 * Returns the calling thread's PrivateData, creating it on first use.
 *
 * When the thread has none yet:
 *  - without a purifier processor, the embedded agent registered under this
 *    node's handle is asked to register the private data;
 *  - with a purifier processor, a PrivateData is built from the purifier's
 *    routing and filter lists and stored under Thread::key.
 *
 * @return the thread's PrivateData, or NULL when no agent is available or
 *         nothing ended up stored under Thread::key
 */
PrivateData * CtrlBlock::getPrivateData()
{
    PrivateData *existing = (PrivateData *)pthread_getspecific(Thread::key);
    if (existing)
        return existing;

    if (purifierProc) {
        PrivateData *fresh = new PrivateData(purifierProc->getRoutingList(), purifierProc->getFilterList(), NULL);
        // return codes are not inspected, exactly as before
        (void) pthread_once(&(Thread::once), makeKey);
        (void) pthread_setspecific(Thread::key, fresh);
    } else {
        EmbedAgent* agent = getAgent(handle);
        if (!agent)
            return NULL;
        agent->registPrivateData();
    }

    // re-read so both branches return what is actually stored for the thread
    return (PrivateData *)pthread_getspecific(Thread::key);
}
/**
 * Returns the topology of the calling thread's routing list.
 *
 * @return the topology, or NULL when the thread has no private data
 *         (getPrivateData() can return NULL) or no routing list
 */
Topology * CtrlBlock::getTopology()
{
    PrivateData *pData = getPrivateData();
    if (pData == NULL)
        return NULL;

    RoutingList *rtList = pData->getRoutingList();
    if (rtList == NULL)
        return NULL;

    return rtList->getTopology();
}
// Returns the installed observer, or NULL when none is set.
Observer * CtrlBlock::getObserver()
{
    return this->observer;
}
// Registers the router's input queue.
void CtrlBlock::setRouterInQueue(MessageQueue * queue)
{
    this->routerInQueue = queue;
}
// Registers the filter's input queue.
void CtrlBlock::setFilterInQueue(MessageQueue *queue)
{
    this->filterInQueue = queue;
}
// Registers the queue used to deliver messages in polling mode.
void CtrlBlock::setPollQueue(MessageQueue *queue)
{
    this->pollQueue = queue;
}
// Registers the monitor's input queue.
void CtrlBlock::setMonitorInQueue(MessageQueue *queue)
{
    this->monitorInQueue = queue;
}
// Registers the error queue.
void CtrlBlock::setErrorQueue(MessageQueue *queue)
{
    this->errorQueue = queue;
}
// Returns the router's input queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getRouterInQueue()
{
    return this->routerInQueue;
}
// Returns the filter's input queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getFilterInQueue()
{
    return this->filterInQueue;
}
// Returns the polling-mode queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getPollQueue()
{
    return this->pollQueue;
}
// Returns the error queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getErrorQueue()
{
    return this->errorQueue;
}
// Returns the monitor's input queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getMonitorInQueue()
{
    return this->monitorInQueue;
}
// Registers the router processor (clean() only forgets it, never deletes it).
void CtrlBlock::setRouterProcessor(RouterProcessor *proc)
{
    this->routerProc = proc;
}
// Registers the filter processor (clean() only forgets it, never deletes it).
void CtrlBlock::setFilterProcessor(FilterProcessor *proc)
{
    this->filterProc = proc;
}
// Registers the handler processor; term() releases and deletes it.
void CtrlBlock::setHandlerProcessor(HandlerProcessor *proc)
{
    this->handlerProc = proc;
}
// Registers the purifier processor; term() releases and deletes it.
void CtrlBlock::setPurifierProcessor(PurifierProcessor *proc)
{
    this->purifierProc = proc;
}
// Registers the queue that carries messages towards the parent.
void CtrlBlock::setUpQueue(MessageQueue * queue)
{
    this->upQueue = queue;
}
// Returns the parent-bound queue, or NULL when none is registered.
MessageQueue * CtrlBlock::getUpQueue()
{
    return this->upQueue;
}
/**
 * Returns the router processor attached to the calling thread's private data.
 *
 * @return the processor, or NULL when the thread has no private data
 *         (getPrivateData() can return NULL)
 */
RouterProcessor * CtrlBlock::getRouterProcessor()
{
    PrivateData *pData = getPrivateData();
    if (pData == NULL)
        return NULL;
    return pData->getRouterProcessor();
}
/**
 * Returns the filter processor attached to the calling thread's private data.
 *
 * @return the processor, or NULL when the thread has no private data
 *         (getPrivateData() can return NULL)
 */
FilterProcessor * CtrlBlock::getFilterProcessor()
{
    PrivateData *pData = getPrivateData();
    if (pData == NULL)
        return NULL;
    return pData->getFilterProcessor();
}
/**
 * Returns the filter list attached to the calling thread's private data.
 *
 * @return the list, or NULL when the thread has no private data
 *         (getPrivateData() can return NULL)
 */
FilterList * CtrlBlock::getFilterList()
{
    PrivateData *pData = getPrivateData();
    if (pData == NULL)
        return NULL;
    return pData->getFilterList();
}
// Returns the registered purifier processor, or NULL when none is set.
PurifierProcessor * CtrlBlock::getPurifierProcessor()
{
    return this->purifierProc;
}
void CtrlBlock::setFlowctlThreshold(long long th)
{
thresHold = th;
}
long long CtrlBlock::getFlowctlThreshold()
{
return thresHold;
}
int CtrlBlock::getVersion()
{
return version;
}
/**
 * Resolves the login name of the current user once and caches it.
 *
 * Later calls are no-ops once a name is cached. The lookup is retried after
 * a short sleep on EINTR/EMFILE/ENFILE and with a larger buffer on ERANGE.
 *
 * @return SCI_SUCCESS, or SCI_ERR_INVALID_USER when the passwd entry cannot
 *         be obtained
 */
int CtrlBlock::setUsername()
{
    if (!userName.empty())
        return SCI_SUCCESS;

    // sysconf() may return -1 when the limit is indeterminate. Feeding that
    // into new char[] used to request an absurd allocation.
    long size = ::sysconf(_SC_GETPW_R_SIZE_MAX);
    if (size <= 0)
        size = 1024;
    const long maxSize = 1024L * 1024L;  // upper bound for ERANGE growth

    struct passwd pwd;
    struct passwd *result = NULL;
    char *pwdBuf = new char[size];
    int rc = 0;

    while (1) {
        rc = ::getpwuid_r(::getuid(), &pwd, pwdBuf, size, &result);
        if ((rc == EINTR) || (rc == EMFILE) || (rc == ENFILE)) {
            SysUtil::sleep(WAIT_INTERVAL);
            continue;
        }
        if ((rc == ERANGE) && (size < maxSize)) {
            // buffer too small for this entry: grow and retry
            delete [] pwdBuf;
            size *= 2;
            pwdBuf = new char[size];
            continue;
        }
        break;
    }

    if (NULL == result) {
        delete [] pwdBuf;
        // getpwuid_r() reports failures through its return value, not errno
        log_error("CtrlBlock: fail to get the user info! errno = %d.", rc);
        return SCI_ERR_INVALID_USER;
    }

    userName = pwd.pw_name;
    delete [] pwdBuf;
    return SCI_SUCCESS;
}
// Returns a reference to the cached user name (empty until setUsername()
// has succeeded).
string & CtrlBlock::getUsername()
{
    return this->userName;
}
/**
 * Returns the routing list attached to the calling thread's private data.
 *
 * @return the list, or NULL when the thread has no private data
 *         (getPrivateData() can return NULL)
 */
RoutingList * CtrlBlock::getRoutingList()
{
    PrivateData *pData = getPrivateData();
    if (pData == NULL)
        return NULL;
    return pData->getRoutingList();
}
void CtrlBlock::addEmbedAgent(int hndl, EmbedAgent *agent)
{
lock();
embedAgents[hndl] = agent;
unlock();
}
void CtrlBlock::lock()
{
::pthread_mutex_lock(&mtx);
}
void CtrlBlock::unlock()
{
::pthread_mutex_unlock(&mtx);
}
void CtrlBlock::setFlowctlState(bool state)
{
flowctlState = state;
}
bool CtrlBlock::getFlowctlState()
{
return flowctlState;
}
void CtrlBlock::setChildHealthState(int state)
{
childHealthState = state;
}
/**
 * Translates the recorded child health state into an SCI return code.
 *
 * @return SCI_SUCCESS for HEALTH, SCI_ERR_CHILD_BROKEN for
 *         ERROR_CHILD_BROKEN, SCI_ERR_DATA for ERROR_DATA, and
 *         SCI_ERR_THREAD for ERROR_THREAD or any other value
 */
int CtrlBlock::checkChildHealthState()
{
    switch (childHealthState) {
        case HEALTH:
            return SCI_SUCCESS;
        case ERROR_CHILD_BROKEN:
            return SCI_ERR_CHILD_BROKEN;
        case ERROR_DATA:
            return SCI_ERR_DATA;
        case ERROR_THREAD:
        default:
            // unrecognised states are reported like a thread error
            return SCI_ERR_THREAD;
    }
}
/**
 * Maps a health state to the message type used to announce it.
 *
 * @param hState health state
 * @return Message::UNKNOWN for HEALTH/UNKNOWN (no notification is produced
 *         for these), Message::SOCKET_BROKEN for ERROR_CHILD_BROKEN,
 *         Message::ERROR_DATA for ERROR_DATA, and Message::ERROR_THREAD for
 *         ERROR_THREAD or any other value
 */
Message::Type CtrlBlock::getErrMsgType(int hState)
{
    switch (hState) {
        case HEALTH:
        case UNKNOWN:
            // If it is in health/unknown state, should not produce notify msg
            return Message::UNKNOWN;
        case ERROR_CHILD_BROKEN:
            return Message::SOCKET_BROKEN;
        case ERROR_DATA:
            return Message::ERROR_DATA;
        case ERROR_THREAD:
        default:
            return Message::ERROR_THREAD;
    }
}
/**
 * Maps an error message type back to the health state it announces.
 *
 * @param typ message type
 * @return ERROR_CHILD_BROKEN for Message::SOCKET_BROKEN, ERROR_DATA for
 *         Message::ERROR_DATA, ERROR_THREAD for Message::ERROR_THREAD, and
 *         UNKNOWN for every other type
 */
int CtrlBlock::getErrState(Message::Type typ)
{
    switch (typ) {
        case Message::SOCKET_BROKEN:
            return ERROR_CHILD_BROKEN;
        case Message::ERROR_DATA:
            return ERROR_DATA;
        case Message::ERROR_THREAD:
            return ERROR_THREAD;
        default:
            // If it is incorrect msg type
            return UNKNOWN;
    }
}