blob: 4430ed4d95b8d3df9482bdc21ad1ca0614e89c1c [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: MessageQueue
Description: Messages manipulation.
Author: Tu HongJ, Nicole Nie, Liu Wei
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 tuhongj Initial code (D153875)
01/16/12 ronglli Fix the issue of semaphore overflow
****************************************************************************/
#include "queue.hpp"
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <assert.h>
#ifdef __APPLE__
#include <mach/mach.h>
#endif /* __APPLE__ */
#include "exception.hpp"
#include "ctrlblock.hpp"
#include "log.hpp"
#include "atomic.hpp"
#include "message.hpp"
#include "tools.hpp"
MessageQueue::MessageQueue(bool ctl)
: thresHold(0), flowCtl(ctl)
{
state = true;
::pthread_mutex_init(&mtx, NULL);
#ifndef __APPLE__
::sem_init(&sem, 0, 0);
#else /* __APPLE__ */
task = ::mach_task_self();
::semaphore_create(task, &sem, SYNC_POLICY_FIFO, 0);
#endif /* __APPLE__ */
}
MessageQueue::~MessageQueue()
{
Message *msg = NULL;
while (!queue.empty()) {
msg = queue.front();
queue.pop_front();
if (decRefCount(msg->getRefCount()) == 0) {
delete msg;
}
}
queue.clear();
::pthread_mutex_destroy(&mtx);
#ifndef __APPLE__
::sem_destroy(&sem);
#else /* __APPLE__ */
::semaphore_destroy(task, sem);
#endif /* __APPLE__ */
}
int MessageQueue::flowControl(int size)
{
long long flowctlThreshold = gCtrlBlock->getFlowctlThreshold();
if(flowCtl) {
if ((gCtrlBlock->getMyRole() != CtrlBlock::BACK_END) && (size > 0)) {
while ((thresHold > flowctlThreshold) && (gCtrlBlock->getFlowctlState())) {
SysUtil::sleep(WAIT_INTERVAL);
}
}
}
return 0;
}
int MessageQueue::multiProduce(Message **msgs, int num)
{
assert(msgs && (num > 0));
int i;
int len = 0;
for (i = 0; i < num; i++) {
assert(msgs[i]);
len += msgs[i]->getContentLen();
}
lock();
for (i = 0; i < num; i++) {
queue.push_back(msgs[i]);
}
thresHold += len;
unlock();
release();
flowControl(len);
return 0;
}
void MessageQueue::release()
{
int cnt = 0;
#ifndef __APPLE__
while (::sem_post(&sem) != 0) {
#else /* __APPLE__ */
while (::semaphore_signal(sem) != 0) {
#endif /* __APPLE__ */
if (!state)
break;
if (!gCtrlBlock->getFlowctlState()) {
if (cnt > 10) {
state = false;
break;
}
cnt++;
}
SysUtil::sleep(WAIT_INTERVAL);
}
}
int MessageQueue::sem_getvalue_i()
{
int i = -1;
#ifndef __APPLE__
::sem_getvalue(&sem, &i);
#endif /* __APPLE__ */
return i;
}
void MessageQueue::produce(Message *msg)
{
int len = 0;
if (!msg) {
return;
}
len = msg->getContentLen();
lock();
queue.push_back(msg);
thresHold += len;
unlock();
#ifdef _SCI_DEBUG
#ifndef __APPLE__
int val = sem_getvalue_i();
log_debug("queue %s: produce: sem value = %ld, thresHold = %ld", name.c_str(), val, thresHold);
#endif /* __APPLE__ */
#endif
release();
flowControl(len);
return;
}
int MessageQueue::multiConsume(Message **msgs, int num)
{
int i;
int len = 0;
for (i = 0; i < num; i++) {
if (sem_wait_i(-1) != 0) {
return -1;
}
}
lock();
for (i = 0; i < num; i++) {
msgs[i] = queue.front();
queue.pop_front();
len += msgs[i]->getContentLen();
}
thresHold -= len;
unlock();
return 0;
}
Message* MessageQueue::consume(int millisecs)
{
int len = 0;
if (sem_wait_i(millisecs*1000) != 0) {
return NULL;
}
Message *msg = NULL;
lock();
if (!queue.empty()) {
msg = queue.front();
len = msg->getContentLen();
thresHold -= len;
}
unlock();
return msg;
}
void MessageQueue::remove()
{
Message *msg = NULL;
lock();
if (queue.empty()) {
unlock();
return;
}
msg = queue.front();
queue.pop_front();
unlock();
if (decRefCount(msg->getRefCount()) == 0) {
delete msg;
}
}
int MessageQueue::getSize()
{
int size;
lock();
size = queue.size();
unlock();
return size;
}
bool MessageQueue::getState()
{
return state;
}
void MessageQueue::setName(char *str)
{
name = str;
if (name == "filterInQ") {
flowCtl = true;
}
}
string MessageQueue::getName()
{
return name;
}
int MessageQueue::sem_wait_i(int usecs)
{
int rc = 0;
#ifdef _SCI_DEBUG
#ifndef __APPLE__
int val = sem_getvalue_i();
log_debug("queue %s: sem value = %ld, thresHold = %ld", name.c_str(), val, thresHold);
#endif /* __APPLE__ */
#endif
#ifndef __APPLE__
if (usecs < 0) {
while (((rc = ::sem_wait(&sem)) != 0) && (errno == EINTR));
return rc;
} else {
timespec ts;
::clock_gettime(CLOCK_REALTIME, &ts); // get current time
ts.tv_nsec += (usecs % 1000000) * 1000;
int ca = (ts.tv_nsec >= 1000000000) ? 1 : 0;
ts.tv_nsec %= 1000000000;
ts.tv_sec += (usecs / 1000000) + ca;
while (((rc=::sem_timedwait(&sem, &ts))!=0) && (errno == EINTR));
return rc;
}
#else /* __APPLE__ */
if (usecs < 0) {
::semaphore_wait(sem);
} else {
mach_timespec_t ts;
struct timeval tv;
::gettimeofday(&tv, NULL); // get current time
ts.tv_nsec = (tv.tv_usec + (usecs % 1000000)) * 1000;
int ca = (ts.tv_nsec >= 1000000000) ? 1 : 0;
ts.tv_nsec %= 1000000000;
ts.tv_sec = tv.tv_sec + (usecs / 1000000) + ca;
::semaphore_timedwait(sem, ts);
}
return 0;
#endif /* __APPLE__ */
}
void MessageQueue::lock()
{
::pthread_mutex_lock(&mtx);
}
void MessageQueue::unlock()
{
::pthread_mutex_unlock(&mtx);
}