blob: 7e66eacd2bc9ef13cc069d56a899ada261ffe997 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: Initializer
Description: Prepares the environment at startup, which includes:
1) Processor threads
2) Message queue
3) Others like environment variables
Author: Nicole Nie
History:
Date Who ID Description
-------- --- --- -----------
02/10/09 nieyy Initial code (D153875)
****************************************************************************/
#include "initializer.hpp"
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <pwd.h>
#include "sci.h"
#include "log.hpp"
#include "socket.hpp"
#include "stream.hpp"
#include "exception.hpp"
#include "ctrlblock.hpp"
#include "routinglist.hpp"
#include "statemachine.hpp"
#include "topology.hpp"
#include "queue.hpp"
#include "message.hpp"
#include "readerproc.hpp"
#include "writerproc.hpp"
#include "filterproc.hpp"
#include "handlerproc.hpp"
#include "routerproc.hpp"
#include "purifierproc.hpp"
#include "observer.hpp"
#include "listener.hpp"
#include "parent.hpp"
#include "errdetector.hpp"
#include "errhandler.hpp"
#include "errinjector.hpp"
#include "eventntf.hpp"
#include "allocator.hpp"
// Port number passed to Stream::init() by initExtBE() when it connects to
// the local host to request its environment.
#define SCI_DAEMON_PORT 6688
// Static instance pointer of the Initializer class; starts out NULL and is
// reset to NULL by the destructor.
Initializer* Initializer::instance = NULL;
// Default constructor: there is no state to set up here; all real work is
// done by init().
Initializer::Initializer()
{
    // intentionally empty
}
// Destructor: clears the static instance pointer so it no longer refers to
// the object being destroyed.
Initializer::~Initializer()
{
    Initializer::instance = NULL;
}
int Initializer::init(int hndl)
{
int rc = SCI_SUCCESS;
int level = Log::INFORMATION;
char dir[MAX_PATH_LEN] = "/opt/sci/log";
char *envp = ::getenv("SCI_LOG_DIRECTORY");
if (envp != NULL) {
::strncpy(dir, envp, sizeof(dir));
}
envp = ::getenv("SCI_LOG_LEVEL");
if (envp != NULL)
level = ::atoi(envp);
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
Log::getInstance()->init(dir, "fe.log", level);
log_debug("I am a front end, my handle is %d", hndl);
} else if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
Log::getInstance()->init(dir, "scia.log", level);
log_debug("I am an agent, my handle is %d", hndl);
} else {
Log::getInstance()->init(dir, "be.log", level);
log_debug("I am a back end, my handle is %d", hndl);
}
try {
if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
initListener();
rc = initFE(hndl);
} else if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
initListener();
rc = initAgent(hndl);
} else {
rc = initBE(hndl);
}
} catch (Exception &e) {
log_error("Initializer: exception %s", e.getErrMsg());
gStateMachine->parse(StateMachine::FATAL_EXCEPTION);
return SCI_ERR_INITIALIZE_FAILED;
} catch (ThreadException &e) {
log_error("Initializer: thread exception %d", e.getErrCode());
gStateMachine->parse(StateMachine::FATAL_EXCEPTION);
return SCI_ERR_INITIALIZE_FAILED;
} catch (SocketException &e) {
log_error("Initializer: socket exception: %s", e.getErrMsg().c_str());
gStateMachine->parse(StateMachine::FATAL_EXCEPTION);
return SCI_ERR_INITIALIZE_FAILED;
} catch (std::bad_alloc) {
log_error("Initializer: out of memory");
gStateMachine->parse(StateMachine::FATAL_EXCEPTION);
return SCI_ERR_INITIALIZE_FAILED;
} catch (...) {
log_error("Initializer: unknown exception");
gStateMachine->parse(StateMachine::FATAL_EXCEPTION);
return SCI_ERR_INITIALIZE_FAILED;
}
return rc;
}
/*
 * Create a fresh reader/writer processor pair on top of `stream` for an
 * agent, register both with the control block and start them.
 *
 * The reader feeds the router input queue (and the error queue), the writer
 * drains the filter output queue; both queues are taken from gCtrlBlock.
 *
 * stream   stream both processors are attached to; must not be NULL
 *          (checked with assert).
 */
