| // |
| // ======================================================================== |
| // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.server; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.Deque; |
| import java.util.Objects; |
| import java.util.Queue; |
| import java.util.concurrent.TimeoutException; |
| |
| import javax.servlet.ReadListener; |
| import javax.servlet.ServletInputStream; |
| |
| import org.eclipse.jetty.io.EofException; |
| import org.eclipse.jetty.io.RuntimeIOException; |
| import org.eclipse.jetty.util.BufferUtil; |
| import org.eclipse.jetty.util.Callback; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| |
| /** |
| * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}. |
| * <p> |
| * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class |
| * maintains two states: the content state that tells whether there is content to consume and the EOF |
| * state that tells whether an EOF has arrived. |
| * Only once the content has been consumed the content state is moved to the EOF state. |
| */ |
| public class HttpInput extends ServletInputStream implements Runnable |
| { |
| private final static Logger LOG = Log.getLogger(HttpInput.class); |
| private final static Content EOF_CONTENT = new EofContent("EOF"); |
| private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF"); |
| |
| private final byte[] _oneByteBuffer = new byte[1]; |
| private final Deque<Content> _inputQ = new ArrayDeque<>(); |
| private final HttpChannelState _channelState; |
| private ReadListener _listener; |
| private State _state = STREAM; |
| private long _contentConsumed; |
| private long _blockingTimeoutAt = -1; |
| |
| public HttpInput(HttpChannelState state) |
| { |
| _channelState=state; |
| if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0) |
| _blockingTimeoutAt=0; |
| } |
| |
| protected HttpChannelState getHttpChannelState() |
| { |
| return _channelState; |
| } |
| |
| public void recycle() |
| { |
| synchronized (_inputQ) |
| { |
| Content item = _inputQ.poll(); |
| while (item != null) |
| { |
| item.failed(null); |
| item = _inputQ.poll(); |
| } |
| _listener = null; |
| _state = STREAM; |
| _contentConsumed = 0; |
| } |
| } |
| |
| @Override |
| public int available() |
| { |
| int available=0; |
| boolean woken=false; |
| synchronized (_inputQ) |
| { |
| Content content = _inputQ.peek(); |
| if (content==null) |
| { |
| try |
| { |
| produceContent(); |
| } |
| catch(IOException e) |
| { |
| woken=failed(e); |
| } |
| content = _inputQ.peek(); |
| } |
| |
| if (content!=null) |
| available= remaining(content); |
| } |
| |
| if (woken) |
| wake(); |
| return available; |
| } |
| |
| private void wake() |
| { |
| _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel()); |
| } |
| |
| |
| @Override |
| public int read() throws IOException |
| { |
| int read = read(_oneByteBuffer, 0, 1); |
| if (read==0) |
| throw new IllegalStateException("unready read=0"); |
| return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; |
| } |
| |
| @Override |
| public int read(byte[] b, int off, int len) throws IOException |
| { |
| synchronized (_inputQ) |
| { |
| if (_blockingTimeoutAt>=0 && !isAsync()) |
| _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); |
| |
| while(true) |
| { |
| Content item = nextContent(); |
| if (item!=null) |
| { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} read {} from {}",this,len,item); |
| int l = get(item, b, off, len); |
| |
| consumeNonContent(); |
| |
| return l; |
| } |
| |
| if (!_state.blockForContent(this)) |
| return _state.noContent(); |
| } |
| } |
| } |
| |
| /** |
| * Called when derived implementations should attempt to |
| * produce more Content and add it via {@link #addContent(Content)}. |
| * For protocols that are constantly producing (eg HTTP2) this can |
| * be left as a noop; |
| * @throws IOException if unable to produce content |
| */ |
| protected void produceContent() throws IOException |
| { |
| } |
| |
| /** |
| * Get the next content from the inputQ, calling {@link #produceContent()} |
| * if need be. EOF is processed and state changed. |
| * |
| * @return the content or null if none available. |
| * @throws IOException if retrieving the content fails |
| */ |
| protected Content nextContent() throws IOException |
| { |
| Content content = pollContent(); |
| if (content==null && !isFinished()) |
| { |
| produceContent(); |
| content = pollContent(); |
| } |
| return content; |
| } |
| |
| /** Poll the inputQ for Content. |
| * Consumed buffers and {@link PoisonPillContent}s are removed and |
| * EOF state updated if need be. |
| * @return Content or null |
| */ |
| protected Content pollContent() |
| { |
| // Items are removed only when they are fully consumed. |
| Content content = _inputQ.peek(); |
| // Skip consumed items at the head of the queue. |
| while (content != null && remaining(content) == 0) |
| { |
| _inputQ.poll(); |
| content.succeeded(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} consumed {}", this, content); |
| |
| if (content==EOF_CONTENT) |
| { |
| if (_listener==null) |
| _state=EOF; |
| else |
| { |
| _state=AEOF; |
| boolean woken = _channelState.onReadReady(); // force callback? |
| if (woken) |
| wake(); |
| } |
| } |
| else if (content==EARLY_EOF_CONTENT) |
| _state=EARLY_EOF; |
| |
| content = _inputQ.peek(); |
| } |
| |
| return content; |
| } |
| |
| /** |
| */ |
| protected void consumeNonContent() |
| { |
| // Items are removed only when they are fully consumed. |
| Content content = _inputQ.peek(); |
| // Skip consumed items at the head of the queue. |
| while (content != null && remaining(content) == 0) |
| { |
| // Defer EOF until read |
| if (content instanceof EofContent) |
| break; |
| |
| // Consume all other empty content |
| _inputQ.poll(); |
| content.succeeded(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} consumed {}", this, content); |
| content = _inputQ.peek(); |
| } |
| } |
| |
| /** |
| * Get the next readable from the inputQ, calling {@link #produceContent()} |
| * if need be. EOF is NOT processed and state is not changed. |
| * |
| * @return the content or EOF or null if none available. |
| * @throws IOException if retrieving the content fails |
| */ |
| protected Content nextReadable() throws IOException |
| { |
| Content content = pollReadable(); |
| if (content==null && !isFinished()) |
| { |
| produceContent(); |
| content = pollReadable(); |
| } |
| return content; |
| } |
| |
| /** Poll the inputQ for Content or EOF. |
| * Consumed buffers and non EOF {@link PoisonPillContent}s are removed. |
| * EOF state is not updated. |
| * @return Content, EOF or null |
| */ |
| protected Content pollReadable() |
| { |
| // Items are removed only when they are fully consumed. |
| Content content = _inputQ.peek(); |
| |
| // Skip consumed items at the head of the queue except EOF |
| while (content != null) |
| { |
| if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0) |
| return content; |
| |
| _inputQ.poll(); |
| content.succeeded(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} consumed {}", this, content); |
| content = _inputQ.peek(); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param item the content |
| * @return how many bytes remain in the given content |
| */ |
| protected int remaining(Content item) |
| { |
| return item.remaining(); |
| } |
| |
| /** |
| * Copies the given content into the given byte buffer. |
| * |
| * @param content the content to copy from |
| * @param buffer the buffer to copy into |
| * @param offset the buffer offset to start copying from |
| * @param length the space available in the buffer |
| * @return the number of bytes actually copied |
| */ |
| protected int get(Content content, byte[] buffer, int offset, int length) |
| { |
| int l = Math.min(content.remaining(), length); |
| content.getContent().get(buffer, offset, l); |
| _contentConsumed+=l; |
| return l; |
| } |
| |
| /** |
| * Consumes the given content. |
| * Calls the content succeeded if all content consumed. |
| * |
| * @param content the content to consume |
| * @param length the number of bytes to consume |
| */ |
| protected void skip(Content content, int length) |
| { |
| int l = Math.min(content.remaining(), length); |
| ByteBuffer buffer = content.getContent(); |
| buffer.position(buffer.position()+l); |
| _contentConsumed+=l; |
| if (l>0 && !content.hasContent()) |
| pollContent(); // hungry succeed |
| |
| } |
| |
| /** |
| * Blocks until some content or some end-of-file event arrives. |
| * |
| * @throws IOException if the wait is interrupted |
| */ |
| protected void blockForContent() throws IOException |
| { |
| try |
| { |
| long timeout=0; |
| if (_blockingTimeoutAt>=0) |
| { |
| timeout=_blockingTimeoutAt-System.currentTimeMillis(); |
| if (timeout<=0) |
| throw new TimeoutException(); |
| } |
| |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} blocking for content timeout={} ...", this,timeout); |
| if (timeout>0) |
| _inputQ.wait(timeout); |
| else |
| _inputQ.wait(); |
| |
| if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt) |
| throw new TimeoutException(); |
| } |
| catch (Throwable e) |
| { |
| throw (IOException)new InterruptedIOException().initCause(e); |
| } |
| } |
| |
| /** |
| * Adds some content to the start of this input stream. |
| * <p>Typically used to push back content that has |
| * been read, perhaps mutated. The bytes prepended are |
| * deducted for the contentConsumed total</p> |
| * @param item the content to add |
| * @return true if content channel woken for read |
| */ |
| public boolean prependContent(Content item) |
| { |
| boolean woken=false; |
| synchronized (_inputQ) |
| { |
| _inputQ.push(item); |
| _contentConsumed-=item.remaining(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} prependContent {}", this, item); |
| |
| if (_listener==null) |
| _inputQ.notify(); |
| else |
| woken=_channelState.onReadPossible(); |
| } |
| |
| return woken; |
| } |
| |
| /** |
| * Adds some content to this input stream. |
| * |
| * @param item the content to add |
| * @return true if content channel woken for read |
| */ |
| public boolean addContent(Content item) |
| { |
| boolean woken=false; |
| synchronized (_inputQ) |
| { |
| _inputQ.offer(item); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("{} addContent {}", this, item); |
| |
| if (_listener==null) |
| _inputQ.notify(); |
| else |
| woken=_channelState.onReadPossible(); |
| } |
| |
| return woken; |
| } |
| |
| public boolean hasContent() |
| { |
| synchronized (_inputQ) |
| { |
| return _inputQ.size()>0; |
| } |
| } |
| |
| public void unblock() |
| { |
| synchronized (_inputQ) |
| { |
| _inputQ.notify(); |
| } |
| } |
| |
| public long getContentConsumed() |
| { |
| synchronized (_inputQ) |
| { |
| return _contentConsumed; |
| } |
| } |
| |
| /** |
| * This method should be called to signal that an EOF has been |
| * detected before all the expected content arrived. |
| * <p> |
| * Typically this will result in an EOFException being thrown |
| * from a subsequent read rather than a -1 return. |
| * @return true if content channel woken for read |
| */ |
| public boolean earlyEOF() |
| { |
| return addContent(EARLY_EOF_CONTENT); |
| } |
| |
| /** |
| * This method should be called to signal that all the expected |
| * content arrived. |
| * @return true if content channel woken for read |
| */ |
| public boolean eof() |
| { |
| return addContent(EOF_CONTENT); |
| } |
| |
| public boolean consumeAll() |
| { |
| synchronized (_inputQ) |
| { |
| try |
| { |
| while (!isFinished()) |
| { |
| Content item = nextContent(); |
| if (item == null) |
| break; // Let's not bother blocking |
| |
| skip(item, remaining(item)); |
| } |
| return isFinished() && !isError(); |
| } |
| catch (IOException e) |
| { |
| LOG.debug(e); |
| return false; |
| } |
| } |
| } |
| |
| public boolean isError() |
| { |
| synchronized (_inputQ) |
| { |
| return _state instanceof ErrorState; |
| } |
| } |
| |
| public boolean isAsync() |
| { |
| synchronized (_inputQ) |
| { |
| return _state==ASYNC; |
| } |
| } |
| |
| @Override |
| public boolean isFinished() |
| { |
| synchronized (_inputQ) |
| { |
| return _state instanceof EOFState; |
| } |
| } |
| |
| |
| @Override |
| public boolean isReady() |
| { |
| try |
| { |
| synchronized (_inputQ) |
| { |
| if (_listener == null ) |
| return true; |
| if (_state instanceof EOFState) |
| return true; |
| if (nextReadable()!=null) |
| return true; |
| |
| _channelState.onReadUnready(); |
| } |
| return false; |
| } |
| catch(IOException e) |
| { |
| LOG.ignore(e); |
| return true; |
| } |
| } |
| |
| @Override |
| public void setReadListener(ReadListener readListener) |
| { |
| readListener = Objects.requireNonNull(readListener); |
| boolean woken=false; |
| try |
| { |
| synchronized (_inputQ) |
| { |
| if (_listener != null) |
| throw new IllegalStateException("ReadListener already set"); |
| if (_state != STREAM) |
| throw new IllegalStateException("State "+STREAM+" != " + _state); |
| |
| _state = ASYNC; |
| _listener = readListener; |
| boolean content=nextContent()!=null; |
| |
| if (content) |
| woken = _channelState.onReadReady(); |
| else |
| _channelState.onReadUnready(); |
| } |
| } |
| catch(IOException e) |
| { |
| throw new RuntimeIOException(e); |
| } |
| |
| if (woken) |
| wake(); |
| } |
| |
| public boolean failed(Throwable x) |
| { |
| boolean woken=false; |
| synchronized (_inputQ) |
| { |
| if (_state instanceof ErrorState) |
| LOG.warn(x); |
| else |
| _state = new ErrorState(x); |
| |
| if (_listener==null) |
| _inputQ.notify(); |
| else |
| woken=_channelState.onReadPossible(); |
| } |
| |
| return woken; |
| } |
| |
| /* ------------------------------------------------------------ */ |
| /* |
| * <p> |
| * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a |
| * runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)} |
| * to setup classloaders etc. |
| * </p> |
| */ |
| @Override |
| public void run() |
| { |
| final Throwable error; |
| final ReadListener listener; |
| boolean aeof=false; |
| |
| synchronized (_inputQ) |
| { |
| if (_state==EOF) |
| return; |
| |
| if (_state==AEOF) |
| { |
| _state=EOF; |
| aeof=true; |
| } |
| |
| listener = _listener; |
| error = _state instanceof ErrorState?((ErrorState)_state).getError():null; |
| } |
| |
| try |
| { |
| if (error!=null) |
| { |
| _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); |
| listener.onError(error); |
| } |
| else if (aeof) |
| { |
| listener.onAllDataRead(); |
| } |
| else |
| { |
| listener.onDataAvailable(); |
| } |
| } |
| catch (Throwable e) |
| { |
| LOG.warn(e.toString()); |
| LOG.debug(e); |
| try |
| { |
| if (aeof || error==null) |
| { |
| _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); |
| listener.onError(e); |
| } |
| } |
| catch (Throwable e2) |
| { |
| LOG.warn(e2.toString()); |
| LOG.debug(e2); |
| throw new RuntimeIOException(e2); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("%s@%x[c=%d,s=%s]", |
| getClass().getSimpleName(), |
| hashCode(), |
| _contentConsumed, |
| _state); |
| } |
| |
| public static class PoisonPillContent extends Content |
| { |
| private final String _name; |
| public PoisonPillContent(String name) |
| { |
| super(BufferUtil.EMPTY_BUFFER); |
| _name=name; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return _name; |
| } |
| } |
| |
| public static class EofContent extends PoisonPillContent |
| { |
| EofContent(String name) |
| { |
| super(name); |
| } |
| } |
| |
| public static class Content implements Callback |
| { |
| private final ByteBuffer _content; |
| |
| public Content(ByteBuffer content) |
| { |
| _content=content; |
| } |
| |
| @Override |
| public boolean isNonBlocking() |
| { |
| return true; |
| } |
| |
| |
| public ByteBuffer getContent() |
| { |
| return _content; |
| } |
| |
| public boolean hasContent() |
| { |
| return _content.hasRemaining(); |
| } |
| |
| public int remaining() |
| { |
| return _content.remaining(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content)); |
| } |
| } |
| |
| |
| protected static abstract class State |
| { |
| public boolean blockForContent(HttpInput in) throws IOException |
| { |
| return false; |
| } |
| |
| public int noContent() throws IOException |
| { |
| return -1; |
| } |
| } |
| |
| protected static class EOFState extends State |
| { |
| } |
| |
| protected class ErrorState extends EOFState |
| { |
| final Throwable _error; |
| ErrorState(Throwable error) |
| { |
| _error=error; |
| } |
| |
| public Throwable getError() |
| { |
| return _error; |
| } |
| |
| @Override |
| public int noContent() throws IOException |
| { |
| if (_error instanceof IOException) |
| throw (IOException)_error; |
| throw new IOException(_error); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "ERROR:"+_error; |
| } |
| } |
| |
| protected static final State STREAM = new State() |
| { |
| @Override |
| public boolean blockForContent(HttpInput input) throws IOException |
| { |
| input.blockForContent(); |
| return true; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "STREAM"; |
| } |
| }; |
| |
| protected static final State ASYNC = new State() |
| { |
| @Override |
| public int noContent() throws IOException |
| { |
| return 0; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "ASYNC"; |
| } |
| }; |
| |
| protected static final State EARLY_EOF = new EOFState() |
| { |
| @Override |
| public int noContent() throws IOException |
| { |
| throw new EofException("Early EOF"); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "EARLY_EOF"; |
| } |
| }; |
| |
| protected static final State EOF = new EOFState() |
| { |
| @Override |
| public String toString() |
| { |
| return "EOF"; |
| } |
| }; |
| |
| protected static final State AEOF = new EOFState() |
| { |
| @Override |
| public String toString() |
| { |
| return "AEOF"; |
| } |
| }; |
| |
| } |