Merged branch 'jetty-9.3.x' into 'master'.
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
index 0444f78..5b49c8e 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
@@ -182,7 +182,7 @@
         Connection old_connection = getConnection();
 
         if (LOG.isDebugEnabled())
-            LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);
+            LOG.debug("{} upgrading from {} to {}", this, old_connection, newConnection);
 
         ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
                 ?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
index 71a75bc..7fd0b77 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
@@ -18,6 +18,7 @@
 
 package org.eclipse.jetty.proxy;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -51,6 +52,7 @@
 import org.eclipse.jetty.server.handler.HandlerWrapper;
 import org.eclipse.jetty.util.BufferUtil;
 import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
 import org.eclipse.jetty.util.TypeUtil;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
@@ -160,21 +162,17 @@
     protected void doStart() throws Exception
     {
         if (executor == null)
-        {
-            setExecutor(getServer().getThreadPool());
-        }
+            executor = getServer().getThreadPool();
+
         if (scheduler == null)
-        {
-            setScheduler(new ScheduledExecutorScheduler());
-            addBean(getScheduler());
-        }
+            addBean(scheduler = new ScheduledExecutorScheduler());
+
         if (bufferPool == null)
-        {
-            setByteBufferPool(new MappedByteBufferPool());
-            addBean(getByteBufferPool());
-        }
+            addBean(bufferPool = new MappedByteBufferPool());
+
         addBean(selector = newSelectorManager());
         selector.setConnectTimeout(getConnectTimeout());
+
         super.doStart();
     }
 
@@ -191,16 +189,8 @@
             String serverAddress = request.getRequestURI();
             if (LOG.isDebugEnabled())
                 LOG.debug("CONNECT request for {}", serverAddress);
-            try
-            {
-                handleConnect(baseRequest, request, response, serverAddress);
-            }
-            catch (Exception x)
-            {
-                // TODO
-                LOG.warn("ConnectHandler " + baseRequest.getHttpURI() + " " + x);
-                LOG.debug(x);
-            }
+
+            handleConnect(baseRequest, request, response, serverAddress);
         }
         else
         {
@@ -249,32 +239,40 @@
                 return;
             }
 
-            SocketChannel channel = SocketChannel.open();
-            channel.socket().setTcpNoDelay(true);
-            channel.configureBlocking(false);
-
-            AsyncContext asyncContext = request.startAsync();
-            asyncContext.setTimeout(0);
-
             HttpTransport transport = baseRequest.getHttpChannel().getHttpTransport();
-            
             // TODO Handle CONNECT over HTTP2!
             if (!(transport instanceof HttpConnection))
             {
                 if (LOG.isDebugEnabled())
-                    LOG.debug("CONNECT forbidden for {}", transport);
+                    LOG.debug("CONNECT not supported for {}", transport);
                 sendConnectResponse(request, response, HttpServletResponse.SC_FORBIDDEN);
                 return;
             }
 
-            InetSocketAddress address = newConnectAddress(host, port);
+            AsyncContext asyncContext = request.startAsync();
+            asyncContext.setTimeout(0);
+
             if (LOG.isDebugEnabled())
-                LOG.debug("Connecting to {}", address);
-            ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
-            if (channel.connect(address))
-                selector.accept(channel, connectContext);
-            else
-                selector.connect(channel, connectContext);
+                LOG.debug("Connecting to {}:{}", host, port);
+
+            connectToServer(request, host, port, new Promise<SocketChannel>()
+            {
+                @Override
+                public void succeeded(SocketChannel channel)
+                {
+                    ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
+                    if (channel.isConnected())
+                        selector.accept(channel, connectContext);
+                    else
+                        selector.connect(channel, connectContext);
+                }
+
+                @Override
+                public void failed(Throwable x)
+                {
+                    onConnectFailure(request, response, asyncContext, x);
+                }
+            });
         }
         catch (Exception x)
         {
@@ -282,37 +280,59 @@
         }
     }
 
-    /* ------------------------------------------------------------ */
-    /** Create the address the connect channel will connect to.
-     * @param host The host from the connect request
-     * @param port The port from the connect request
+    protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
+    {
+        SocketChannel channel = null;
+        try
+        {
+            channel = SocketChannel.open();
+            channel.socket().setTcpNoDelay(true);
+            channel.configureBlocking(false);
+            InetSocketAddress address = newConnectAddress(host, port);
+            channel.connect(address);
+            promise.succeeded(channel);
+        }
+        catch (Throwable x)
+        {
+            close(channel);
+            promise.failed(x);
+        }
+    }
+
+    private void close(Closeable closeable)
+    {
+        try
+        {
+            if (closeable != null)
+                closeable.close();
+        }
+        catch (Throwable x)
+        {
+            LOG.ignore(x);
+        }
+    }
+
+    /**
+     * Creates the server address to connect to.
+     *
+     * @param host The host from the CONNECT request
+     * @param port The port from the CONNECT request
      * @return The InetSocketAddress to connect to.
      */
     protected InetSocketAddress newConnectAddress(String host, int port)
     {
         return new InetSocketAddress(host, port);
     }