void Initializer::recoverAgent(Stream * stream)
{
    assert(stream);

    int myHandle = gCtrlBlock->getMyHandle();

    // inbound side: stream -> router input queue
    ReaderProcessor *rdr = new ReaderProcessor(myHandle);
    rdr->setName("ReaderP");
    rdr->setInStream(stream);
    rdr->setOutQueue(gCtrlBlock->getRouterInQueue());
    rdr->setOutErrorQueue(gCtrlBlock->getErrorQueue());

    // outbound side: filter output queue -> stream
    WriterProcessor *wtr = new WriterProcessor(myHandle);
    wtr->setName("WriterP");
    wtr->setInQueue(gCtrlBlock->getFilterOutQueue());
    wtr->setOutStream(stream);

    // the writer is registered as the reader's peer processor
    rdr->setPeerProcessor(wtr);

    gCtrlBlock->registerProcessor(rdr);
    gCtrlBlock->registerProcessor(wtr);

    gRoutingList->propagateGroupInfo();

    rdr->start();
    wtr->start();
}
/*
 * Create a fresh writer/purifier processor pair on top of `stream` for a
 * back end, register both with the control block and start them.
 *
 * The writer drains the up queue and the purifier fills the purifier output
 * queue; both queues are taken from gCtrlBlock.
 *
 * stream   stream both processors are attached to; must not be NULL
 *          (checked with assert).
 *
 * NOTE(review): unlike initBE(), no error queue is set on the purifier
 * here -- confirm whether that is intentional.
 */
