#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************

* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html

 Classes: RoutingList

 Description: Provide routing services for all threads.
   
 Author: Nicole Nie, Liu Wei, Tu HongJ

 History:
   Date     Who ID    Description
   -------- --- ---   -----------
   05/08/09 nieyy        Initial code (D156654)

****************************************************************************/

#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>

#include <vector>

#include "log.hpp"
#include "packer.hpp"
#include "group.hpp"
#include "tools.hpp"
#include "exception.hpp"
#include "stream.hpp"
#include "sshfunc.hpp"

#include "routinglist.hpp"
#include "eventntf.hpp"
#include "initializer.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "ctrlblock.hpp"
#include "filterproc.hpp"
#include "readerproc.hpp"
#include "topology.hpp"
#include "writerproc.hpp"
#include "eventntf.hpp"
#include "dgroup.hpp"


using namespace std;

const int MAX_SUCCESSOR_NUM = 1024;
const int MAX_SEGMENT_SIZE = 11680;
const int MIN_SEGMENT_SIZE = 1440;  // 1500 - 40 - 20 (ethernet MTU - tcp/ip header - message header)

RoutingList::RoutingList(int hndl)
    : handle(hndl), maxSegmentSize(MAX_SEGMENT_SIZE), filterProc(NULL), myDistriGroup(NULL), topology(NULL)
{
    char *envp = ::getenv("SCI_SEGMENT_SIZE");
    if (envp != NULL) {
        maxSegmentSize = atoi(envp);
        maxSegmentSize = maxSegmentSize > MIN_SEGMENT_SIZE ? maxSegmentSize : MIN_SEGMENT_SIZE;
    }

    if (handle == -1) {
        // this is a front end, not parent
        myDistriGroup = new DistributedGroup(0);
    } else {
        int pid = -1;
        envp = ::getenv("SCI_PARENT_ID");
        if (envp) {
            pid = ::atoi(envp);
        } else {
            throw Exception(Exception::INVALID_LAUNCH);
        }
        myDistriGroup = new DistributedGroup(pid);
    }

    if (gCtrlBlock->getMyRole() != CtrlBlock::BACK_END) {
        topology = new Topology(0); // 0 is an impossible agent ID so it will be changed when it receives a real topology
    }
    successorList = new int[MAX_SUCCESSOR_NUM];
    queueInfo.clear();
    routers.clear();
    ::pthread_mutex_init(&mtx, NULL); 
}

RoutingList::~RoutingList()
{  
    delete myDistriGroup;
    delete [] successorList;
    ::pthread_mutex_destroy(&mtx);
}

