blob: 1f8c8b4857f57d67acd97ba894420aa74f478e5e [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
* <p>
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
* maintains two states: the content state that tells whether there is content to consume and the EOF
* state that tells whether an EOF has arrived.
* Only once the content has been consumed the content state is moved to the EOF state.
*/
public class HttpInput extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private final Deque<Content> _inputQ = new ArrayDeque<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
private State _state = STREAM;
private long _contentConsumed;
private long _blockingTimeoutAt = -1;
public HttpInput(HttpChannelState state)
{
_channelState=state;
if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
_blockingTimeoutAt=0;
}
protected HttpChannelState getHttpChannelState()
{
return _channelState;
}
public void recycle()
{
synchronized (_inputQ)
{
Content item = _inputQ.poll();
while (item != null)
{
item.failed(null);
item = _inputQ.poll();
}
_listener = null;
_state = STREAM;
_contentConsumed = 0;
}
}
@Override
public int available()
{
int available=0;
boolean woken=false;
synchronized (_inputQ)
{
Content content = _inputQ.peek();
if (content==null)
{
try
{
produceContent();
}
catch(IOException e)
{
woken=failed(e);
}
content = _inputQ.peek();
}
if (content!=null)
available= remaining(content);
}
if (woken)
wake();
return available;
}
private void wake()
{
_channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read==0)
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
synchronized (_inputQ)
{
if (_blockingTimeoutAt>=0 && !isAsync())
_blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
while(true)
{
Content item = nextContent();
if (item!=null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}",this,len,item);
int l = get(item, b, off, len);
consumeNonContent();
return l;
}
if (!_state.blockForContent(this))
return _state.noContent();
}
}
}
/**
* Called when derived implementations should attempt to
* produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop;
* @throws IOException if unable to produce content
*/
protected void produceContent() throws IOException
{
}
/**
* Get the next content from the inputQ, calling {@link #produceContent()}
* if need be. EOF is processed and state changed.
*
* @return the content or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content nextContent() throws IOException
{
Content content = pollContent();
if (content==null && !isFinished())
{
produceContent();
content = pollContent();
}
return content;
}
/** Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be.
* @return Content or null
*/
protected Content pollContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
{
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
if (content==EOF_CONTENT)
{
if (_listener==null)
_state=EOF;
else
{
_state=AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
}
}
else if (content==EARLY_EOF_CONTENT)
_state=EARLY_EOF;
content = _inputQ.peek();
}
return content;
}
/**
*/
protected void consumeNonContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
{
// Defer EOF until read
if (content instanceof EofContent)
break;
// Consume all other empty content
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
}
}
/**
* Get the next readable from the inputQ, calling {@link #produceContent()}
* if need be. EOF is NOT processed and state is not changed.
*
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content nextReadable() throws IOException
{
Content content = pollReadable();
if (content==null && !isFinished())
{
produceContent();
content = pollReadable();
}
return content;
}
/** Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated.
* @return Content, EOF or null
*/
protected Content pollReadable()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue except EOF
while (content != null)
{
if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
return content;
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
}
return null;
}
/**
* @param item the content
* @return how many bytes remain in the given content
*/
protected int remaining(Content item)
{
return item.remaining();
}
/**
* Copies the given content into the given byte buffer.
*
* @param content the content to copy from
* @param buffer the buffer to copy into
* @param offset the buffer offset to start copying from
* @param length the space available in the buffer
* @return the number of bytes actually copied
*/
protected int get(Content content, byte[] buffer, int offset, int length)
{
int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l);
_contentConsumed+=l;
return l;
}
/**
* Consumes the given content.
* Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent();
buffer.position(buffer.position()+l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollContent(); // hungry succeed
}
/**
* Blocks until some content or some end-of-file event arrives.
*
* @throws IOException if the wait is interrupted
*/
protected void blockForContent() throws IOException
{
try
{
long timeout=0;
if (_blockingTimeoutAt>=0)
{
timeout=_blockingTimeoutAt-System.currentTimeMillis();
if (timeout<=0)
throw new TimeoutException();
}
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={} ...", this,timeout);
if (timeout>0)
_inputQ.wait(timeout);
else
_inputQ.wait();
if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
throw new TimeoutException();
}
catch (Throwable e)
{
throw (IOException)new InterruptedIOException().initCause(e);
}
}
/**
* Adds some content to the start of this input stream.
* <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p>
* @param item the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
{
boolean woken=false;
synchronized (_inputQ)
{
_inputQ.push(item);
_contentConsumed-=item.remaining();
if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item);
if (_listener==null)
_inputQ.notify();
else
woken=_channelState.onReadPossible();
}
return woken;
}
/**
* Adds some content to this input stream.
*
* @param item the content to add
* @return true if content channel woken for read
*/
public boolean addContent(Content item)
{
boolean woken=false;
synchronized (_inputQ)
{
_inputQ.offer(item);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, item);
if (_listener==null)
_inputQ.notify();
else
woken=_channelState.onReadPossible();
}
return woken;
}
public boolean hasContent()
{
synchronized (_inputQ)
{
return _inputQ.size()>0;
}
}
public void unblock()
{
synchronized (_inputQ)
{
_inputQ.notify();
}
}
public long getContentConsumed()
{
synchronized (_inputQ)
{
return _contentConsumed;
}
}
/**
* This method should be called to signal that an EOF has been
* detected before all the expected content arrived.
* <p>
* Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return.
* @return true if content channel woken for read
*/
public boolean earlyEOF()
{
return addContent(EARLY_EOF_CONTENT);
}
/**
* This method should be called to signal that all the expected
* content arrived.
* @return true if content channel woken for read
*/
public boolean eof()
{
return addContent(EOF_CONTENT);
}
public boolean consumeAll()
{
synchronized (_inputQ)
{
try
{
while (!isFinished())
{
Content item = nextContent();
if (item == null)
break; // Let's not bother blocking
skip(item, remaining(item));
}
return isFinished() && !isError();
}
catch (IOException e)
{
LOG.debug(e);
return false;
}
}
}
public boolean isError()
{
synchronized (_inputQ)
{
return _state instanceof ErrorState;
}
}
public boolean isAsync()
{
synchronized (_inputQ)
{
return _state==ASYNC;
}
}
@Override
public boolean isFinished()
{
synchronized (_inputQ)
{
return _state instanceof EOFState;
}
}
@Override
public boolean isReady()
{
try
{
synchronized (_inputQ)
{
if (_listener == null )
return true;
if (_state instanceof EOFState)
return true;
if (nextReadable()!=null)
return true;
_channelState.onReadUnready();
}
return false;
}
catch(IOException e)
{
LOG.ignore(e);
return true;
}
}
@Override
public void setReadListener(ReadListener readListener)
{
readListener = Objects.requireNonNull(readListener);
boolean woken=false;
try
{
synchronized (_inputQ)
{
if (_listener != null)
throw new IllegalStateException("ReadListener already set");
if (_state != STREAM)
throw new IllegalStateException("State "+STREAM+" != " + _state);
_state = ASYNC;
_listener = readListener;
boolean content=nextContent()!=null;
if (content)
woken = _channelState.onReadReady();
else
_channelState.onReadUnready();
}
}
catch(IOException e)
{
throw new RuntimeIOException(e);
}
if (woken)
wake();
}
public boolean failed(Throwable x)
{
boolean woken=false;
synchronized (_inputQ)
{
if (_state instanceof ErrorState)
LOG.warn(x);
else
_state = new ErrorState(x);
if (_listener==null)
_inputQ.notify();
else
woken=_channelState.onReadPossible();
}
return woken;
}
/* ------------------------------------------------------------ */
/*
* <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
* runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
* to setup classloaders etc.
* </p>
*/
@Override
public void run()
{
final Throwable error;
final ReadListener listener;
boolean aeof=false;
synchronized (_inputQ)
{
if (_state==EOF)
return;
if (_state==AEOF)
{
_state=EOF;
aeof=true;
}
listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
}
try
{
if (error!=null)
{
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(error);
}
else if (aeof)
{
listener.onAllDataRead();
}
else
{
listener.onDataAvailable();
}
}
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
try
{
if (aeof || error==null)
{
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(e);
}
}
catch (Throwable e2)
{
LOG.warn(e2.toString());
LOG.debug(e2);
throw new RuntimeIOException(e2);
}
}
}
@Override
public String toString()
{
return String.format("%s@%x[c=%d,s=%s]",
getClass().getSimpleName(),
hashCode(),
_contentConsumed,
_state);
}
public static class PoisonPillContent extends Content
{
private final String _name;
public PoisonPillContent(String name)
{
super(BufferUtil.EMPTY_BUFFER);
_name=name;
}
@Override
public String toString()
{
return _name;
}
}
public static class EofContent extends PoisonPillContent
{
EofContent(String name)
{
super(name);
}
}
public static class Content implements Callback
{
private final ByteBuffer _content;
public Content(ByteBuffer content)
{
_content=content;
}
@Override
public boolean isNonBlocking()
{
return true;
}
public ByteBuffer getContent()
{
return _content;
}
public boolean hasContent()
{
return _content.hasRemaining();
}
public int remaining()
{
return _content.remaining();
}
@Override
public String toString()
{
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
}
}
protected static abstract class State
{
public boolean blockForContent(HttpInput in) throws IOException
{
return false;
}
public int noContent() throws IOException
{
return -1;
}
}
protected static class EOFState extends State
{
}
protected class ErrorState extends EOFState
{
final Throwable _error;
ErrorState(Throwable error)
{
_error=error;
}
public Throwable getError()
{
return _error;
}
@Override
public int noContent() throws IOException
{
if (_error instanceof IOException)
throw (IOException)_error;
throw new IOException(_error);
}
@Override
public String toString()
{
return "ERROR:"+_error;
}
}
protected static final State STREAM = new State()
{
@Override
public boolean blockForContent(HttpInput input) throws IOException
{
input.blockForContent();
return true;
}
@Override
public String toString()
{
return "STREAM";
}
};
protected static final State ASYNC = new State()
{
@Override
public int noContent() throws IOException
{
return 0;
}
@Override
public String toString()
{
return "ASYNC";
}
};
protected static final State EARLY_EOF = new EOFState()
{
@Override
public int noContent() throws IOException
{
throw new EofException("Early EOF");
}
@Override
public String toString()
{
return "EARLY_EOF";
}
};
protected static final State EOF = new EOFState()
{
@Override
public String toString()
{
return "EOF";
}
};
protected static final State AEOF = new EOFState()
{
@Override
public String toString()
{
return "AEOF";
}
};
}