void Initializer::recoverBE(Stream * stream)
{
    assert(stream);

    int myHandle = gCtrlBlock->getMyHandle();

    // outbound side: up queue -> stream
    WriterProcessor *wtr = new WriterProcessor(myHandle);
    wtr->setInQueue(gCtrlBlock->getUpQueue());
    wtr->setOutStream(stream);

    // inbound side: stream -> purifier output queue
    PurifierProcessor *pur = new PurifierProcessor(myHandle);
    pur->setInStream(stream);
    pur->setOutQueue(gCtrlBlock->getPurifierOutQueue());

    // the writer is registered as the purifier's peer processor
    pur->setPeerProcessor(wtr);

    bool polling = (gCtrlBlock->getEndInfo()->be_info.mode == SCI_POLLING);
    if (polling) {
        // polling mode: give the purifier the observer held by the control block
        pur->setObserver(gCtrlBlock->getObserver());
    }

    gCtrlBlock->registerProcessor(wtr);
    gCtrlBlock->registerProcessor(pur);

    gRoutingList->propagateGroupInfo();

    wtr->start();
    pur->start();
}
int Initializer::initFE(int hndl)
{
char *envp = NULL;
Topology *topo = new Topology(hndl);
gCtrlBlock->setTopology(topo);
int rc = topo->init();
if (rc != SCI_SUCCESS)
return rc;
gAllocator->reset();
MessageQueue *routerInQ = new MessageQueue();
routerInQ->setName("routerInQ");
gCtrlBlock->registerQueue(routerInQ);
gCtrlBlock->setRouterInQueue(routerInQ);
RouterProcessor *router = new RouterProcessor();
gCtrlBlock->setRouterProcessor(router);
gCtrlBlock->registerProcessor(router);
router->setInQueue(routerInQ);
MessageQueue *inq = new MessageQueue();
inq->setName("filterInQ");
gCtrlBlock->registerQueue(inq);
gCtrlBlock->setFilterInQueue(inq);
MessageQueue *outq = new MessageQueue();
outq->setName("filterOutQ");
gCtrlBlock->registerQueue(outq);
gCtrlBlock->setFilterOutQueue(outq);
FilterProcessor *filter = new FilterProcessor();
gCtrlBlock->registerProcessor(filter);
gCtrlBlock->setFilterProcessor(filter);
filter->setInQueue(inq);
filter->setOutQueue(outq);
ErrorDetector *errDetector = NULL;
ErrorHandler *errHandler = NULL;
ErrorInjector *errInjector = NULL;
envp = ::getenv("SCI_ENABLE_FAILOVER");
if (envp != NULL) {
if (::strcmp(envp, "yes") == 0) {
MessageQueue *errInQ = new MessageQueue();
errInQ->setName("errInQ");
gCtrlBlock->registerQueue(errInQ);
gCtrlBlock->setErrorQueue(errInQ);
errDetector = new ErrorDetector(hndl);
gCtrlBlock->registerProcessor(errDetector);
errDetector->setInQueue(errInQ);
if (gCtrlBlock->getEndInfo()->fe_info.err_hndlr != NULL) {
MessageQueue *errOutQ = new MessageQueue();
errOutQ->setName("errOutQ");
gCtrlBlock->registerQueue(errOutQ);
errDetector->setOutQueue(errOutQ);
errHandler = new ErrorHandler(hndl);
gCtrlBlock->registerProcessor(errHandler);
errHandler->setInQueue(errOutQ);
}
// see if we have error injection thread
envp = ::getenv("SCI_DEBUG_USE_INJECTOR");
if (envp != NULL) {
if (::strcasecmp(envp, "yes") == 0) {
errInjector = new ErrorInjector();
gCtrlBlock->setErrorInjector(errInjector);
errInjector->setInjOutQueue(errInQ);
}
}
}
}
HandlerProcessor *handler = NULL;
if (gCtrlBlock->getEndInfo()->fe_info.mode == SCI_INTERRUPT) {
// interrupt mode
handler = new HandlerProcessor();
gCtrlBlock->registerProcessor(handler);
handler->setInQueue(outq);
} else {
// polling mode
Observer *ob = new Observer();
gCtrlBlock->setObserver(ob);
gCtrlBlock->setPollQueue(outq);
filter->setObserver(ob);
}
gStateMachine->parse(StateMachine::DATASTRUC_CREATED);
router->start();
filter->start();
if (errDetector) {
errDetector->start();
}
if (errHandler) {
errHandler->start();
}
if (errInjector) {
errInjector->start();
}
if (handler) {
handler->start();
}
int msgID = gNotifier->allocate();
Message *topoMsg = topo->packMsg();
topoMsg->setID(msgID);
routerInQ->produce(topoMsg);
gNotifier->freeze(msgID, &rc);
return rc;
}
int Initializer::initAgent(int hndl)
{
string nodeAddr;
int port = -1;
// get hostname and port no from environment variable.
char *envp = ::getenv("SCI_WORK_DIRECTORY");
if (envp != NULL) {
::chdir(envp);
log_debug("Change working directory to %s", envp);
}
envp = ::getenv("SCI_PARENT_HOSTNAME");
if (envp != NULL) {
nodeAddr = envp;
}
envp = ::getenv("SCI_PARENT_PORT");
if (envp != NULL) {
port = ::atoi(envp);
}
log_debug("My parent host is %s, parent port id %d, my ID is %d", nodeAddr.c_str(), port, hndl);
Stream *stream = new Stream();
stream->init(nodeAddr.c_str(), port);
*stream << gCtrlBlock->getJobKey() << hndl << endl;
gCtrlBlock->registerStream(stream);
gCtrlBlock->setParentStream(stream);
gStateMachine->parse(StateMachine::PARENT_CONNECTED);
ErrorDetector *errDetector = NULL;
// err detector need to be created before relay processor
envp = ::getenv("SCI_ENABLE_FAILOVER");
if (envp != NULL) {
if (::strcmp(envp, "yes") == 0) {
MessageQueue *errInQ = new MessageQueue();
errInQ->setName("errInQ");
gCtrlBlock->registerQueue(errInQ);
gCtrlBlock->setErrorQueue(errInQ);
errDetector = new ErrorDetector(hndl);
gCtrlBlock->registerProcessor(errDetector);
errDetector->setInQueue(errInQ);
}
}
MessageQueue *routerInQ = new MessageQueue();
routerInQ->setName("routerInQ");
gCtrlBlock->registerQueue(routerInQ);
gCtrlBlock->setRouterInQueue(routerInQ);
ReaderProcessor *reader = new ReaderProcessor(hndl);
reader->setName("ReaderP");
gCtrlBlock->registerProcessor(reader);
reader->setInStream(stream);
reader->setOutQueue(routerInQ);
reader->setOutErrorQueue(gCtrlBlock->getErrorQueue());
RouterProcessor *router = new RouterProcessor();
gCtrlBlock->registerProcessor(router);
gCtrlBlock->setRouterProcessor(router);
router->setInQueue(routerInQ);
MessageQueue *filterInQ = new MessageQueue();
filterInQ->setName("filterInQ");
gCtrlBlock->setFilterInQueue(filterInQ);
gCtrlBlock->registerQueue(filterInQ);
MessageQueue *filterOutQ = new MessageQueue();
filterOutQ->setName("filterOutQ");
gCtrlBlock->setFilterOutQueue(filterOutQ);
gCtrlBlock->registerQueue(filterOutQ);
WriterProcessor *writer = new WriterProcessor(hndl);
writer->setName("WriterP");
gCtrlBlock->registerProcessor(writer);
writer->setInQueue(filterOutQ);
writer->setOutStream(stream);
// writer is a peer processor of reader
reader->setPeerProcessor(writer);
FilterProcessor *filter = new FilterProcessor();
gCtrlBlock->registerProcessor(filter);
gCtrlBlock->setFilterProcessor(filter);
filter->setInQueue(filterInQ);
filter->setOutQueue(filterOutQ);
gStateMachine->parse(StateMachine::DATASTRUC_CREATED);
reader->start();
writer->start();
router->start();
filter->start();
if (errDetector) {
errDetector->start();
}
return SCI_SUCCESS;
}
int Initializer::initBE(int hndl)
{
char *envp = ::getenv("SCI_USE_EXTLAUNCHER");
if (envp != NULL) {
if (::strcasecmp(envp, "yes") == 0) {
initExtBE(hndl);
}
}
string nodeAddr;
int port = -1;
// get hostname and port no from environment variable.
envp = ::getenv("SCI_WORK_DIRECTORY");
if (envp != NULL) {
chdir(envp);
log_debug("Change working directory to %s", envp);
}
envp = ::getenv("SCI_PARENT_HOSTNAME");
if (envp != NULL) {
nodeAddr = envp;
}
envp = ::getenv("SCI_PARENT_PORT");
if (envp != NULL) {
port = ::atoi(envp);
}
log_debug("My parent host is %s, parent port id %d, my ID is %d", nodeAddr.c_str(), port, hndl);
Stream *stream = new Stream();
stream->init(nodeAddr.c_str(), port);
*stream << gCtrlBlock->getJobKey() << hndl << endl;
gCtrlBlock->registerStream(stream);
gCtrlBlock->setParentStream(stream);
gStateMachine->parse(StateMachine::PARENT_CONNECTED);
ErrorDetector *errDetector = NULL;
ErrorHandler *errHandler = NULL;
// err detector need to be created before purifier processor
envp = ::getenv("SCI_ENABLE_FAILOVER");
if (envp != NULL) {
if (::strcmp(envp, "yes") == 0) {
MessageQueue *errInQ = new MessageQueue();
errInQ->setName("errInQ");
gCtrlBlock->registerQueue(errInQ);
gCtrlBlock->setErrorQueue(errInQ);
errDetector = new ErrorDetector(hndl);
gCtrlBlock->registerProcessor(errDetector);
errDetector->setInQueue(errInQ);
if (gCtrlBlock->getEndInfo()->be_info.err_hndlr != NULL) {
MessageQueue *errOutQ = new MessageQueue();
errOutQ->setName("errOutQ");
gCtrlBlock->registerQueue(errOutQ);
errDetector->setOutQueue(errOutQ);
errHandler = new ErrorHandler(hndl);
gCtrlBlock->registerProcessor(errHandler);
errHandler->setInQueue(errOutQ);
}
}
}
MessageQueue *userQ = new MessageQueue();
userQ->setName("userQ");
gCtrlBlock->registerQueue(userQ);
gCtrlBlock->setUpQueue(userQ);
envp = ::getenv("SCI_SYNC_INIT");
if (envp && (strcasecmp(envp, "yes") == 0)) {
int msgID = atoi(::getenv("SCI_INIT_ACKID"));
Message *msg = new Message();
msg->build(SCI_FILTER_NULL, hndl, 0, NULL, NULL, Message::INIT_ACK, msgID);
userQ->produce(msg);
}
MessageQueue *sysQ = new MessageQueue();
sysQ->setName("sysQ");
gCtrlBlock->setPurifierOutQueue(sysQ);
gCtrlBlock->registerQueue(sysQ);
WriterProcessor *writer = new WriterProcessor(hndl);
gCtrlBlock->registerProcessor(writer);
writer->setInQueue(userQ);
writer->setOutStream(stream);
PurifierProcessor *purifier = new PurifierProcessor(hndl);
gCtrlBlock->registerProcessor(purifier);
purifier->setInStream(stream);
purifier->setOutQueue(sysQ);
purifier->setOutErrorQueue(gCtrlBlock->getErrorQueue());
// writer is a peer processor of purifier
purifier->setPeerProcessor(writer);
HandlerProcessor *handler = NULL;
if (gCtrlBlock->getEndInfo()->be_info.mode == SCI_INTERRUPT) {
// interrupt mode
handler = new HandlerProcessor();
gCtrlBlock->registerProcessor(handler);
handler->setInQueue(sysQ);
} else {
// polling mode
Observer *ob = new Observer();
gCtrlBlock->setObserver(ob);
gCtrlBlock->setPollQueue(sysQ);
purifier->setObserver(ob);
}
gRoutingList->addBE(SCI_GROUP_ALL, VALIDBACKENDIDS, hndl);
gStateMachine->parse(StateMachine::DATASTRUC_CREATED);
writer->start();
purifier->start();
if (errDetector) {
errDetector->start();
}
if (errHandler) {
errHandler->start();
}
if (handler) {
handler->start();
}
return SCI_SUCCESS;
}
void Initializer::initExtBE(int hndl)
{
string envStr;
char hostname[256];
Stream stream;
struct passwd *pwd = ::getpwuid(::getuid());
string username = pwd->pw_name;
::gethostname(hostname, sizeof(hostname));
stream.init(hostname, SCI_DAEMON_PORT);
stream << username << (int) Launcher::REQUEST << gCtrlBlock->getJobKey() << hndl << endl;
stream >> envStr >> endl;
stream.stop();
string key, val;
char *st = (char *) envStr.c_str();
char *p = st + envStr.size();
while (p > st) {
p--;
if ((*p) == '=') {
*p = '\0';
val = (p+1);
} else if ((*p) == ';') {
*p = '\0';
key = (p+1);
::setenv(key.c_str(), val.c_str(), 1);
}
}
}
void Initializer::initListener()
{
Listener *listener = new Listener(gCtrlBlock->getMyHandle());
gCtrlBlock->setListener(listener);
listener->init();
listener->start();
}