431642 - Implement ProxyServlet using Servlet 3.1 async I/O.

Intermediate commit that implements asynchronous content in HttpClient,
for the HTTP protocol, passing the tests.

This work needs to be extended to FCGI and SPDY and finally implement
the asynchronous proxy servlet.
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
index d7a7b6e..becab10 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
@@ -33,6 +33,7 @@
 import org.eclipse.jetty.http.HttpField;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
 
@@ -48,8 +49,8 @@
  * is available</li>
  * <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
  * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
- * <li>{@link #responseContent(HttpExchange, ByteBuffer)}, when HTTP content is available; this is the only method
- * that may be invoked multiple times with different buffers containing different content</li>
+ * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
+ * method that may be invoked multiple times with different buffers containing different content</li>
  * <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
  * </ol>
  * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
@@ -237,7 +238,7 @@
 
         HttpResponse response = exchange.getResponse();
         if (LOG.isDebugEnabled())
-            LOG.debug("Response headers {}{}{}", response, System.getProperty("line.separator"), response.getHeaders().toString().trim());
+            LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
         notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
 
@@ -269,7 +270,7 @@
      * @param buffer the response HTTP content buffer
      * @return whether the processing should continue
      */
-    protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
+    protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
     {
         out: while (true)
         {
@@ -292,18 +293,18 @@
 
         HttpResponse response = exchange.getResponse();
         if (LOG.isDebugEnabled())
-            LOG.debug("Response content {}{}{}", response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
+            LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
 
         ContentDecoder decoder = this.decoder;
         if (decoder != null)
         {
             buffer = decoder.decode(buffer);
             if (LOG.isDebugEnabled())
-                LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
+                LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
         }
 
         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
-        notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer);
+        notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
 
         return true;
     }
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
index 4b82405..1b40103 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java
@@ -27,6 +27,8 @@
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingNestedCallback;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
 
@@ -106,35 +108,17 @@
         }
     }
 
-    public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer)
+    public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer, Callback callback)
     {
-        // TODO: we need to create a "cumulative" callback that keeps track of how many listeners
-        // TODO: are invoked, and how many of these actually invoked the callback, and eventually
-        // TODO: call the callback passed to this method.
-
-        // Slice the buffer to avoid that listeners peek into data they should not look at.
-        buffer = buffer.slice();
-        if (!buffer.hasRemaining())
-            return;
-        // Optimized to avoid allocations of iterator instances
-        for (int i = 0; i < listeners.size(); ++i)
-        {
-            Response.ResponseListener listener = listeners.get(i);
-            if (listener instanceof Response.ContentListener)
-            {
-                // The buffer was sliced, so we always clear it (position=0, limit=capacity)
-                // before passing it to the listener that may consume it.
-                buffer.clear();
-                notifyContent((Response.ContentListener)listener, response, buffer);
-            }
-        }
+        ContentCallback contentCallback = new ContentCallback(listeners, response, buffer, callback);
+        contentCallback.iterate();
     }
 
-    private void notifyContent(Response.ContentListener listener, Response response, ByteBuffer buffer)
+    private void notifyContent(Response.AsyncContentListener listener, Response response, ByteBuffer buffer, Callback callback)
     {
         try
         {
-            listener.onContent(response, buffer);
+            listener.onContent(response, buffer, callback);
         }
         catch (Throwable x)
         {
@@ -222,7 +206,8 @@
         }
         notifyHeaders(listeners, response);
         if (response instanceof ContentResponse)
-            notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
+            // TODO: handle callback
+            notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), new Callback.Adapter());
         notifySuccess(listeners, response);
     }
 
@@ -243,7 +228,8 @@
         }
         notifyHeaders(listeners, response);
         if (response instanceof ContentResponse)
-            notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
+            // TODO: handle callback
+            notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), new Callback.Adapter());
         notifyFailure(listeners, response, failure);
     }
 
@@ -252,4 +238,51 @@
         forwardFailure(listeners, response, responseFailure);
         notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
     }
+
+    private class ContentCallback extends IteratingNestedCallback
+    {
+        private final List<Response.ResponseListener> listeners;
+        private final Response response;
+        private final ByteBuffer buffer;
+        private int index;
+
+        private ContentCallback(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer, Callback callback)
+        {
+            super(callback);
+            this.listeners = listeners;
+            this.response = response;
+            // Slice the buffer to avoid that listeners peek into data they should not look at.
+            this.buffer = buffer.slice();
+        }
+
+        @Override
+        protected Action process() throws Exception
+        {
+            if (index == listeners.size())
+                return Action.SUCCEEDED;
+
+            Response.ResponseListener listener = listeners.get(index);
+            if (listener instanceof Response.AsyncContentListener)
+            {
+                // The buffer was sliced, so we always clear it
+                // (clear => position=0, limit=capacity) before
+                // passing it to the listener that may consume it.
+                buffer.clear();
+                ResponseNotifier.this.notifyContent((Response.AsyncContentListener)listener, response, buffer, this);
+                return Action.SCHEDULED;
+            }
+            else
+            {
+                succeeded();
+                return Action.SCHEDULED;
+            }
+        }
+
+        @Override
+        public void succeeded()
+        {
+            ++index;
+            super.succeeded();
+        }
+    }
 }
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
index 0447164..7d5af9e 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
@@ -20,6 +20,7 @@
 
 import java.io.EOFException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.HttpExchange;