void RoutingList::parseCmd(Message *msg)
{
    bool notify = false;
    int rc = SCI_SUCCESS;
    if (msg->getType() == Message::GROUP_CREATE) {
        Packer packer(msg->getContentBuf());

        int num_bes = packer.unpackInt();
        int be_list[num_bes];
        for (int i=0; i<num_bes; i++) {
            be_list[i] = packer.unpackInt();
        }

        myDistriGroup->create(num_bes, be_list, msg->getGroup());
        bcast(msg->getGroup(), msg);

        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_FREE) {
        sci_group_t group = msg->getGroup();
        
        bcast(group, msg);
        myDistriGroup->remove(group);

        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_OPERATE) {
        Packer packer(msg->getContentBuf());

        sci_op_t op = (sci_op_t) packer.unpackInt();
        sci_group_t group1 = (sci_group_t) packer.unpackInt();
        sci_group_t group2 = (sci_group_t) packer.unpackInt();

        rc = myDistriGroup->operate(group1, group2, op, msg->getGroup());
        if (rc == SCI_SUCCESS) {
            bcast(msg->getGroup(), msg);
        }

        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_OPERATE_EXT) {
        Packer packer(msg->getContentBuf());

        sci_op_t op = (sci_op_t) packer.unpackInt();
        sci_group_t group = (sci_group_t) packer.unpackInt();
        int num_bes = packer.unpackInt();
        int be_list[num_bes];
        for (int i=0; i<num_bes; i++) {
            be_list[i] = packer.unpackInt();
        }

        rc = myDistriGroup->operateExt(group, num_bes, be_list, op, msg->getGroup());
        if (rc == SCI_SUCCESS) {
            bcast(msg->getGroup(), msg);
        }

        if (gCtrlBlock->getMyRole() == CtrlBlock::FRONT_END) {
            notify = true;
        }
    } else if (msg->getType() == Message::GROUP_DROP) {
        myDistriGroup->dropSuccessor(msg->getID());
    } else if (msg->getType() == Message::GROUP_MERGE) {
        DistributedGroup subDistriGroup(-1);
        subDistriGroup.unpackMsg(*msg);

        if (subDistriGroup.getPID() == handle) {
            // if this message is from my son
            myDistriGroup->merge(msg->getID(), subDistriGroup, false);
        } else if (isSuccessorExist(subDistriGroup.getPID())){
            // if this message is from my grandson
            myDistriGroup->merge(msg->getID(), subDistriGroup, false);
        } else {
            // if this message is from my nephew
            myDistriGroup->merge(msg->getID(), subDistriGroup, true);

            // now update its parent id to me
            subDistriGroup.setPID(handle);

            // repack a message and send to my parent
            Message *newmsg = subDistriGroup.packMsg();
            filterProc->getOutQueue()->produce(newmsg);
        }
    } else {
        assert(!"should never be here");
    }

    if (notify) {
        void *ret = gNotifier->getRetVal(msg->getID());
        *((int *) ret) = rc;
        gNotifier->notify(msg->getID());
    }
}

void RoutingList::propagateGroupInfo()
{
    // propgate my group information to my parent
    Message *msg = myDistriGroup->packMsg();
    if (gCtrlBlock->getMyRole() == CtrlBlock::AGENT) {
        filterProc->getOutQueue()->produce(msg);
    } else if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_END) {
        gCtrlBlock->getUpQueue()->produce(msg);
    } else {
        assert(!"should not be here");
    }
}

int RoutingList::getSegments(Message *msg, Message ***segments, int ref)
{
    int i = 0;
    int segnum = (msg->getContentLen() + maxSegmentSize - 1) / maxSegmentSize + 1;
    int size = 0;
    char *ptr = msg->getContentBuf();
    sci_group_t gid = msg->getGroup();
    Message::Type typ = msg->getType();
    int mid = msg->getID();
    int mfid = msg->getFilterID();
    int hfid = mfid;
    int mlen = msg->getContentLen();
    *segments = (Message **)::malloc(segnum * sizeof(Message *));
    Message **segs = *segments;

    if ((mfid != SCI_FILTER_NULL) || (typ != Message::COMMAND)) {
        hfid = SCI_JOIN_SEGMENT;
    }
    ::memset(segs, 0, segnum * sizeof(Message *));
    segs[0] = new Message();
    segs[0]->build(hfid, gid, 0, NULL, NULL, Message::SEGMENT, segnum);
    segs[0]->setRefCount(ref);

    for (i = 1; i < segnum; i++) {
        segs[i] = new Message();
        size = (i < (segnum - 1)) ? maxSegmentSize : ((mlen - 1) % maxSegmentSize + 1);
        segs[i]->build(mfid, gid, 1, &ptr, &size, typ, mid);
        segs[i]->setRefCount(ref);
        ptr += size;
    }

    return segnum;
}

int RoutingList::bcast(sci_group_t group, Message *msg)
{
    if (group > SCI_GROUP_ALL) {
        int hndl = querySuccessorId((int) group);
        if (hndl == INVLIDSUCCESSORID) {
            return SCI_ERR_GROUP_NOTFOUND;
        } else if (hndl == VALIDBACKENDIDS) {
            ucast((int)group, msg);
        } else {
            ucast(hndl, msg);
        }
        return SCI_SUCCESS;
    }
    
    if (!isGroupExist(group)) {
        return SCI_ERR_GROUP_NOTFOUND;
    }

    splitBcast(group, msg);
    
    return SCI_SUCCESS;
}

