blob: d8e4dfe51d92c795b9100365dce1c27816e42bcb [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: MessageQueue
Description: Messages manipulation.
Author: Tu HongJ, Nicole Nie, Liu Wei
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 tuhongj Initial code (D153875)
****************************************************************************/
#include "queue.hpp"
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <assert.h>
#include "exception.hpp"
#include "ctrlblock.hpp"
#include "log.hpp"
#include "atomic.hpp"
#include "message.hpp"
// Limit that flowControl() compares thresHold against: 1024^3 * 2 = 2^31.
// The trailing 2LL factor promotes the product to long long.
const long long FLOWCTL_THRESHOLD = 1024 * 1024 * 1024 * 2LL;
// Definition of the static member MessageQueue::thresHold: a running total of
// message content lengths, increased by produce()/multiProduce() and decreased
// by consume()/multiConsume(). Being static, it is shared by every queue.
volatile long long MessageQueue::thresHold = 0;
/**
 * Builds an empty queue: initializes the counting semaphore that consumers
 * wait on and the mutex used by lock()/unlock().
 */
MessageQueue::MessageQueue()
{
    // Semaphore starts at 0 (nothing to consume yet); pshared == 0.
    ::sem_init(&sem, 0, 0);
    // NULL attribute pointer requests the default mutex attributes.
    ::pthread_mutex_init(&mtx, NULL);
}
/**
 * Drops this queue's reference on every message still queued, deleting any
 * message whose reference count reaches zero, then destroys the mutex and
 * the semaphore. The queue is drained without taking the lock.
 */
MessageQueue::~MessageQueue()
{
    while (queue.size() > 0) {
        Message *pending = queue.front();
        queue.pop_front();
        if (pending->decRefCount() != 0) {
            // decRefCount() reported a non-zero count; do not delete here.
            continue;
        }
        delete pending;
    }
    queue.clear();

    ::pthread_mutex_destroy(&mtx);
    ::sem_destroy(&sem);
}
/**
 * Throttles the caller when the shared thresHold total is above
 * FLOWCTL_THRESHOLD.
 *
 * @param size  content length just added (positive) or removed (zero or
 *              negative); only positive values can trigger the pause.
 * @return always 0.
 */
int MessageQueue::flowControl(int size)
{
    // Zero or negative sizes never pause (thresHold is not even read).
    if (size <= 0) {
        return 0;
    }
    if (thresHold <= FLOWCTL_THRESHOLD) {
        return 0;
    }

    // Over the limit: pause the calling thread for one second.
    sleep(1);
    return 0;
}
/**
 * Appends `num` messages to the queue under a single lock acquisition.
 *
 * @param msgs  array of `num` non-NULL message pointers (asserted).
 * @param num   number of entries in `msgs`; must be > 0 (asserted).
 * @return always 0.
 */
int MessageQueue::multiProduce(Message **msgs, int num)
{
    assert(msgs && (num > 0));

    // Sum the content lengths up front so the critical section stays short.
    int total = 0;
    for (int k = 0; k < num; ++k) {
        assert(msgs[k]);
        total += msgs[k]->getContentLen();
    }

    lock();
    for (int k = 0; k < num; ++k) {
        queue.push_back(msgs[k]);
        // One semaphore unit per queued message.
        ::sem_post(&sem);
    }
    thresHold += total;
    unlock();

    // May pause the producer if the global total is over the limit.
    flowControl(total);
    return 0;
}
/**
 * Appends one message to the queue and signals the semaphore.
 *
 * A NULL `msg` enqueues nothing but still posts the semaphore, so one
 * waiter is released; consume() then finds the queue possibly empty and
 * returns NULL.
 *
 * @param msg  message to enqueue, or NULL to only post the semaphore.
 */
void MessageQueue::produce(Message *msg)
{
    if (msg) {
        const int contentLen = msg->getContentLen();

        lock();
        queue.push_back(msg);
        thresHold += contentLen;
        unlock();

        // Signal after releasing the mutex, then apply flow control.
        ::sem_post(&sem);
        flowControl(contentLen);
        return;
    }

    // No message: just post the semaphore.
    ::sem_post(&sem);
}
/**
 * Waits until `num` semaphore units are available, then removes `num`
 * messages from the front of the queue into `msgs` in FIFO order.
 *
 * Unlike consume(), the messages are popped from the queue here.
 *
 * @param msgs  output array with room for at least `num` pointers.
 * @param num   number of messages to dequeue.
 * @return 0 on success; -1 if waiting on the semaphore failed, in which
 *         case `msgs` is not filled in.
 */
int MessageQueue::multiConsume(Message **msgs, int num)
{
    int i;
    int len = 0;

    // Take one semaphore unit per requested message; a negative timeout
    // makes sem_wait_i() block without a deadline.
    for (i = 0; i < num; i++) {
        if (sem_wait_i(&sem, -1) != 0) {
            // Fix: this used to be `return NULL`, which evaluates to 0 and is
            // indistinguishable from success. Report the failure instead.
            // Units already taken in earlier iterations are not re-posted.
            return -1;
        }
    }

    lock();
    // NOTE(review): produce(NULL) posts the semaphore without queueing a
    // message, so the queue could hold fewer than `num` entries here --
    // confirm that callers never mix the two on the same queue.
    for (i = 0; i < num; i++) {
        msgs[i] = queue.front();
        queue.pop_front();
        len += msgs[i]->getContentLen();
    }
    thresHold -= len;
    unlock();

    // Negative size: flowControl() never pauses on the consumer side.
    flowControl(-len);

    return 0;
}
/**
 * Waits for the semaphore and returns the message at the front of the queue.
 *
 * The message is NOT popped here; remove() pops the front entry. The
 * message's content length is subtracted from thresHold at this point.
 *
 * @param millisecs  timeout in milliseconds; a negative value waits without
 *                   a deadline (see sem_wait_i()).
 *                   NOTE(review): millisecs * 1000 is computed in int, so
 *                   very large timeouts can overflow.
 * @return the front message, or NULL if the wait failed or timed out, or if
 *         the queue was empty after the wait succeeded.
 */
Message* MessageQueue::consume(int millisecs)
{
    const int waitUsecs = millisecs * 1000;
    if (sem_wait_i(&sem, waitUsecs) != 0) {
        return NULL;
    }

    Message *head = NULL;
    int headLen = 0;

    lock();
    if (!queue.empty()) {
        head = queue.front();
        headLen = head->getContentLen();
        thresHold -= headLen;
    }
    unlock();

    // Negative (or zero) size: flowControl() never pauses here.
    flowControl(-headLen);
    return head;
}
void MessageQueue::remove()
{
Message *msg = NULL;
lock();
if (queue.empty()) {
unlock();
return;
}
msg = queue.front();
queue.pop_front();
unlock();
if (msg->decRefCount() == 0) {
delete msg;
}
}
/**
 * Returns the number of messages currently queued, read under the lock.
 *
 * @return queue.size() converted to int.
 */
int MessageQueue::getSize()
{
    lock();
    const int count = queue.size();
    unlock();

    return count;
}
/**
 * Waits on a semaphore, retrying whenever the wait is interrupted (EINTR).
 *
 * @param psem   semaphore to wait on.
 * @param usecs  timeout in microseconds; a negative value waits without a
 *               deadline.
 * @return 0 once the semaphore was acquired; otherwise the non-zero result
 *         of sem_wait()/sem_timedwait() (errno describes the failure, e.g.
 *         a timeout).
 */
int MessageQueue::sem_wait_i(sem_t *psem, int usecs)
{
    int rc = 0;

    if (usecs < 0) {
        // Untimed wait.
        do {
            rc = ::sem_wait(psem);
        } while ((rc != 0) && (errno == EINTR));
        return rc;
    }

    // Timed wait: sem_timedwait() takes an absolute CLOCK_REALTIME deadline.
    timespec deadline;
    ::clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += usecs / 1000000;
    deadline.tv_nsec += (usecs % 1000000) * 1000;
    if (deadline.tv_nsec >= 1000000000) {
        // Carry the overflowing nanoseconds into the seconds field.
        deadline.tv_nsec -= 1000000000;
        deadline.tv_sec += 1;
    }

    do {
        rc = ::sem_timedwait(psem, &deadline);
    } while ((rc != 0) && (errno == EINTR));
    return rc;
}
/**
 * Acquires the queue mutex. The result of pthread_mutex_lock() is ignored.
 */
void MessageQueue::lock()
{
    static_cast<void>(::pthread_mutex_lock(&mtx));
}
/**
 * Releases the queue mutex. The result of pthread_mutex_unlock() is ignored.
 */
void MessageQueue::unlock()
{
    static_cast<void>(::pthread_mutex_unlock(&mtx));
}