@@ -32,12 +33,13 @@
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.io.ByteBufferPool;
 import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.EofException;
 import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
 
 public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
 {
     private final HttpParser parser = new HttpParser(this);
+    private ByteBuffer buffer;
     private boolean shutdown;
 
     public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
@@ -58,63 +60,75 @@
 
     public void receive()
     {
-        HttpConnectionOverHTTP connection = getHttpConnection();
-        EndPoint endPoint = connection.getEndPoint();
         HttpClient client = getHttpDestination().getHttpClient();
         ByteBufferPool bufferPool = client.getByteBufferPool();
-        ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
-        try
+        buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
+        if (process())
         {
-            while (true)
+            bufferPool.release(buffer);
+            // Don't linger the buffer around if we are idle.
+            buffer = null;
+        }
+    }
+
+    private boolean process()
+    {
+        HttpConnectionOverHTTP connection = getHttpConnection();
+        EndPoint endPoint = connection.getEndPoint();
+        ByteBuffer buffer = this.buffer;
+        while (true)
+        {
+            try
             {
                 // Connection may be closed in a parser callback
                 if (connection.isClosed())
                 {
-                    LOG.debug("{} closed", connection);
-                    break;
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("{} closed", connection);
+                    return true;
+                }
+
+                if (!parse(buffer))
+                    return false;
+
+                int read = endPoint.fill(buffer);
+                // Avoid boxing of variable 'read'
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Read {} bytes from {}", read, endPoint);
+
+                if (read > 0)
+                {
+                    if (!parse(buffer))
+                        return false;
+                }
+                else if (read == 0)
+                {
+                    fillInterested();
+                    return true;
                 }
                 else
                 {
-                    int read = endPoint.fill(buffer);
-                    if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
-                        LOG.debug("Read {} bytes from {}", read, endPoint);
-                    if (read > 0)
-                    {
-                        parse(buffer);
-                    }
-                    else if (read == 0)
-                    {
-                        fillInterested();
-                        break;
-                    }
-                    else
-                    {
-                        shutdown();
-                        break;
-                    }
+                    shutdown();
+                    return true;
                 }
             }
-        }
-        catch (EofException x)
-        {
-            LOG.ignore(x);
-            failAndClose(x);
-        }
-        catch (Exception x)
-        {
-            LOG.debug(x);
-            failAndClose(x);
-        }
-        finally
-        {
-            bufferPool.release(buffer);
+            catch (Throwable x)
+            {
+                LOG.debug(x);
+                failAndClose(x);
+                return true;
+            }
         }
     }
 
-    private void parse(ByteBuffer buffer)
+    private boolean parse(ByteBuffer buffer)
     {
         while (buffer.hasRemaining())
-            parser.parseNext(buffer);
+        {
+            if (parser.parseNext(buffer))
+                return parser.isStart();
+        }
+        return true;
     }
 
     private void fillInterested()
@@ -195,13 +209,33 @@
         if (exchange == null)
             return false;
 
-        // TODO: need to create the callback here, then check whether it has completed
-        // TODO: after the call to responseContent. If it has, return false.
-        // TODO: if it has not, return true, and when will be invoked, we need to
-        // TODO: proceed with parsing.
+        final AtomicBoolean completed = new AtomicBoolean();
+        Callback callback = new Callback()
+        {
+            @Override
+            public void succeeded()
+            {
+                if (!completed.compareAndSet(false, true))
+                {
+                    LOG.debug("Content consumed asynchronously, resuming processing");
+                    if (process())
+                    {
+                        // TODO: release the buffer to the pool !
+                    }
+                }
+            }
 
-        responseContent(exchange, buffer);
-        return false;
+            @Override
+            public void failed(Throwable x)
+            {
+                failAndClose(x);
+            }
+        };
+        responseContent(exchange, buffer, callback);
+        // Return false to have the parser continue parsing.
+        // TODO: there is a race here: when this thread returns true, the parser is still running
+        // TODO: some stateful code that may be changed concurrently by the callback thread.
+        return completed.compareAndSet(false, true);
     }
 
     @Override
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
index 73ad961..55a82f7 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
@@ -235,10 +235,14 @@
             synchronized (lock)
             {
                 chunk = current;
-                --size;
-                lock.notify();
+                if (chunk != null)
+                {
+                    --size;
+                    lock.notify();
+                }
             }