void RoutingList::splitBcast(sci_group_t group, Message *msg)
{
    int numSor = numOfSuccessor(group);
    retrieveSuccessorList(group, successorList);
    mcast(msg, successorList, numSor);
}

void RoutingList::mcast(Message *msg, int *sorList, int num)
{
    int i = 0;

    if (msg->getContentLen() <= maxSegmentSize) {
        msg->setRefCount(msg->getRefCount() + num);
        for (i = 0; i < num; i++) {
            queryQueue(sorList[i])->produce(msg);
        }
        return;
    }

    Message **segments;
    int segnum = getSegments(msg, &segments, num);
    for (i = 0; i < num; i++) {
        queryQueue(sorList[i])->multiProduce(segments, segnum);
    }
    ::free(segments);
}

void RoutingList::ucast(int successor_id, Message *msg, int refInc)
{
    log_debug("Processor Router: send msg to successor %d", successor_id);
    mcast(msg, &successor_id, refInc);

    return;
}

void RoutingList::initSubGroup(int successor_id, int start_be_id, int end_be_id)
{
    char qName[64] = {0};
    MessageQueue *queue = NULL;

    if (successor_id != VALIDBACKENDIDS) {
        queue = new MessageQueue();
        ::sprintf(qName, "Agent%d_inQ", successor_id);
        queue->setName(qName);
        mapQueue(successor_id, queue);
    } else {
        int i = 0;
        for (i = start_be_id; i <= end_be_id; i++) {
            queue = new MessageQueue();
            ::sprintf(qName, "BE%d_inQ", i);
            queue->setName(qName);
            mapQueue(i, queue);
        }
    }

    myDistriGroup->initSubGroup(successor_id, start_be_id, end_be_id);
}

int RoutingList::startReading(int hndl)
{
    ROUTING_MAP::iterator it = routers.find(hndl);
    assert(it != routers.end());
    ReaderProcessor *reader = it->second.processor->getPeerProcessor();
    reader->start();

    return 0;
}

int RoutingList::startReaders()
{
    ReaderProcessor *reader = NULL;
    ROUTING_MAP::iterator pit;
   
    for (pit = routers.begin(); pit != routers.end(); ++pit) {
        reader = pit->second.processor->getPeerProcessor();
        while (reader == NULL) {
            SysUtil::sleep(1000);
            reader = pit->second.processor->getPeerProcessor();
        }
        reader->start();
    }

    return 0;
}

int RoutingList::numOfStreams()
{
    return routers.size();
}

int RoutingList::getStreamsSockfds(int *fds)
{
    int i = 0;
    ROUTING_MAP::iterator it;

    for (it = routers.begin(); it != routers.end(); ++it) {
        fds[i] = it->second.stream->getSocket();
        i++;
    }

    return i;
}

int RoutingList::startRouting(int hndl, Stream *stream)
{
    char name[64] = {0};
    MessageQueue *inQ = queryQueue(hndl);
    while (inQ == NULL) {
        SysUtil::sleep(1000);
        inQ = queryQueue(hndl);
    }

    routers[hndl].stream = stream;
    ReaderProcessor *reader = new ReaderProcessor(hndl);
    reader->setInStream(stream);
    reader->setOutQueue(filterProc->getInQueue());
    ::sprintf(name, "Reader%d", hndl);
    reader->setName(name);

    WriterProcessor *writer = new WriterProcessor(hndl);
    writer->setInQueue(inQ);
    writer->setOutStream(stream);
    ::sprintf(name, "Writer%d", hndl);
    writer->setName(name);

    // reader is a peer processor of writer
    writer->setPeerProcessor(reader);
    routers[hndl].processor = writer;

    writer->start(); 

    return 0;
}

