// blob: e5d31f082a70852596dfe6fe99b7b7b8d6816cd1 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
* <p/>
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
* maintains two states: the content state that tells whether there is content to consume and the EOF
* state that tells whether an EOF has arrived.
* Only once the content has been consumed the content state is moved to the EOF state.
*/
public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{
// Logger shared by all HttpInput instances.
private final static Logger LOG = Log.getLogger(HttpInput.class);
// Scratch buffer that lets the single-byte read() delegate to read(byte[], int, int).
private final byte[] _oneByteBuffer = new byte[1];
// Monitor returned by lock(); either the object given to the constructor or this instance.
private final Object _lock;
// Channel state whose onReadPossible() is invoked by this class; assigned in init().
private HttpChannelState _channelState;
// Listener registered through setReadListener(); null until then and after recycle().
private ReadListener _listener;
// Failure assigned in failed() and passed to the listener's onError() in run().
private Throwable _onError;
// Set to true by setReadListener() and isReady(); recomputed by run() from what it finds.
private boolean _notReady;
// State consulted when no content is available (waitForContent/noContent) and for isFinished().
private State _contentState = STREAM;
// EOF state recorded by earlyEOF()/messageComplete(); copied into _contentState by
// getNextContent() once nextContent() returns null.
private State _eofState;
// Running total of the byte counts returned by get() inside read(byte[], int, int).
private long _contentRead;
/**
 * Creates an input that uses itself as the object returned by {@link #lock()}.
 */
protected HttpInput()
{
this(null);
}
/**
 * Creates an input that synchronizes on the given lock object.
 *
 * @param lock the object to synchronize on, or {@code null} to synchronize on this instance
 */
protected HttpInput(Object lock)
{
    if (lock != null)
        _lock = lock;
    else
        _lock = this;
}
/**
 * Stores the channel state whose {@code onReadPossible()} method is invoked by
 * the other methods of this class.
 *
 * @param state the channel state to notify
 */
public void init(HttpChannelState state)
{
synchronized (lock())
{
_channelState = state;
}
}
/**
 * @return the object that the methods of this class synchronize on: the lock passed
 * to the constructor, or this instance if none was given
 */
public final Object lock()
{
return _lock;
}
/**
 * Resets this input for reuse: the content state returns to STREAM, any recorded EOF,
 * listener, pending error and not-ready flag are cleared, and the read counter is zeroed.
 * The channel state assigned by {@link #init(HttpChannelState)} is left untouched.
 */
public void recycle()
{
    synchronized (lock())
    {
        // Back to blocking mode with no EOF seen and nothing counted.
        _contentState = STREAM;
        _eofState = null;
        _contentRead = 0;

        // Drop the asynchronous-read bookkeeping.
        _listener = null;
        _notReady = false;
        _onError = null;
    }
}
/**
 * Reports how many bytes remain in the next content item without consuming it.
 *
 * @return the bytes remaining in the next content item, or 0 if no content is available
 * @throws RuntimeIOException if retrieving the next content fails with an {@link IOException}
 */
@Override
public int available()
{
    try
    {
        synchronized (lock())
        {
            T next = getNextContent();
            if (next == null)
                return 0;
            return remaining(next);
        }
    }
    catch (IOException x)
    {
        throw new RuntimeIOException(x);
    }
}
/**
 * Reads a single byte by delegating to {@link #read(byte[], int, int)} with a one-byte buffer.
 *
 * @return the byte read as a value in the range 0..255, or -1 if the delegate reports end of stream
 * @throws IOException if the delegated read fails
 * @throws IllegalStateException if the delegated read returned 0 bytes, i.e. no byte was
 * available to read (for example the ASYNC state with no content)
 */
@Override
public int read() throws IOException
{
    int read = read(_oneByteBuffer, 0, 1);
    // A 0 return means nothing was copied into the buffer. Returning _oneByteBuffer[0]
    // in that case would hand the caller a stale byte from an earlier read as if it
    // were fresh data, so fail loudly instead.
    if (read == 0)
        throw new IllegalStateException("unready read=0");
    return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
/**
 * Copies bytes from the next content item into the given buffer.
 * <p/>
 * If no content is available, the current content state is asked to wait for content
 * (only the STREAM state actually blocks) and the lookup is retried once. If there is
 * still no content, the result is whatever the content state's {@code noContent()} yields.
 *
 * @param b the buffer to copy into
 * @param off the offset in the buffer at which to start writing
 * @param len the maximum number of bytes to copy
 * @return the number of bytes copied, or the content state's no-content result
 * @throws IOException if retrieving or waiting for content fails
 */
@Override
public int read(byte[] b, int off, int len) throws IOException
{
    synchronized (lock())
    {
        T content = getNextContent();
        if (content == null)
        {
            // Nothing buffered: let the current state wait, then look again.
            _contentState.waitForContent(this);
            content = getNextContent();
        }
        if (content == null)
            return _contentState.noContent();

        int copied = get(content, b, off, len);
        _contentRead += copied;
        return copied;
    }
}
/**
 * Convenience wrapper around {@link #nextContent()}: when it returns null and an EOF
 * has already been recorded, the content state is switched to that EOF state.
 *
 * @return the next content, or null if none is available
 * @throws IOException if retrieving the content fails
 * @see #nextContent()
 */
protected T getNextContent() throws IOException
{
    T content = nextContent();
    if (content != null)
        return content;

    synchronized (lock())
    {
        // All content is consumed; promote a pending EOF to the content state.
        if (_eofState != null)
        {
            LOG.debug("{} eof {}", this, _eofState);
            _contentState = _eofState;
        }
    }
    return null;
}
/**
 * Accesses the next content item to consume. Returning an item does not consume it,
 * so the same item may be returned multiple times until it has been consumed.
 * <p/>
 * Calls to {@link #get(Object, byte[], int, int)}
 * or {@link #consume(Object, int)} are required to consume data from the content.
 * <p/>
 * This method is invoked by {@link #getNextContent()}.
 *
 * @return the content or null if none available.
 * @throws IOException if retrieving the content fails
 */
protected abstract T nextContent() throws IOException;
/**
 * Reports the unconsumed size of a content item. The result is what
 * {@link #available()} returns and what {@link #consumeAll()} passes to
 * {@link #consume(Object, int)}.
 *
 * @param item the content
 * @return how many bytes remain in the given content
 */
protected abstract int remaining(T item);
/**
 * Copies the given content into the given byte buffer.
 * <p/>
 * The returned count is added to the total reported by {@link #getContentRead()}.
 *
 * @param item the content to copy from
 * @param buffer the buffer to copy into
 * @param offset the offset in the buffer at which to start writing
 * @param length the space available in the buffer
 * @return the number of bytes actually copied
 */
protected abstract int get(T item, byte[] buffer, int offset, int length);
/**
 * Consumes the given content without copying it anywhere.
 * <p/>
 * {@link #consumeAll()} calls this with the full {@link #remaining(Object)} of each item.
 *
 * @param item the content to consume
 * @param length the number of bytes to consume
 */
protected abstract void consume(T item, int length);
/**
 * Blocks until some content or some end-of-file event arrives.
 * <p/>
 * Invoked by the STREAM state's {@code waitForContent}; in {@link #read(byte[], int, int)}
 * and {@link #consumeAll()} that call happens while {@link #lock()} is held.
 *
 * @throws IOException if the wait is interrupted
 */
protected abstract void blockForContent() throws IOException;
/**
 * Adds some content to this input stream.
 * <p/>
 * NOTE(review): presumably the added item later becomes visible through
 * {@link #nextContent()} -- confirm against the concrete subclass.
 *
 * @param item the content to add
 */
public abstract void content(T item);
/**
 * Notifies the channel state that a read is possible, but only if a
 * {@link ReadListener} has been registered. The notification is made after
 * the lock has been released.
 *
 * @return true if a listener is registered and the channel state was notified,
 * false if there is no listener
 */
protected boolean onAsyncRead()
{
    final boolean hasListener;
    synchronized (lock())
    {
        hasListener = _listener != null;
    }
    if (!hasListener)
        return false;

    _channelState.onReadPossible();
    return true;
}
/**
 * @return the total number of bytes returned by {@link #read(byte[], int, int)}
 * since construction or the last {@link #recycle()}
 */
public long getContentRead()
{
    final long total;
    synchronized (lock())
    {
        total = _contentRead;
    }
    return total;
}
/**
 * Signals that an EOF has been detected before all the expected content arrived.
 * <p/>
 * If no EOF had been recorded yet, EARLY_EOF is recorded as the EOF state. Once the
 * remaining content is consumed and that state becomes the content state, a read that
 * finds no content throws an {@link EofException} rather than returning -1.
 * <p/>
 * The channel state is notified after the lock is released, unless the EOF was newly
 * recorded and no listener is registered.
 */
public void earlyEOF()
{
    final boolean notifyChannel;
    synchronized (lock())
    {
        if (isEOF())
        {
            // NOTE(review): an already-recorded EOF still notifies the channel state,
            // even without a listener -- confirm this is intended.
            notifyChannel = true;
        }
        else
        {
            LOG.debug("{} early EOF", this);
            _eofState = EARLY_EOF;
            notifyChannel = _listener != null;
        }
    }
    if (notifyChannel)
        _channelState.onReadPossible();
}
/**
 * Signals that all the expected content has arrived.
 * <p/>
 * If no EOF had been recorded yet, EOF is recorded as the EOF state. The channel state
 * is notified after the lock is released, unless the EOF was newly recorded and no
 * listener is registered.
 */
public void messageComplete()
{
    final boolean notifyChannel;
    synchronized (lock())
    {
        if (isEOF())
        {
            // NOTE(review): an already-recorded EOF still notifies the channel state,
            // even without a listener -- confirm this is intended.
            notifyChannel = true;
        }
        else
        {
            LOG.debug("{} EOF", this);
            _eofState = EOF;
            notifyChannel = _listener != null;
        }
    }
    if (notifyChannel)
        _channelState.onReadPossible();
}
/**
 * Consumes and discards content until the content state reports EOF.
 * <p/>
 * Each available item is consumed in full; when no item is available the current
 * content state is asked to wait for content. An {@link IOException} ends the loop
 * and is logged at debug level rather than propagated.
 */
public void consumeAll()
{
    synchronized (lock())
    {
        try
        {
            while (!isFinished())
            {
                T content = getNextContent();
                if (content != null)
                    consume(content, remaining(content));
                else
                    _contentState.waitForContent(this);
            }
        }
        catch (IOException x)
        {
            LOG.debug(x);
        }
    }
}
/**
 * @return true if the content state is currently ASYNC
 */
public boolean isAsync()
{
    final State current;
    synchronized (lock())
    {
        current = _contentState;
    }
    return current == ASYNC;
}
/**
 * Tells whether an EOF has been recorded, regardless of whether content remains.
 *
 * @return whether an EOF has been detected, even though there may be content to consume.
 */
public boolean isEOF()
{
    synchronized (lock())
    {
        if (_eofState == null)
            return false;
        return _eofState.isEOF();
    }
}
/**
 * @return true if the content state is an EOF state; a recorded EOF only becomes
 * the content state in {@link #getNextContent()} once no content is left
 */
@Override
public boolean isFinished()
{
    final boolean finished;
    synchronized (lock())
    {
        finished = _contentState.isEOF();
    }
    return finished;
}
/**
 * Tells whether a read can be attempted now.
 * <p/>
 * Returns true when the content state is EOF, when no listener is registered, or when
 * content is available. Otherwise returns false. On the first such false result the
 * not-ready flag is set and, after the lock is released, either the channel state is
 * notified (if the content state has become EOF) or {@link #unready()} is called.
 *
 * @return true if a read can be attempted, false otherwise
 */
@Override
public boolean isReady()
{
    final boolean finished;
    synchronized (lock())
    {
        // Checked in this order; available() is only consulted when the cheaper tests fail.
        if (_contentState.isEOF() || _listener == null || available() > 0)
            return true;

        // The not-ready flag is already set: nothing more to schedule.
        if (_notReady)
            return false;

        _notReady = true;
        // available() may have moved the content state to EOF, so check again.
        finished = isFinished();
    }

    if (finished)
        _channelState.onReadPossible();
    else
        unready();
    return false;
}
/**
 * Hook called by {@link #isReady()} and {@link #run()} when no content is available
 * and the content state is not EOF. The default implementation does nothing.
 */
protected void unready()
{
}
/**
 * Registers the listener and switches this input from STREAM to ASYNC mode, marks it
 * as not ready, then notifies the channel state after the lock is released.
 *
 * @param readListener the listener to register, must not be null
 * @throws NullPointerException if the listener is null
 * @throws IllegalStateException if the content state is not STREAM
 */
@Override
public void setReadListener(ReadListener readListener)
{
    ReadListener listener = Objects.requireNonNull(readListener);
    synchronized (lock())
    {
        if (_contentState != STREAM)
            throw new IllegalStateException("state=" + _contentState);
        _listener = listener;
        _notReady = true;
        _contentState = ASYNC;
    }
    _channelState.onReadPossible();
}
/**
 * Records a failure so that {@link #run()} can pass it to the listener's
 * {@code onError}.
 * <p/>
 * Only the first failure is kept. If one is already recorded, the new one is
 * logged as a warning instead of replacing it.
 *
 * @param x the failure to record
 */
public void failed(Throwable x)
{
    synchronized (lock())
    {
        // The pending error starts out null, so the first failure must take the
        // "store" branch; only later failures are merely logged.
        if (_onError != null)
            LOG.warn(x);
        else
            _onError = x;
    }
}
/**
 * Delivers one callback to the registered {@link ReadListener}.
 * <p/>
 * Under the lock it returns immediately unless the not-ready flag is set and a listener
 * is registered. It then captures the pending error and the listener, checks whether
 * content is available or the content state is EOF, and updates the not-ready flag.
 * Outside the lock it invokes exactly one of {@code onError}, {@code onDataAvailable},
 * {@code onAllDataRead} or {@link #unready()}, in that order of precedence.
 */
@Override
public void run()
{
final Throwable error;
final ReadListener listener;
boolean available = false;
final boolean eof;
synchronized (lock())
{
// Nothing to do unless a callback is outstanding and there is a listener to call.
if (!_notReady || _listener == null)
return;
// NOTE(review): the error is captured before getNextContent() runs, so a failure
// passed to failed(e) below is not reported by this invocation -- confirm intended.
error = _onError;
listener = _listener;
try
{
T item = getNextContent();
available = item != null && remaining(item) > 0;
}
catch (Exception e)
{
failed(e);
}
// EOF is only reported once no content is available.
eof = !available && isFinished();
// Stay not-ready only if there is neither content nor EOF to report.
_notReady = !available && !eof;
}
// Callbacks are made after the lock has been released.
try
{
if (error != null)
listener.onError(error);
else if (available)
listener.onDataAvailable();
else if (eof)
listener.onAllDataRead();
else
unready();
}
catch (Throwable e)
{
// Anything thrown by a callback (or unready()) is logged and reported to onError.
LOG.warn(e.toString());
LOG.debug(e);
listener.onError(e);
}
}
/**
 * Behaviour of the input when no content is available. The defaults here are:
 * waiting does nothing, a read with no content returns -1, and the state is not EOF.
 */
protected static abstract class State
{
/**
 * Called when a read or consumeAll finds no content; does nothing by default.
 *
 * @param in the input that found no content
 * @throws IOException if waiting fails
 */
public void waitForContent(HttpInput<?> in) throws IOException
{
}
/**
 * @return the value a read returns when there is still no content; -1 by default
 * @throws IOException if the state treats the lack of content as an error
 */
public int noContent() throws IOException
{
return -1;
}
/**
 * @return whether this state is an end-of-file state; false by default
 */
public boolean isEOF()
{
return false;
}
}
/**
 * Initial content state: waiting for content delegates to
 * {@link HttpInput#blockForContent()}; the other behaviours are the {@link State} defaults.
 */
protected static final State STREAM = new State()
{
@Override
public void waitForContent(HttpInput<?> input) throws IOException
{
input.blockForContent();
}
@Override
public String toString()
{
return "STREAM";
}
};
/**
 * Content state set by {@link HttpInput#setReadListener(ReadListener)}: waiting for
 * content does nothing (the {@link State} default) and a read with no content returns 0.
 */
protected static final State ASYNC = new State()
{
@Override
public int noContent() throws IOException
{
return 0;
}
@Override
public String toString()
{
return "ASYNC";
}
};
/**
 * EOF state recorded by {@link HttpInput#earlyEOF()}: it reports EOF, and a read with
 * no content throws an {@link EofException} instead of returning -1.
 */
protected static final State EARLY_EOF = new State()
{
@Override
public int noContent() throws IOException
{
throw new EofException();
}
@Override
public boolean isEOF()
{
return true;
}
@Override
public String toString()
{
return "EARLY_EOF";
}
};
/**
 * EOF state recorded by {@link HttpInput#messageComplete()}: it reports EOF, and a read
 * with no content returns -1 (the {@link State} default).
 */
protected static final State EOF = new State()
{
@Override
public boolean isEOF()
{
return true;
}
@Override
public String toString()
{
return "EOF";
}
};
}