-    
+
     protected void onConnectSuccess(ConnectContext connectContext, UpstreamConnection upstreamConnection)
     {
-        HttpConnection httpConnection = connectContext.getHttpConnection();
-        ByteBuffer requestBuffer = httpConnection.getRequestBuffer();
-        ByteBuffer buffer = BufferUtil.EMPTY_BUFFER;
-        int remaining = requestBuffer.remaining();
-        if (remaining > 0)
-        {
-            buffer = bufferPool.acquire(remaining, requestBuffer.isDirect());
-            BufferUtil.flipToFill(buffer);
-            buffer.put(requestBuffer);
-            buffer.flip();
-        }
-
         ConcurrentMap<String, Object> context = connectContext.getContext();
         HttpServletRequest request = connectContext.getRequest();
         prepareContext(request, context);
 
+        HttpConnection httpConnection = connectContext.getHttpConnection();
         EndPoint downstreamEndPoint = httpConnection.getEndPoint();
-        DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, buffer);
+        DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context);
         downstreamConnection.setInputBufferSize(getBufferSize());
 
         upstreamConnection.setConnection(downstreamConnection);
@@ -324,6 +344,7 @@
         sendConnectResponse(request, response, HttpServletResponse.SC_OK);
 
         upgradeConnection(request, response, downstreamConnection);
+
         connectContext.getAsyncContext().complete();
     }
 
@@ -349,7 +370,8 @@
         }
         catch (IOException x)
         {
-            // TODO: nothing we can do, close the connection
+            if (LOG.isDebugEnabled())
+                LOG.debug("Could not send CONNECT response", x);
         }
     }
 
@@ -367,9 +389,9 @@
         return true;
     }
 
-    protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context, ByteBuffer buffer)
+    protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context)
     {
-        return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context, buffer);
+        return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context);
     }
 
     protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext)
@@ -396,13 +418,17 @@
      *
      * @param endPoint the endPoint to read from
      * @param buffer   the buffer to read data into
+     * @param context  the context information related to the connection
      * @return the number of bytes read (possibly 0 since the read is non-blocking)
      *         or -1 if the channel has been closed remotely
      * @throws IOException if the endPoint cannot be read
      */
-    protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
+    protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
     {
-        return endPoint.fill(buffer);
+        int read = endPoint.fill(buffer);
+        if (LOG.isDebugEnabled())
+            LOG.debug("{} read {} bytes", this, read);
+        return read;
     }
 
     /**
@@ -411,8 +437,9 @@
      * @param endPoint the endPoint to write to
      * @param buffer   the buffer to write
      * @param callback the completion callback to invoke
+     * @param context  the context information related to the connection
      */
-    protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
+    protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
     {
         if (LOG.isDebugEnabled())
             LOG.debug("{} writing {} bytes", this, buffer.remaining());
@@ -494,14 +521,9 @@
         @Override
         protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment)
         {
-            getExecutor().execute(new Runnable()
-            {
-                public void run()
-                {
-                    ConnectContext connectContext = (ConnectContext)attachment;
-                    onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
-                }
-            });
+            close(channel);
+            ConnectContext connectContext = (ConnectContext)attachment;
+            onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
         }
     }
 
@@ -561,37 +583,36 @@
         public void onOpen()
         {
             super.onOpen();
-            getExecutor().execute(new Runnable()
-            {
-                public void run()
-                {
-                    onConnectSuccess(connectContext, UpstreamConnection.this);
-                    fillInterested();
-                }
-            });
+            onConnectSuccess(connectContext, UpstreamConnection.this);
+            fillInterested();
         }
 
         @Override
         protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
         {
-            return ConnectHandler.this.read(endPoint, buffer);
+            return ConnectHandler.this.read(endPoint, buffer, getContext());
         }
 
         @Override
         protected void write(EndPoint endPoint, ByteBuffer buffer,Callback callback)
         {
-            ConnectHandler.this.write(endPoint, buffer, callback);
+            ConnectHandler.this.write(endPoint, buffer, callback, getContext());
         }
     }
 
-    public class DownstreamConnection extends ProxyConnection
+    public class DownstreamConnection extends ProxyConnection implements Connection.UpgradeTo
     {
-        private final ByteBuffer buffer;
+        private ByteBuffer buffer;
 
-        public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer)
+        public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
         {
             super(endPoint, executor, bufferPool, context);
-            this.buffer = buffer;
+        }
+
+        @Override
+        public void onUpgradeTo(ByteBuffer buffer)
+        {
+            this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer;
         }
 
         @Override
@@ -623,13 +644,13 @@
         @Override
         protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
         {
-            return ConnectHandler.this.read(endPoint, buffer);
+            return ConnectHandler.this.read(endPoint, buffer, getContext());
         }
 
         @Override
         protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
         {
-            ConnectHandler.this.write(endPoint, buffer, callback);
+            ConnectHandler.this.write(endPoint, buffer, callback, getContext());
         }
     }
 }
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
index f601975..780e706 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
@@ -27,6 +27,8 @@
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentMap;
@@ -36,12 +38,15 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.eclipse.jetty.io.EndPoint;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
 import org.eclipse.jetty.util.B64Code;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -631,12 +636,33 @@
             }
 
             @Override
+            protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
+            {
+                Assert.assertEquals(contextValue, request.getAttribute(contextKey));
+                super.connectToServer(request, host, port, promise);
+            }
+
+            @Override
             protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
             {
                 // Transfer data from the HTTP request to the connection context
                 Assert.assertEquals(contextValue, request.getAttribute(contextKey));
                 context.put(contextKey, request.getAttribute(contextKey));
             }
+
+            @Override
+            protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
+            {
+                Assert.assertEquals(contextValue, context.get(contextKey));
+                return super.read(endPoint, buffer, context);
+            }
+
+            @Override
+            protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
+            {
+                Assert.assertEquals(contextValue, context.get(contextKey));
+                super.write(endPoint, buffer, callback, context);
+            }
         });
         proxy.start();