blob: b835c6c61ae8ad2caf0bf238493ae1fe0affcc29 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final boolean REQUEST_BUFFER_DIRECT=false;
private static final boolean HEADER_BUFFER_DIRECT=false;
private static final boolean CHUNK_BUFFER_DIRECT=false;
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private final HttpConfiguration _config;
private final Connector _connector;
private final ByteBufferPool _bufferPool;
private final HttpInput _input;
private final HttpGenerator _generator;
private final HttpChannelOverHttp _channel;
private final HttpParser _parser;
private final AtomicInteger _contentBufferReferences=new AtomicInteger();
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
/**
* Get the current connection that this thread is dispatched to.
* Note that a thread may be processing a request asynchronously and
* thus not be dispatched to the connection.
* @return the current HttpConnection or null
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
*/
public static HttpConnection getCurrentConnection()
{
return __currentConnection.get();
}
protected static HttpConnection setCurrentConnection(HttpConnection connection)
{
HttpConnection last=__currentConnection.get();
__currentConnection.set(connection);
return last;
}
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
super(endPoint, connector.getExecutor());
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = newHttpGenerator();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
_parser = newHttpParser();
if (LOG.isDebugEnabled())
LOG.debug("New HTTP Connection {}", this);
}
public HttpConfiguration getHttpConfiguration()
{
return _config;
}
protected HttpGenerator newHttpGenerator()
{
return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
}
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}
protected HttpParser newHttpParser()
{
return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
}
protected HttpParser.RequestHandler newRequestHandler()
{
return _channel;
}
public Server getServer()
{
return _connector.getServer();
}
public Connector getConnector()
{
return _connector;
}
public HttpChannel getHttpChannel()
{
return _channel;
}
public HttpParser getParser()
{
return _parser;
}
public HttpGenerator getGenerator()
{
return _generator;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return getEndPoint().isOptimizedForDirectBuffers();
}
@Override
public int getMessagesIn()
{
return getHttpChannel().getRequests();
}
@Override
public int getMessagesOut()
{
return getHttpChannel().getRequests();
}
@Override
public ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(_requestBuffer))
{
ByteBuffer buffer = _requestBuffer;
_requestBuffer=null;
return buffer;
}
return null;
}
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}",this);
ByteBuffer buffer=_requestBuffer;
_requestBuffer=null;
_bufferPool.release(buffer);
}
}
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
return _requestBuffer;
}
public boolean isRequestBufferEmpty()
{
return BufferUtil.isEmpty(_requestBuffer);
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable enter {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
HttpConnection last=setCurrentConnection(this);
try
{
while (true)
{
// Fill the request buffer (if needed)
int filled = fillRequestBuffer();
// Parse the request buffer
boolean handle = parseRequestBuffer();
// If there was a connection upgrade, the other
// connection took over, nothing more to do here.
if (getEndPoint().getConnection()!=this)
break;
// Handle close parser
if (_parser.isClose() || _parser.isClosed())
{
close();
break;
}
// Handle channel event
if (handle)
{
boolean suspended = !_channel.handle();
// We should break iteration if we have suspended or changed connection or this is not the handling thread.
if (suspended || getEndPoint().getConnection() != this)
break;
}
// Continue or break?
else if (filled<=0)
{
if (filled==0)
fillInterested();
break;
}
}
}
finally
{
setCurrentConnection(last);
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
}
}
/* ------------------------------------------------------------ */
/** Fill and parse data looking for content
* @return true if an {@link RequestHandler} method was called and it returned true;
*/
protected boolean fillAndParseForContent()
{
boolean handled=false;
while (_parser.inContentState())
{
int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer();
handled|=handle;
if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
break;
}
return handled;
}
/* ------------------------------------------------------------ */
private int fillRequestBuffer()
{
if (_contentBufferReferences.get()>0)
{
LOG.warn("{} fill with unconsumed content!",this);
return 0;
}
if (BufferUtil.isEmpty(_requestBuffer))
{
// Can we fill?
if(getEndPoint().isInputShutdown())
{
// No pretend we read -1
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer));
return -1;
}
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
_requestBuffer = getRequestBuffer();
// fill
try
{
int filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
// tell parser
if (filled < 0)
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}",this,filled,BufferUtil.toDetailString(_requestBuffer));
return filled;
}
catch (IOException e)
{
LOG.debug(e);
return -1;
}
}
return 0;
}
/* ------------------------------------------------------------ */
private boolean parseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}",this,handle,_parser);
// recycle buffer ?
if (_contentBufferReferences.get()==0)
releaseRequestBuffer();
return handle;
}
/* ------------------------------------------------------------ */
@Override
public void onCompleted()
{
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.recycle();
_parser.reset();
_generator.reset();
if (_contentBufferReferences.get()==0)
releaseRequestBuffer();
else
{
LOG.warn("{} lingering content references?!?!",this);
_requestBuffer=null; // Not returned to pool!
_contentBufferReferences.set(0);
}
return;
}
}
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
{
// close to seek EOF
_parser.close();
}
else if (_parser.inContentState() && _generator.isPersistent())
{
// If we are async, then we have problems to complete neatly
if (_channel.getRequest().getHttpInput().isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed async input {}", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this);
// Complete reading the request
if (!_channel.getRequest().getHttpInput().consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
// Reset the channel, parsers and generator
_channel.recycle();
if (_generator.isPersistent() && !_parser.isClosed())
_parser.reset();
else
_parser.close();
// Not in a race here with onFillable, because it has given up control before calling handle.
// in a slight race with #completed, but not sure what to do with that anyway.
if (_chunk!=null)
_bufferPool.release(_chunk);
_chunk=null;
_generator.reset();
// if we are not called from the onfillable thread, schedule completion
if (getCurrentConnection()!=this)
{
// If we are looking for the next request
if (_parser.isStart())
{
// if the buffer is empty
if (BufferUtil.isEmpty(_requestBuffer))
{
// look for more data
fillInterested();
}
// else if we are still running
else if (getConnector().isRunning())
{
// Dispatched to handle a pipelined request
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isRunning())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
// else the parser must be closed, so seek the EOF if we are still open
else if (getEndPoint().isOpen())
fillInterested();
}
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onClose()
{
_sendCallback.close();
super.onClose();
}
@Override
public void run()
{
onFillable();
}
@Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
{
if (info == null)
{
if (!lastContent && BufferUtil.isEmpty(content))
{
callback.succeeded();
return;
}
}
else
{
// If we are still expecting a 100 continues when we commit
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
}
if(_sendCallback.reset(info,head,content,lastContent,callback))
_sendCallback.iterate();
}
HttpInput.Content newContent(ByteBuffer c)
{
return new Content(c);
}
@Override
public void abort(Throwable failure)
{
// Do a direct close of the output, as this may indicate to a client that the
// response is bad either with RST or by abnormal completion of chunked response.
getEndPoint().close();
}
@Override
public boolean isPushSupported()
{
return false;
}
@Override
public void push(org.eclipse.jetty.http.MetaData.Request request)
{
LOG.debug("ignore push in {}",this);
}
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
{
getEndPoint().fillInterested(_blockingReadCallback);
}
public void blockingReadException(Throwable e)
{
_blockingReadCallback.failed(e);
}
@Override
public String toString()
{
return String.format("%s[p=%s,g=%s,c=%s]",
super.toString(),
_parser,
_generator,
_channel);
}
private class Content extends HttpInput.Content
{
public Content(ByteBuffer content)
{
super(content);
_contentBufferReferences.incrementAndGet();
}
@Override
public void succeeded()
{
if (_contentBufferReferences.decrementAndGet()==0)
releaseRequestBuffer();
}
@Override
public void failed(Throwable x)
{
succeeded();
}
}
private class BlockingReadCallback implements Callback
{
@Override
public void succeeded()
{
_input.unblock();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
}
@Override
public boolean isNonBlocking()
{
// This callback does not block, rather it wakes up the
// thread that is blocked waiting on the read.
return true;
}
}
private class AsyncReadCallback implements Callback
{
@Override
public void succeeded()
{
if (fillAndParseForContent())
_channel.handle();
else if (!_input.isFinished())
asyncReadFillInterested();
}
@Override
public void failed(Throwable x)
{
if (_input.failed(x))
_channel.handle();
}
}
private class SendCallback extends IteratingCallback
{
private MetaData.Response _info;
private boolean _head;
private ByteBuffer _content;
private boolean _lastContent;
private Callback _callback;
private ByteBuffer _header;
private boolean _shutdownOut;
private SendCallback()
{
super(true);
}
@Override
public boolean isNonBlocking()
{
return _callback.isNonBlocking();
}
private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{
_info = info;
_head = head;
_content = content;
_lastContent = last;
_callback = callback;
_header = null;
_shutdownOut = false;
return true;
}
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}
@Override
public Action process() throws Exception
{
if (_callback==null)
throw new IllegalStateException();
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
{
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
if (_head || _generator.isNoContent())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
}
// If we have a header
if (BufferUtil.hasContent(_header))
{
if (BufferUtil.hasContent(_content))
{
if (BufferUtil.hasContent(chunk))
getEndPoint().write(this, _header, chunk, _content);
else
getEndPoint().write(this, _header, _content);
}
else
getEndPoint().write(this, _header);
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, chunk, _content);
else
getEndPoint().write(this, chunk);
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
{
succeeded(); // nothing to write
}
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
_shutdownOut=true;
continue;
}
case DONE:
{
return Action.SUCCEEDED;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
}
}
private void releaseHeader()
{
ByteBuffer h=_header;
_header=null;
if (h!=null)
_bufferPool.release(h);
}
@Override
protected void onCompleteSuccess()
{
releaseHeader();
_callback.succeeded();
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public void onCompleteFailure(final Throwable x)
{
releaseHeader();
failedCallback(_callback,x);
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public String toString()
{
return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
}
}
}