blob: 262f8dbf026253545214eee6efe5b4bc734e38ec [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2000, 2011 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package org.eclipse.mylyn.commons.core.io;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.mylyn.commons.core.operations.OperationUtil;
/**
 * Wraps an input stream that may block indefinitely so that read(), skip(), and close() can time out. The wrapped
 * stream is drained by a background task into a circular buffer; callers consume from that buffer and receive an
 * {@link InterruptedIOException} when no data arrives within the configured read timeout. An operation that failed
 * with an InterruptedIOException may be retried. For skip(), the bytesTransferred field of the exception reports how
 * much of the operation completed and therefore at what point to resume.
 * <p>
 * If the underlying stream itself throws InterruptedIOException, the bytes it reports in bytesTransferred are kept
 * and the background task continues reading, so the underlying stream must support resuming in that case.
 *
 * @since 3.7
 */
public class TimeoutInputStream extends FilterInputStream {

    // unsynchronized variables (immutable after construction)
    private final long readTimeout; // read() timeout in millis

    private final long closeTimeout; // close() timeout in millis, or -1

    private final boolean growWhenFull; // if true, buffer will grow when it is full

    // requests for the thread (synchronized)
    private boolean closeRequested = false; // if true, close requested

    // responses from the thread (synchronized)
    private final Future<?> future;

    private byte[] iobuffer; // circular buffer

    private int head = 0; // points to first unread byte

    private int length = 0; // number of remaining unread bytes

    private IOException ioe = null; // if non-null, contains a pending exception

    private boolean waitingForClose = false; // if true, thread is waiting for close()

    private final CountDownLatch closeLatch = new CountDownLatch(1); // if 0, runThread() has finished

    /**
     * Creates a timeout wrapper for an input stream with a fixed-size buffer.
     *
     * @param in
     *            the underlying input stream
     * @param bufferSize
     *            the buffer size in bytes, must be positive; should be large enough to mitigate Thread
     *            synchronization and context switching overhead
     * @param readTimeout
     *            the number of milliseconds to block for a read() or skip() before throwing an
     *            InterruptedIOException; 0 blocks indefinitely
     * @param closeTimeout
     *            the number of milliseconds to block for a close() before throwing an InterruptedIOException; 0
     *            blocks indefinitely, -1 closes the stream in the background
     * @throws IllegalArgumentException
     *             if bufferSize is not positive
     */
    public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) {
        this(in, bufferSize, readTimeout, closeTimeout, false);
    }

    /**
     * Creates a timeout wrapper for an input stream whose buffer may optionally grow when it fills up.
     *
     * @param in
     *            the underlying input stream
     * @param bufferSize
     *            the initial buffer size in bytes, must be positive
     * @param readTimeout
     *            the number of milliseconds to block for a read() or skip() before throwing an
     *            InterruptedIOException; 0 blocks indefinitely
     * @param closeTimeout
     *            the number of milliseconds to block for a close() before throwing an InterruptedIOException; 0
     *            blocks indefinitely, -1 closes the stream in the background
     * @param growWhenFull
     *            if true, the buffer is doubled when it is still full after the background task has waited for a
     *            reader; if false, the background task waits until a reader frees space
     * @throws IllegalArgumentException
     *             if bufferSize is not positive
     */
    public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout, boolean growWhenFull) {
        super(in);
        if (bufferSize <= 0) {
            // a zero-length buffer is permanently "full", so the background task could never read into it
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize); //$NON-NLS-1$
        }
        this.readTimeout = readTimeout;
        this.closeTimeout = closeTimeout;
        // must be assigned before the background task is submitted so that the task is guaranteed to see it
        this.growWhenFull = growWhenFull;
        this.iobuffer = new byte[bufferSize];
        this.future = OperationUtil.getExecutorService().submit(new Runnable() {
            public void run() {
                runThread();
            }
        });
    }

    /**
     * Wraps the underlying stream's method. It may be important to wait for a stream to actually be closed because it
     * holds an implicit lock on a system resource (such as a file) while it is open. Closing a stream may take time if
     * the underlying stream is still servicing a previous request.
     *
     * @throws InterruptedIOException
     *             if the timeout expired
     * @throws IOException
     *             if an i/o error occurs
     */
    @Override
    public void close() throws IOException {
        if (closeLatch.getCount() == 0) {
            return; // background task already finished and closed the stream
        }
        synchronized (this) {
            closeRequested = true;
            // interrupts waitUntilClose and triggers closing of stream
            // NOTE(review): if the executor has not started the task yet, cancelling presumably prevents
            // runThread() from ever running, in which case the latch never opens and the underlying stream is
            // not closed - confirm against the executor used by OperationUtil
            future.cancel(true);
            checkError();
        }
        if (closeTimeout == -1) {
            return; // close in the background
        }
        boolean closed = false;
        try {
            if (closeTimeout == 0) {
                // 0 is documented as "block indefinitely"; the timed await() would return immediately for 0
                closeLatch.await();
                closed = true;
            } else {
                closed = closeLatch.await(closeTimeout, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
        }
        synchronized (this) {
            checkError();
            if (!closed) {
                throw new InterruptedIOException();
            }
        }
    }

    /**
     * Returns the number of unread bytes in the buffer.
     *
     * @throws IOException
     *             if an i/o error occurs
     */
    @Override
    public synchronized int available() throws IOException {
        if (length == 0) {
            checkError(); // pending errors are reported only once the buffered bytes have been consumed
        }
        return length > 0 ? length : 0;
    }

    /**
     * Reads a byte from the stream.
     *
     * @return the byte read as a value in the range 0-255, or -1 if EOF has been reached
     * @throws InterruptedIOException
     *             if the timeout expired and no data was received, bytesTransferred will be zero
     * @throws IOException
     *             if an i/o error occurs
     */
    @Override
    public synchronized int read() throws IOException {
        if (!syncFill()) {
            return -1; // EOF reached
        }
        int b = iobuffer[head++] & 255;
        if (head == iobuffer.length) {
            head = 0;
        }
        length--;
        notify(); // space was freed, wake the background task if it is waiting on a full buffer
        return b;
    }

    /**
     * Reads multiple bytes from the stream. At most the number of currently buffered bytes is returned; the call
     * only blocks while the buffer is empty.
     *
     * @param buffer
     *            the destination array
     * @param off
     *            the offset in the destination array at which to start storing bytes
     * @param len
     *            the maximum number of bytes to read
     * @return the number of bytes read, 0 if len is 0, or -1 if EOF has been reached
     * @throws NullPointerException
     *             if buffer is null
     * @throws IndexOutOfBoundsException
     *             if off or len is negative, or len is greater than buffer.length - off
     * @throws InterruptedIOException
     *             if the timeout expired and no data was received, bytesTransferred will be zero
     * @throws IOException
     *             if an i/o error occurs
     */
    @Override
    public synchronized int read(byte[] buffer, int off, int len) throws IOException {
        // validate before touching the ring buffer so that a bad call cannot consume and lose buffered bytes
        if (buffer == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || len > buffer.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0; // nothing requested, do not block waiting for data
        }
        if (!syncFill()) {
            return -1; // EOF reached
        }
        int count = Math.min(len, length);
        // the unread region may wrap around the end of the circular buffer, so copy it in up to two pieces
        int firstChunk = Math.min(count, iobuffer.length - head);
        System.arraycopy(iobuffer, head, buffer, off, firstChunk);
        if (firstChunk < count) {
            System.arraycopy(iobuffer, 0, buffer, off + firstChunk, count - firstChunk);
        }
        head = (head + count) % iobuffer.length;
        length -= count;
        notify(); // space was freed, wake the background task if it is waiting on a full buffer
        return count;
    }

    /**
     * Skips multiple bytes in the stream.
     *
     * @param count
     *            the number of bytes to skip; if zero or negative, nothing is skipped
     * @return the number of bytes actually skipped, which is less than count if EOF was reached first
     * @throws InterruptedIOException
     *             if the timeout expired before all of the bytes specified have been skipped, bytesTransferred may
     *             be non-zero
     * @throws IOException
     *             if an i/o error occurs
     */
    @Override
    public synchronized long skip(long count) throws IOException {
        if (count <= 0) {
            // a negative count would otherwise move head backwards and inflate length
            return 0;
        }
        long amount = 0;
        try {
            do {
                if (!syncFill()) {
                    break; // EOF reached
                }
                int skip = (int) Math.min(count - amount, length);
                head = (head + skip) % iobuffer.length;
                length -= skip;
                amount += skip;
            } while (amount < count);
        } catch (InterruptedIOException e) {
            e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT
            throw e;
        }
        notify(); // space was freed, wake the background task if it is waiting on a full buffer
        return amount;
    }

    /**
     * Mark is not supported by the wrapper even if the underlying stream does, returns false.
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Waits for the buffer to fill if it is empty and the stream has not reached EOF. Must be called while holding
     * this object's monitor because it uses wait() and notify().
     *
     * @return true if bytes are available, false if EOF has been reached
     * @throws InterruptedIOException
     *             if EOF not reached but no bytes are available
     * @throws IOException
     *             if the background task recorded an i/o error
     */
    private boolean syncFill() throws IOException {
        if (length != 0) {
            return true;
        }
        checkError(); // check errors only after we have read all remaining bytes
        if (waitingForClose) {
            return false; // background task has stopped reading, nothing more will arrive
        }
        notify(); // wake the background task in case it is waiting for buffer space
        try {
            wait(readTimeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
        }
        if (length != 0) {
            return true;
        }
        checkError(); // check errors only after we have read all remaining bytes
        if (waitingForClose) {
            return false;
        }
        throw new InterruptedIOException();
    }

    /**
     * If an exception is pending, clears it and throws it.
     */
    private void checkError() throws IOException {
        if (ioe != null) {
            IOException e = ioe;
            ioe = null;
            throw e;
        }
    }

    /**
     * Body of the background task: fills the buffer until done, waits for a close request, closes the underlying
     * stream, and finally releases the close latch. Any IOException raised along the way is stored for the caller.
     */
    private void runThread() {
        try {
            readUntilDone();
        } catch (IOException e) {
            synchronized (this) {
                ioe = e;
            }
        } finally {
            waitUntilClosed();
            try {
                in.close();
            } catch (IOException e) {
                synchronized (this) {
                    ioe = e;
                }
            } finally {
                closeLatch.countDown();
            }
        }
    }

    /**
     * Waits until we have been requested to close the stream.
     */
    private synchronized void waitUntilClosed() {
        waitingForClose = true;
        notify(); // wake a reader blocked in syncFill() so that it can observe EOF or the pending error
        while (!closeRequested) {
            try {
                wait();
            } catch (InterruptedException e) {
                closeRequested = true; // alternate quit signal
            }
        }
    }

    /**
     * Reads bytes into the buffer until EOF, closed, or error.
     */
    private void readUntilDone() throws IOException {
        while (!closeRequested) {
            int off, len;
            synchronized (this) {
                while (isBufferFull()) {
                    if (closeRequested) {
                        return; // quit signal
                    }
                    waitForRead();
                }
                // first free slot, and the size of the contiguous free run starting there
                off = (head + length) % iobuffer.length;
                len = ((head > off) ? head : iobuffer.length) - off;
            }
            int count;
            try {
                // the i/o operation might block without releasing the lock,
                // so we do this outside of the synchronized block
                count = in.read(iobuffer, off, len);
                if (count == -1) {
                    return; // EOF encountered
                }
            } catch (InterruptedIOException e) {
                count = e.bytesTransferred; // keep partial transfer
            }
            synchronized (this) {
                length += count;
                notify(); // wake a reader blocked in syncFill()
            }
        }
    }

    /*
     * Wait for a read when the buffer is full (with the implication
     * that space will become available in the buffer after the read
     * takes place).
     */
    private synchronized void waitForRead() {
        try {
            if (growWhenFull) {
                // give readers up to readTimeout millis to catch up before growing
                // (a readTimeout of 0 waits until notified)
                wait(readTimeout);
            } else {
                wait();
            }
        } catch (InterruptedException e) {
            closeRequested = true; // alternate quit signal
        }
        // If the buffer is still full, give it a chance to grow
        if (growWhenFull && isBufferFull()) {
            growBuffer();
        }
    }

    /**
     * Doubles the capacity of the circular buffer, moving the unread bytes to the start of the new array. Does
     * nothing if doubling the size would overflow an int.
     */
    private synchronized void growBuffer() {
        int newSize = 2 * iobuffer.length;
        if (newSize > iobuffer.length) {
            if (Policy.DEBUG_STREAMS) {
                System.out.println("InputStream growing to " + newSize + " bytes"); //$NON-NLS-1$ //$NON-NLS-2$
            }
            byte[] newBuffer = new byte[newSize];
            // the unread region may wrap around the end of the old buffer, so copy it in two pieces
            int firstChunk = Math.min(length, iobuffer.length - head);
            System.arraycopy(iobuffer, head, newBuffer, 0, firstChunk);
            System.arraycopy(iobuffer, 0, newBuffer, firstChunk, length - firstChunk);
            iobuffer = newBuffer;
            head = 0;
            // length instance variable was not changed by this method
        }
    }

    /**
     * Returns true if every slot of the circular buffer holds an unread byte.
     */
    private boolean isBufferFull() {
        return length == iobuffer.length;
    }
}