int RoutingList::stopRouting()
{
    // waiting for all processor threads terminate
    ROUTING_MAP::iterator pit;
    for (pit = routers.begin(); pit != routers.end(); ++pit) {
        pit->second.processor->release();
        delete pit->second.processor;
    }

    routers.clear();
    queueInfo.clear();

    return 0;
}

bool RoutingList::allRouted()
{
    if (gCtrlBlock->getMyRole() == CtrlBlock::BACK_AGENT) {
        char *envp = getenv("SCI_EMBED_AGENT");
        if ((envp != NULL) && (strcasecmp(envp, "yes") == 0)) {
            return (queueInfo.size() == (routers.size() + 1));  // queueInfo contains itself
        }
    }

    return (queueInfo.size() == routers.size()); 
}

void RoutingList::addBE(sci_group_t group, int successor_id, int be_id, bool init)
{
    if (init) {
        char qName[64] = {0};
        int qID = 0;
        MessageQueue *queue = new MessageQueue();

        if (successor_id == VALIDBACKENDIDS) {
            qID = be_id;
            ::sprintf(qName, "BE%d_inQ", qID);
        } else {
            qID = successor_id;
            ::sprintf(qName, "Agent%d_inQ", qID);
        }
        queue->setName(qName);
        mapQueue(qID, queue);
    }

    myDistriGroup->addBE(group, successor_id, be_id);
}

void RoutingList::removeBE(int be_id)
{
    myDistriGroup->removeBE(be_id);
}

void RoutingList::removeGroup(sci_group_t group)
{
    myDistriGroup->remove(group);
}

void RoutingList::updateParentId(int pid)
{
    ::setenv("SCI_PARENT_ID", SysUtil::itoa(pid).c_str(), 1);
    myDistriGroup->setPID(pid);
}

bool RoutingList::isGroupExist(sci_group_t group)
{
    return myDistriGroup->isGroupExist(group);
}

bool RoutingList::isSuccessorExist(int successor_id)
{
    return myDistriGroup->isSuccessorExist(successor_id);
}


int RoutingList::numOfBE(sci_group_t group)
{
    return myDistriGroup->numOfBE(group);
}

int RoutingList::numOfSuccessor(sci_group_t group)
{
    return myDistriGroup->numOfSuccessor(group);
}

int RoutingList::numOfBEOfSuccessor(int successor_id)
{
    if (successor_id >= 0) {
        // if it is a back end
        return 1;
    }

    return myDistriGroup->numOfBEOfSuccessor(successor_id);
}

int RoutingList::querySuccessorId(int be_id)
{
    return myDistriGroup->querySuccessorId(be_id);
}

void RoutingList::retrieveBEList(sci_group_t group, int * ret_val)
{
    assert(ret_val);
    myDistriGroup->retrieveBEList(group, ret_val);
}

void RoutingList::retrieveSuccessorList(sci_group_t group, int * ret_val)
{
    assert(ret_val);
    myDistriGroup->retrieveSuccessorList(group, ret_val);
}

void RoutingList::mapQueue(int hndl, MessageQueue *queue)
{
    lock();
    queueInfo[hndl] = queue;
    unlock();
}

MessageQueue * RoutingList::queryQueue(int hndl)
{       
    MessageQueue *queue = NULL;

    lock();
    QUEUE_MAP::iterator qit = queueInfo.find(hndl);
    if (qit != queueInfo.end()) {
        queue = (*qit).second;
    }
    unlock();

    return queue;
}

void RoutingList::lock()
{
    ::pthread_mutex_lock(&mtx);
}

void RoutingList::unlock()
{
    ::pthread_mutex_unlock(&mtx);
}

Topology * RoutingList::getTopology()
{
    return topology;
}

FilterProcessor * RoutingList::getFilterProcessor()
{
    return filterProc;
}

void RoutingList::setFilterProcessor(FilterProcessor *proc)
{
    filterProc = proc;
}
