/*******************************************************************************
 * Copyright (c) 2000, 2011 IBM Corporation and others.
 * 
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * https://www.eclipse.org/legal/epl-2.0
 * 
 * SPDX-License-Identifier: EPL-2.0
 *
 *     IBM Corporation - initial API and implementation
 *******************************************************************************/
package org.eclipse.mylyn.commons.core.io;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.mylyn.commons.core.operations.OperationUtil;

/**
 * Wraps an input stream that blocks indefinitely to simulate timeouts on read(), skip(), and close(). The resulting
 * input stream is buffered and supports retrying operations that failed due to an InterruptedIOException. Supports
 * resuming partially completed operations after an InterruptedIOException REGARDLESS of whether the underlying stream
 * does unless the underlying stream itself generates InterruptedIOExceptions in which case it must also support
 * resuming. Check the bytesTransferred field to determine how much of the operation completed; conversely, at what
 * point to resume.
 * 
 * @since 3.7
 */
public class TimeoutInputStream extends FilterInputStream {
	// unsynchronized variables
	private final long readTimeout; // read() timeout in millis

	private final long closeTimeout; // close() timeout in millis, or -1

	// requests for the thread (synchronized)
	private boolean closeRequested = false; // if true, close requested

	// responses from the thread (synchronized)
	private final Future<?> future;

	private byte[] iobuffer; // circular buffer

	private int head = 0; // points to first unread byte

	private int length = 0; // number of remaining unread bytes

	private IOException ioe = null; // if non-null, contains a pending exception

	private boolean waitingForClose = false; // if true, thread is waiting for close()

	private boolean growWhenFull = false; // if true, buffer will grow when it is full

	private final CountDownLatch closeLatch = new CountDownLatch(1); // if 0, runThread() has finished