-            chunk.callback.succeeded();
+            if (chunk != null)
+                chunk.callback.succeeded();
         }
 
         @Override
@@ -251,7 +255,8 @@
                 failure = x;
                 lock.notify();
             }
-            chunk.callback.failed(x);
+            if (chunk != null)
+                chunk.callback.failed(x);
         }
     }
 
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java
new file mode 100644
index 0000000..56fcd1a
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java
@@ -0,0 +1,127 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
+{
+    public HttpClientAsyncContentTest(SslContextFactory sslContextFactory)
+    {
+        super(sslContextFactory);
+    }
+
+    @Test
+    public void testSmallAsyncContent() throws Exception
+    {
+        start(new AbstractHandler()
+        {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+            {
+                ServletOutputStream output = response.getOutputStream();
+                output.write(65);
+                output.flush();
+                output.write(66);
+            }
+        });
+
+        final AtomicInteger contentCount = new AtomicInteger();
+        final AtomicReference<Callback> callbackRef = new AtomicReference<>();
+        final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
+        final CountDownLatch completeLatch = new CountDownLatch(1);
+        client.newRequest("localhost", connector.getLocalPort())
+                .scheme(scheme)
+                .onResponseContentAsync(new Response.AsyncContentListener()
+                {
+                    @Override
+                    public void onContent(Response response, ByteBuffer content, Callback callback)
+                    {
+                        contentCount.incrementAndGet();
+                        callbackRef.set(callback);
+                        contentLatch.get().countDown();
+                    }
+                })
+                .send(new Response.CompleteListener()
+                {
+                    @Override
+                    public void onComplete(Result result)
+                    {
+                        completeLatch.countDown();
+                    }
+                });
+
+        Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
+        Callback callback = callbackRef.get();
+
+        // Wait a while to be sure that the parsing does not proceed.
+        TimeUnit.MILLISECONDS.sleep(1000);
+
+        Assert.assertEquals(1, contentCount.get());
+
+        // Succeed the content callback to proceed with parsing.
+        callbackRef.set(null);
+        contentLatch.set(new CountDownLatch(1));
+        callback.succeeded();
+
+        Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
+        callback = callbackRef.get();
+
+        // Wait a while to be sure that the parsing does not proceed.
+        TimeUnit.MILLISECONDS.sleep(1000);
+
+        Assert.assertEquals(2, contentCount.get());
+        Assert.assertEquals(1, completeLatch.getCount());
+
+        // Succeed the content callback to proceed with parsing.
+        callbackRef.set(null);
+        contentLatch.set(new CountDownLatch(1));
+        callback.succeeded();
+
+        Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertEquals(2, contentCount.get());
+    }
+
+    public void test() throws Exception
+    {
+        try (Socket socket = new Socket())
+        {
+            System.out.println("socket = " + socket);
+        }
+    }
+}
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java
index 1c33cf0..36d91e0 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java
@@ -30,6 +30,7 @@
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.io.IdleTimeout;
+import org.eclipse.jetty.util.Callback;
 
 public class HttpChannelOverFCGI extends HttpChannel
 {
@@ -105,7 +106,8 @@
     protected boolean content(ByteBuffer buffer)
     {
         HttpExchange exchange = getHttpExchange();
-        return exchange != null && receiver.responseContent(exchange, buffer);
+        // TODO: handle callback properly
+        return exchange != null && receiver.responseContent(exchange, buffer, new Callback.Adapter());
     }
 
     protected boolean responseSuccess()
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpReceiverOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpReceiverOverFCGI.java
index a222c8c..2cdd34d 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpReceiverOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpReceiverOverFCGI.java
@@ -24,6 +24,7 @@
 import org.eclipse.jetty.client.HttpExchange;
 import org.eclipse.jetty.client.HttpReceiver;
 import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.util.Callback;
 
 public class HttpReceiverOverFCGI extends HttpReceiver
 {
@@ -51,9 +52,9 @@
     }
 
     @Override
-    protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
+    protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
     {
-        return super.responseContent(exchange, buffer);
+        return super.responseContent(exchange, buffer, callback);
     }
 
     @Override
diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java
index 40ba14d..7708fd2 100644
--- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java
+++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java
@@ -123,7 +123,8 @@
         {
             int length = dataInfo.length();
             // TODO: avoid data copy here
-            boolean process = responseContent(exchange, dataInfo.asByteBuffer(false));
+            // TODO: handle callback properly
+            boolean process = responseContent(exchange, dataInfo.asByteBuffer(false), new Callback.Adapter());
             dataInfo.consume(length);
 
             if (process)