blob: ab72ba37a00f2a9cc8f0f570b94f55dad8d943b2 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2005, 2012 Oracle. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0, which accompanies this distribution
* and is available at https://www.eclipse.org/legal/epl-2.0/.
*
* Contributors:
* Oracle - initial API and implementation
******************************************************************************/
package org.eclipse.jpt.common.utility.internal.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import org.eclipse.jpt.common.utility.internal.ObjectTools;
/**
* Improved implementation of the JDK's piped streams. This pipe allows
* multiple threads to write and read the pipe and does not poll the pipe
* and does not kill the pipe if a thread writing to it dies.
*/
public class Pipe {
private final int bufferSize;
private final byte[] buffer;
private final long timeout;
private final InputStream in = new InputStreamAdapter();
private final OutputStream out = new OutputStreamAdapter();
private boolean readLap = false;
private int readIndex = 0;
private boolean writeLap = false;
private int writeIndex = 0;
private boolean inputStreamClosed = false;
private boolean outputStreamClosed = false;
// ********** constructors **********
/**
* Construct a pipe with the default buffer size (2048) and time-out
* value (zero, which means no time-out).
*/
public Pipe() {
this(2048);
}
/**
* Construct a pipe with the specified buffer size and the default
* time-out value (zero, which means no time-out).
*/
public Pipe(int bufferSize) {
this(bufferSize, 0L);
}
/**
* Construct a pipe with the specified buffer size and
* time-out value. A time-out value of zero will cause reads to
* block indefinitely.
*/
public Pipe(int bufferSize, long timeout) {
super();
this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize];
this.timeout = timeout;
}
// ********** accessors **********
/**
* Return the input stream half of the pipe.
* This stream will return the data written to
* the output stream half of the pipe.
*/
public InputStream getInputStream() {
return this.in;
}
/**
* Return the output stream half of the pipe.
* Data written to this stream will be returned
* by the input stream half of the pipe.
*/
public OutputStream getOutputStream() {
return this.out;
}
// ********** reading **********
/**
* Return the number of bytes currently available for reading
* from the input stream.
*/
synchronized int available() {
if (this.inputStreamClosed) {
return 0;
}
return this.bytesInPipe();
}
/**
* Return the number of bytes currently available for reading
* from the input stream.
*/
private int bytesInPipe() {
int diff = this.writeIndex - this.readIndex;
if (diff == 0) {
// the write is either at the same position or a complete lap ahead
return (this.writeLap == this.readLap) ? 0 : this.bufferSize;
}
return (diff > 0) ? diff : diff + this.bufferSize;
}
/**
* Read a byte from the pipe. This method will block
* until either a byte is available or a time-out occurs.
* If a time-out occurs, throw an InterruptedIOException.
*/
synchronized int read() throws IOException {
if (this.inputStreamClosed) {
throw new IOException("Pipe closed"); //$NON-NLS-1$
}
// wait for a byte to become available
long stop = System.currentTimeMillis() + this.timeout;
long remaining = this.timeout;
while (this.bytesInPipe() == 0) {
if (this.outputStreamClosed) {
return -1; // if the pipe is empty and the output stream is closed, we are done
}
try {
this.wait(remaining);
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
if (this.timeout > 0L) {
remaining = stop - System.currentTimeMillis();
if (remaining <= 0L) {
throw new InterruptedIOException(); // a time-out occurred
}
}
if (this.inputStreamClosed) {
throw new IOException("Pipe closed"); // the pipe might've been closed while we were waiting //$NON-NLS-1$
}
}
// a byte is available and the input stream is still open
byte b = this.buffer[this.readIndex];
this.readIndex++;
if (this.readIndex == this.bufferSize) {
this.readLap = ! this.readLap;
this.readIndex = 0;
}
// notify any waiting writers that there is free space in the pipe
this.notifyAll();
return b & 0xFF;
}
/**
* Read the specified number of bytes from the pipe and
* into the specified byte array at the specified offset.
* Return the actual number of bytes read.
* This method will block until either a byte is available
* or a time-out occurs. If a time-out occurs, throw an
* InterruptedIOException.
*/
synchronized int read(byte[] b, int off, int len) throws IOException {
if (this.inputStreamClosed) {
throw new IOException("Pipe closed"); //$NON-NLS-1$
}
// wait for a byte to become available
long stop = System.currentTimeMillis() + this.timeout;
long remaining = this.timeout;
int bytesInPipe = this.bytesInPipe();
while (bytesInPipe == 0) {
if (this.outputStreamClosed) {
return -1; // if the pipe is empty and the output stream is closed, we are done
}
try {
this.wait(remaining);
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
if (this.timeout > 0L) {
remaining = stop - System.currentTimeMillis();
if (remaining <= 0L) {
throw new InterruptedIOException(); // a time-out occurred
}
}
if (this.inputStreamClosed) {
throw new IOException("Pipe closed"); // the pipe might've been closed while we were waiting //$NON-NLS-1$
}
bytesInPipe = this.bytesInPipe();
}
// some bytes are available and the input stream is still open
int bytesRead = (len > bytesInPipe) ? bytesInPipe : len;
int copyLength1 = this.bufferSize - this.readIndex;
if (copyLength1 > bytesRead) {
copyLength1 = bytesRead;
}
System.arraycopy(this.buffer, this.readIndex, b, off, copyLength1);
this.readIndex += copyLength1;
if (this.readIndex == this.bufferSize) {
this.readLap = ! this.readLap;
this.readIndex = 0;
}
int copyLength2 = bytesRead - copyLength1;
if (copyLength2 > 0) {
System.arraycopy(this.buffer, 0, b, off + copyLength1, copyLength2);
this.readIndex += copyLength2;
}
// notify any waiting writers that there is free space in the pipe
this.notifyAll();
return bytesRead;
}
/**
* Close the input stream half of the pipe. This will effectively
* close the output stream half of the pipe also, preventing any
* further data from being written to the pipe.
*/
synchronized void closeInputStream() {
if (this.inputStreamClosed) {
throw new IllegalStateException("InputStream already closed"); //$NON-NLS-1$
}
this.inputStreamClosed = true;
this.outputStreamClosed = true;
this.notifyAll();
}
// ********** writing **********
/**
* Return the number of bytes currently available in the buffer
* for writing.
*/
private int freeSpace() {
int diff = this.readIndex - this.writeIndex;
if (diff == 0) {
// the write is either at the same position or a complete lap ahead
return (this.writeLap == this.readLap) ? this.bufferSize : 0;
}
return (diff > 0) ? diff : diff + this.bufferSize;
}
/**
* Write the specified byte to the pipe. This method will block
* indefinitely if the pipe is full.
*/
synchronized void write(int b) throws IOException {
if (this.outputStreamClosed) {
throw new IOException("Pipe closed"); //$NON-NLS-1$
}
// wait for some free space
while (this.freeSpace() == 0) {
try {
this.wait();
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
if (this.outputStreamClosed) {
throw new IOException("Pipe closed"); // the pipe might've been closed while we were waiting //$NON-NLS-1$
}
}
// free space is available and the output stream is still open
this.buffer[this.writeIndex] = (byte) b;
this.writeIndex++;
if (this.writeIndex == this.bufferSize) {
this.writeLap = ! this.writeLap;
this.writeIndex = 0;
}
this.notifyAll();
}
/**
* Write the specified bytes to the pipe. This method will block
* indefinitely if the pipe is full.
*/
synchronized void write(byte[] b, int off, int len) throws IOException {
if (this.outputStreamClosed) {
throw new IOException("Pipe closed"); //$NON-NLS-1$
}
while (len > 0) {
// wait for some free space
int freeSpace = this.freeSpace();
while (freeSpace == 0) {
try {
this.wait();
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
if (this.outputStreamClosed) {
throw new IOException("Pipe closed"); // the pipe might've been closed while we were waiting //$NON-NLS-1$
}
freeSpace = this.freeSpace();
}
// free space is available and the output stream is still open;
// calculate how many bytes we can put in the buffer this pass
int tempLength = (len > freeSpace) ? freeSpace : len;
int copyLength1 = this.bufferSize - this.writeIndex;
if (copyLength1 > tempLength) {
copyLength1 = tempLength;
}
System.arraycopy(b, off, this.buffer, this.writeIndex, copyLength1);
this.writeIndex += copyLength1;
if (this.writeIndex == this.bufferSize) {
this.writeLap = ! this.writeLap;
this.writeIndex = 0;
}
int copyLength2 = tempLength - copyLength1;
if (copyLength2 > 0) {
System.arraycopy(b, off + copyLength1, this.buffer, 0, copyLength2);
this.writeIndex += copyLength2;
}
// move to the next chunk of bytes
off += tempLength;
len -= tempLength;
// notify any waiting readers that there is data to be read from the pipe
this.notifyAll();
}
}
/**
* Close the output stream half of the pipe. Although no more data
* can be written to the pipe after this method is called, the input
* stream half of the pipe can remain open and will return the data
* still remaining in the buffer.
*/
synchronized void closeOutputStream() {
if (this.outputStreamClosed) {
throw new IllegalStateException("OutputStream already closed"); //$NON-NLS-1$
}
this.outputStreamClosed = true;
this.notifyAll();
}
// ********** standard methods **********
@Override
public String toString() {
return ObjectTools.toString(this);
}
// ********** inner classes **********
/**
* Adapt the pipe to the {@link InputStream} specification.
*/
private class InputStreamAdapter
extends InputStream
{
InputStreamAdapter() {
super();
}
@Override
public int available() throws IOException {
return Pipe.this.available();
}
@Override
public int read() throws IOException {
return Pipe.this.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return Pipe.this.read(b, off, len);
}
@Override
public void close() throws IOException {
Pipe.this.closeInputStream();
}
}
/**
* Adapt the pipe to the {@link OutputStream} specification.
*/
private class OutputStreamAdapter
extends OutputStream
{
OutputStreamAdapter() {
super();
}
@Override
public void write(int b) throws IOException {
Pipe.this.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
Pipe.this.write(b, off, len);
}
@Override
public void close() throws IOException {
Pipe.this.closeOutputStream();
}
}
}