	/**
	 * Creates a timeout wrapper for an input stream.
	 * 
	 * @param in
	 *            the underlying input stream
	 * @param bufferSize
	 *            the buffer size in bytes; should be large enough to mitigate Thread synchronization and context
	 *            switching overhead
	 * @param readTimeout
	 *            the number of milliseconds to block for a read() or skip() before throwing an InterruptedIOException;
	 *            0 blocks indefinitely
	 * @param closeTimeout
	 *            the number of milliseconds to block for a close() before throwing an InterruptedIOException; 0 blocks
	 *            indefinitely, -1 closes the stream in the background
	 */
	public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) {
		super(in);
		this.readTimeout = readTimeout;
		this.closeTimeout = closeTimeout;
		this.iobuffer = new byte[bufferSize];
		this.future = OperationUtil.getExecutorService().submit(new Runnable() {
			public void run() {
				runThread();
			}
		});
	}

	public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout, boolean growWhenFull) {
		this(in, bufferSize, readTimeout, closeTimeout);
		this.growWhenFull = growWhenFull;
	}

	/**
	 * Wraps the underlying stream's method. It may be important to wait for a stream to actually be closed because it
	 * holds an implicit lock on a system resoure (such as a file) while it is open. Closing a stream may take time if
	 * the underlying stream is still servicing a previous request.
	 * 
	 * @throws InterruptedIOException
	 *             if the timeout expired
	 * @throws IOException
	 *             if an i/o error occurs
	 */
	@Override
	public void close() throws IOException {
		if (closeLatch.getCount() == 0) {
			return;
		}
		synchronized (this) {
			closeRequested = true;
			// interrupts waitUntilClose and triggers closing of stream 
			future.cancel(true);
			checkError();
		}
		if (closeTimeout == -1) {
			return;
		}
		boolean closed = false;
		try {
			closed = closeLatch.await(closeTimeout, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
		}
		synchronized (this) {
			checkError();
			if (!closed) {
				throw new InterruptedIOException();
			}
		}
	}

	/**
	 * Returns the number of unread bytes in the buffer.
	 * 
	 * @throws IOException
	 *             if an i/o error occurs
	 */
	@Override
	public synchronized int available() throws IOException {
		if (length == 0) {
			checkError();
		}
		return length > 0 ? length : 0;
	}

	/**
	 * Reads a byte from the stream.
	 * 
	 * @throws InterruptedIOException
	 *             if the timeout expired and no data was received, bytesTransferred will be zero
	 * @throws IOException
	 *             if an i/o error occurs
	 */
	@Override
	public synchronized int read() throws IOException {
		if (!syncFill()) {
			return -1; // EOF reached
		}
		int b = iobuffer[head++] & 255;
		if (head == iobuffer.length) {
			head = 0;
		}
		length--;
		notify();
		return b;
	}

	/**
	 * Reads multiple bytes from the stream.
	 * 
	 * @throws InterruptedIOException
	 *             if the timeout expired and no data was received, bytesTransferred will be zero
	 * @throws IOException
	 *             if an i/o error occurs
	 */
	@Override
	public synchronized int read(byte[] buffer, int off, int len) throws IOException {
		if (!syncFill()) {
			return -1; // EOF reached
		}
		int pos = off;
		if (len > length) {
			len = length;
		}
		while (len-- > 0) {
			buffer[pos++] = iobuffer[head++];
			if (head == iobuffer.length) {
				head = 0;
			}
			length--;
		}
		notify();
		return pos - off;
	}

	/**
	 * Skips multiple bytes in the stream.
	 * 
	 * @throws InterruptedIOException
	 *             if the timeout expired before all of the bytes specified have been skipped, bytesTransferred may be
	 *             non-zero
	 * @throws IOException
	 *             if an i/o error occurs
	 */
	@Override
	public synchronized long skip(long count) throws IOException {
		long amount = 0;
		try {
			do {
				if (!syncFill()) {
					break; // EOF reached
				}
				int skip = (int) Math.min(count - amount, length);
				head = (head + skip) % iobuffer.length;
				length -= skip;
				amount += skip;
			} while (amount < count);
		} catch (InterruptedIOException e) {
			e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT
			throw e;
		}
		notify();
		return amount;
	}

	/**
	 * Mark is not supported by the wrapper even if the underlying stream does, returns false.
	 */
	@Override
	public boolean markSupported() {
		return false;
	}

	/**
	 * Waits for the buffer to fill if it is empty and the stream has not reached EOF.
	 * 
	 * @return true if bytes are available, false if EOF has been reached
	 * @throws InterruptedIOException
	 *             if EOF not reached but no bytes are available
	 */
	private boolean syncFill() throws IOException {
		if (length != 0) {
			return true;
		}
		checkError(); // check errors only after we have read all remaining bytes
		if (waitingForClose) {
			return false;
		}
		notify();
		try {
			wait(readTimeout);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
		}
		if (length != 0) {
			return true;
		}
		checkError(); // check errors only after we have read all remaining bytes
		if (waitingForClose) {
			return false;
		}
		throw new InterruptedIOException();
	}

	/**
	 * If an exception is pending, throws it.
	 */
	private void checkError() throws IOException {
		if (ioe != null) {
			IOException e = ioe;
			ioe = null;
			throw e;
		}
	}

	/**
	 * Runs the thread in the background.
	 */
	private void runThread() {
		try {
			readUntilDone();
		} catch (IOException e) {
			synchronized (this) {
				ioe = e;
			}
		} finally {
			waitUntilClosed();
			try {
				in.close();
			} catch (IOException e) {
				synchronized (this) {
					ioe = e;
				}
			} finally {
				closeLatch.countDown();
			}
		}
	}

	/**
	 * Waits until we have been requested to close the stream.
	 */
	private synchronized void waitUntilClosed() {
		waitingForClose = true;
		notify();
		while (!closeRequested) {
			try {
				wait();
			} catch (InterruptedException e) {
				closeRequested = true; // alternate quit signal
			}
		}
	}

	/**
	 * Reads bytes into the buffer until EOF, closed, or error.
	 */
	private void readUntilDone() throws IOException {
		while (!closeRequested) {
			int off, len;
			synchronized (this) {
				while (isBufferFull()) {
					if (closeRequested) {
						return; // quit signal
					}
					waitForRead();
				}
				off = (head + length) % iobuffer.length;
				len = ((head > off) ? head : iobuffer.length) - off;
			}
			int count;
			try {
				// the i/o operation might block without releasing the lock,
				// so we do this outside of the synchronized block
				count = in.read(iobuffer, off, len);
				if (count == -1) {
					return; // EOF encountered
				}
			} catch (InterruptedIOException e) {
				count = e.bytesTransferred; // keep partial transfer
			}
			synchronized (this) {
				length += count;
				notify();
			}
		}
	}

	/*
	 * Wait for a read when the buffer is full (with the implication
	 * that space will become available in the buffer after the read 
	 * takes place).
	 */
	private synchronized void waitForRead() {
		try {
			if (growWhenFull) {
				// wait a second before growing to let reads catch up
				wait(readTimeout);
			} else {
				wait();
			}
		} catch (InterruptedException e) {
			closeRequested = true; // alternate quit signal
		}
		// If the buffer is still full, give it a chance to grow
		if (growWhenFull && isBufferFull()) {
			growBuffer();
		}
	}

	private synchronized void growBuffer() {
		int newSize = 2 * iobuffer.length;
		if (newSize > iobuffer.length) {
			if (Policy.DEBUG_STREAMS) {
				System.out.println("InputStream growing to " + newSize + " bytes"); //$NON-NLS-1$ //$NON-NLS-2$
			}
			byte[] newBuffer = new byte[newSize];
			int pos = 0;
			int len = length;
			while (len-- > 0) {
				newBuffer[pos++] = iobuffer[head++];
				if (head == iobuffer.length) {
					head = 0;
				}
			}
			iobuffer = newBuffer;
			head = 0;
			// length instance variable was not changed by this method
		}
	}

	private boolean isBufferFull() {
		return length == iobuffer.length;
	}
}
