blob: a63b242e52f955c5efb54e455f03474b820c4720 [file] [log] [blame]
#ifndef _PRAGMA_COPYRIGHT_
#define _PRAGMA_COPYRIGHT_
#pragma comment(copyright, "%Z% %I% %W% %D% %T%\0")
#endif /* _PRAGMA_COPYRIGHT_ */
/****************************************************************************
* Copyright (c) 2008, 2010 IBM Corporation.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0s
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
Classes: Stream
Description: Data stream processing.
Author: Tu HongJ, Liu Wei
History:
Date Who ID Description
-------- --- --- -----------
10/06/08 tuhongj Initial code (D153875)
****************************************************************************/
#include "stream.hpp"
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "socket.hpp"
const int BUFFER_SIZE = 16384;
const int MAX_NETWORK_SIZE = 1024 * 1024 * 8; // Max length = 8M
const char END_OF_LINE = '\n';
void endl() {}
Stream::Stream()
: socket(NULL)
{
buffer = new char[BUFFER_SIZE];
cursor = buffer;
*cursor = '\0';
readActive = false;
writeActive = false;
}
Stream::~Stream()
{
if (socket != NULL)
delete socket;
delete [] buffer;
}
int Stream::init(const char *nodeAddr, in_port_t port)
{
if (socket != NULL) {
socket->close(Socket::BOTH);
} else {
socket = new Socket();
}
if ((nodeAddr == NULL) || (port <= 0))
return -1;
socket->connect(nodeAddr, port);
readActive = true;
writeActive = true;
return 0;
}
int Stream::init(int sockfd)
{
if (socket != NULL) {
socket->close(Socket::BOTH);
} else {
socket = new Socket();
}
socket->setFd(sockfd);
readActive = true;
writeActive = true;
return 0;
}
void Stream::read(char *buf, int size)
{
int n = 0;
int count = size;
char *p = buf;
while (n < size) {
count = size - n;
n += socket->recv(p, count);
p = buf + n;
}
}
void Stream::write(const char *buf, int size)
{
int len = size; // including '\0' at the end
int count = len;
char *p = (char *) buf;
while (len > 0) {
checkBuffer(len);
count = (len - BUFFER_SIZE) > 0 ? BUFFER_SIZE : len;
memcpy(cursor, p, count);
cursor += count;
p += count;
len -= count;
}
}
void Stream::stop()
{
stopRead();
stopWrite();
}
void Stream::stopRead()
{
if (readActive) {
readActive = false;
socket->close(Socket::READ);
}
}
void Stream::stopWrite()
{
if (writeActive) {
writeActive = false;
socket->close(Socket::WRITE);
}
}
bool Stream::isReadActive()
{
return readActive;
}
bool Stream::isWriteActive()
{
return writeActive;
}
Stream & Stream::flush()
{
try {
socket->send(buffer, cursor - buffer);
} catch (SocketException &e) {
cursor = buffer;;
throw;
} catch (...) {
cursor = buffer;
throw;
}
cursor = buffer;
return *this;
}
Stream & Stream::operator >> (char &value)
{
read(&value, sizeof(value));
return *this;
}
Stream & Stream::operator >> (bool &value)
{
read((char *)&value, sizeof(value));
return *this;
}
Stream & Stream::operator >> (int &value)
{
read((char *)&value, sizeof(value));
value = ntohl(value);
return *this;
}
Stream & Stream::operator >> (long &value)
{
int low = 0;
int high = 0;
int nbyte;
*this >> nbyte;
*this >> low;
nbyte -= sizeof(int);
if (nbyte != 0) {
*this >> high;
}
value = high << sizeof(int) | low;
return *this;
}
Stream & Stream::operator >> (char *value)
{
int len;
*this >> len;
read(value, len);
return *this;
}
Stream & Stream::operator >> (string &value)
{
int len;
char *buf = NULL;
*this >> len;
if ((len < 0) || (len > MAX_NETWORK_SIZE))
throw SocketException(SocketException::NET_ERR_DATA);
buf = new char[len];
read(buf, len);
value = buf;
delete [] buf;
return *this;
}
Stream & Stream::operator >> (struct iovec &value)
{
*this >> (long &)value.iov_len;
if (value.iov_len > (int)MAX_NETWORK_SIZE)
throw SocketException(SocketException::NET_ERR_DATA);
if (value.iov_len > 0) {
value.iov_base = new char[value.iov_len]; // must free it outside
read((char *)value.iov_base, value.iov_len);
}
return *this;
}
Stream & Stream::operator >> (EndOfLine)
{
char value;
*this >> value;
if (value != END_OF_LINE)
throw SocketException(SocketException::NET_ERR_DATA);
return *this;
}
Stream & Stream::operator << (char value)
{
checkBuffer(sizeof(value));
*cursor = value;
cursor += sizeof(value);
return *this;
}
Stream & Stream::operator << (bool value)
{
checkBuffer(sizeof(value));
*(bool *)cursor = value;
cursor += sizeof(value);
return *this;
}
Stream & Stream::operator << (int value)
{
int tmp = htonl(value);
checkBuffer(sizeof(value));
memcpy(cursor, &tmp, sizeof(tmp));
cursor += sizeof(value);
return *this;
}
Stream & Stream::operator << (long value)
{
*this << (int)sizeof(value);
if (sizeof(long) > sizeof(int)) {
int low = value << sizeof(int) >> sizeof(int);
int high = value >> sizeof(int);
*this << low << high;
} else {
*this << (int)value;
}
return *this;
}
Stream & Stream::operator << (const char *value)
{
int len = ::strlen(value) + 1; // including '\0' at the end
*this << len;
int count = len;
char *p = (char *)value;
while (len > 0) {
checkBuffer(len);
count = (len - BUFFER_SIZE) > 0 ? BUFFER_SIZE : len;
::memcpy(cursor, p, count);
cursor += count;
p += count;
len -= count;
}
return *this;
}
Stream & Stream::operator << (const string &value)
{
*this << value.c_str();
return *this;
}
Stream & Stream::operator << (struct iovec &value)
{
long len = (long)value.iov_len;
int count = len;
char *p = (char *)value.iov_base;
*this << len;
while (len > 0) {
checkBuffer(len);
count = (len - BUFFER_SIZE) > 0 ? BUFFER_SIZE : len;
::memcpy(cursor, p, count);
cursor += count;
p += count;
len -= count;
}
return *this;
}
Stream & Stream::operator << (EndOfLine)
{
*this << END_OF_LINE;
return flush();
}
void Stream::checkBuffer(int size)
{
if ((cursor - buffer + size) >= BUFFER_SIZE)
flush();
}
int Stream::getSocket()
{
if (socket != NULL) {
return socket->getFd();
} else {
return -1;
}
}