blob: 78637c923e2406f806424d2f60532bd1fa7d8aea [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: EmbedAgent
Description: embedded agent in front-end back-end
Author: Tu HongJ
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 tuhongj Initial code ()
****************************************************************************/
#include "tools.hpp"
#include "embedagent.hpp"
#include "ctrlblock.hpp"
#include "writerproc.hpp"
#include "filterproc.hpp"
#include "handlerproc.hpp"
#include "routerproc.hpp"
#include "topology.hpp"
#include "message.hpp"
#include "eventntf.hpp"
#include "queue.hpp"
#include "stream.hpp"
#include "routinglist.hpp"
#include "privatedata.hpp"
#include "filterlist.hpp"
EmbedAgent::EmbedAgent()
: handle(-1), inStream(NULL), outStream(NULL), filterInQ(NULL), filterOutQ(NULL), routerInQ(NULL), filterProc(NULL), routerProc(NULL), writerProc(NULL), routingList(NULL), filterList(NULL)
{}
EmbedAgent::~EmbedAgent()
{
if (routerProc) {
routerProc->release();
delete routerProc;
}
if (filterProc) {
filterProc->release();
delete filterProc;
}
if (writerProc) {
writerProc->release();
delete writerProc;
}
if (routingList)
delete routingList;
if (filterList)
delete filterList;
}
int EmbedAgent::init(int hndl, Stream *stream, MessageQueue *inQ, MessageQueue *outQ)
{
handle = hndl;
filterList = new FilterList();
routingList = new RoutingList(hndl);
routerProc = new RouterProcessor(hndl, routingList, filterList);
filterInQ = new MessageQueue();
filterInQ->setName("filterInQ");
filterProc = new FilterProcessor(hndl, filterList);
filterProc->setInQueue(filterInQ);
if (outQ != NULL) {
filterProc->setOutQueue(outQ);
} else {
filterOutQ = new MessageQueue();
filterOutQ->setName("filterOutQ");
filterProc->setOutQueue(filterOutQ);
}
PrivateData *pDataFilter = new PrivateData(routingList, filterList, filterProc, routerProc);
filterProc->setSpecific(pDataFilter);
gCtrlBlock->setUpQueue(filterInQ);
PrivateData *pDataRouter = new PrivateData(routingList, filterList, filterProc, routerProc);
routerProc->setSpecific(pDataRouter);
routingList->setFilterProcessor(filterProc);
if (stream) {
inStream = stream;
routerProc->setInStream(stream);
writerProc = new WriterProcessor(hndl);
writerProc->setName("WriterP");
writerProc->setInQueue(filterOutQ);
writerProc->setOutStream(stream);
routerProc->setPeerProcessor(writerProc);
} else if (inQ) {
routerProc->setInQueue(inQ);
} else {
routerInQ = new MessageQueue();
routerInQ->setName("routerInQ");
routerProc->setInQueue(routerInQ);
gCtrlBlock->setRouterInQueue(routerInQ);
gCtrlBlock->setRouterProcessor(routerProc);
}
gCtrlBlock->addEmbedAgent(handle, this);
return 0;
}
RoutingList * EmbedAgent::getRoutingList()
{
return routingList;
}
MessageQueue * EmbedAgent::getRouterInQ()
{
return routerInQ;
}
MessageQueue * EmbedAgent::getUpQueue()
{
return filterOutQ;
}
int EmbedAgent::work()
{
int rc = 0;
routerProc->start();
filterProc->start();
if (writerProc)
writerProc->start();
if (gCtrlBlock->getMyRole() != CtrlBlock::BACK_AGENT) {
rc = registPrivateData();
}
return rc;
}
int EmbedAgent::syncWait()
{
int rc = 0;
gNotifier->freeze(routingList->getTopology()->getInitID(), &rc);
return rc;
}
FilterProcessor * EmbedAgent::getFilterProcessor()
{
return filterProc;
}
extern void makeKey();
int EmbedAgent::registPrivateData()
{
PrivateData *pDataMain = new PrivateData(routingList, filterList, filterProc, routerProc);
int rc = pthread_once(&(Thread::once), makeKey);
rc = pthread_setspecific(Thread::key, pDataMain);
return rc;
}
PrivateData * EmbedAgent::genPrivateData()
{
PrivateData *pData = new PrivateData(routingList, filterList, filterProc, routerProc);
return pData;
}