439895 - No event callback should be invoked after the "failure" callback.

Fixed HttpSender and HttpReceiver to use a non-blocking collaborative
mechanism to notify callbacks.
Only the "failed" callback can run concurrently with other callbacks.
No other callback can run after the "complete" callback: a failure
concurrent with another callback will notify the "failed" callback,
finish the running callback and only then invoke the "complete" callback.
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
index ec90252..356413a 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
@@ -49,9 +49,8 @@
  * is available</li>
  * <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
  * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
- * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
- * method that may be invoked multiple times with different buffers containing different content</li>
- * <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
+ * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
+ * <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
  * </ol>
  * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
  * (for example, because of I/O exceptions).
@@ -69,7 +68,8 @@
 
     private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
     private final HttpChannel channel;
-    private volatile ContentDecoder decoder;
+    private ContentDecoder decoder;
+    private Throwable failure;
 
     protected HttpReceiver(HttpChannel channel)
     {
@@ -104,7 +104,7 @@
      */
     protected boolean responseBegin(HttpExchange exchange)
     {
-        if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
+        if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
             return false;
 
         HttpConversation conversation = exchange.getConversation();
@@ -127,6 +127,9 @@
         ResponseNotifier notifier = destination.getResponseNotifier();
         notifier.notifyBegin(conversation.getResponseListeners(), response);
 
+        if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
+            terminateResponse(exchange, failure);
+
         return true;
     }
 
@@ -152,7 +155,7 @@
                 case BEGIN:
                 case HEADER:
                 {
-                    if (updateResponseState(current, ResponseState.HEADER))
+                    if (updateResponseState(current, ResponseState.TRANSIENT))
                         break out;
                     break;
                 }
@@ -188,6 +191,9 @@
             }
         }
 
+        if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
+            terminateResponse(exchange, failure);
+
         return true;
     }
 
@@ -228,7 +234,7 @@
                 case BEGIN:
                 case HEADER:
                 {
-                    if (updateResponseState(current, ResponseState.HEADERS))
+                    if (updateResponseState(current, ResponseState.TRANSIENT))
                         break out;
                     break;
                 }
@@ -261,6 +267,9 @@
             }
         }
 
+        if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
+            terminateResponse(exchange, failure);
+
         return true;
     }
 
@@ -283,7 +292,7 @@
                 case HEADERS:
                 case CONTENT:
                 {
-                    if (updateResponseState(current, ResponseState.CONTENT))
+                    if (updateResponseState(current, ResponseState.TRANSIENT))
                         break out;
                     break;
                 }
@@ -312,6 +321,9 @@
         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
         notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
 
+        if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
+            terminateResponse(exchange, failure);
+
         return true;
     }
 
@@ -332,16 +344,17 @@
         if (!completed)
             return false;
 
-        // Reset to be ready for another response
+        responseState.set(ResponseState.IDLE);
+
+        // Reset to be ready for another response.
         reset();
 
         // Mark atomically the response as terminated and succeeded,
         // with respect to concurrency between request and response.
-        // If there is a non-null result, then both sender and
-        // receiver are reset and ready to be reused, and the
-        // connection closed/pooled (depending on the transport).
         Result result = exchange.terminateResponse(null);
 
+        // It is important to notify *after* we reset and terminate
+        // because the notification may trigger another request/response.
         HttpResponse response = exchange.getResponse();
         if (LOG.isDebugEnabled())
             LOG.debug("Response success {}", response);
@@ -349,17 +362,7 @@
         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
         notifier.notifySuccess(listeners, response);
 
-        if (result != null)
-        {
-            boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
-            if (!ordered)
-                channel.exchangeTerminated(result);
-            if (LOG.isDebugEnabled())
-                LOG.debug("Request/Response succeeded {}", response);
-            notifier.notifyComplete(listeners, result);
-            if (ordered)
-                channel.exchangeTerminated(result);
-        }
+        terminateResponse(exchange, result);
 
         return true;
     }
@@ -388,7 +391,20 @@
         if (!completed)
             return false;
 
-        // Dispose to avoid further responses
+        this.failure = failure;
+
+        // Update the state to avoid more response processing.
+        boolean fail;
+        while (true)
+        {
+            ResponseState current = responseState.get();
+            if (updateResponseState(current, ResponseState.FAILURE))
+            {
+                fail = current != ResponseState.TRANSIENT;
+                break;
+            }
+        }
+
         dispose();
 
         // Mark atomically the response as terminated and failed,
@@ -402,19 +418,45 @@
         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
         notifier.notifyFailure(listeners, response, failure);
 
+        if (fail)
+        {
+            terminateResponse(exchange, result);
+        }
+        else
+        {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
+        }
+
+        return true;
+    }
+
+    private void terminateResponse(HttpExchange exchange, Throwable failure)
+    {
+        Result result = exchange.terminateResponse(failure);
+        terminateResponse(exchange, result);
+    }
+
+    private void terminateResponse(HttpExchange exchange, Result result)
+    {
+        HttpResponse response = exchange.getResponse();
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Response complete {}", response);
+
         if (result != null)
         {
             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
             if (!ordered)
                 channel.exchangeTerminated(result);
             if (LOG.isDebugEnabled())
-                LOG.debug("Request/Response failed {}", response);
+                LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
+            List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
+            ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
             notifier.notifyComplete(listeners, result);
             if (ordered)
                 channel.exchangeTerminated(result);
         }
-
-        return true;
     }
 
     /**
@@ -427,7 +469,6 @@
     protected void reset()
     {
         decoder = null;
-        responseState.set(ResponseState.IDLE);
     }
 
     /**
@@ -440,7 +481,6 @@
     protected void dispose()
     {
         decoder = null;
-        responseState.set(ResponseState.FAILURE);
     }
 
     public boolean abort(Throwable cause)
@@ -465,6 +505,10 @@
     private enum ResponseState
     {
         /**
+         * One of the response*() methods is being executed.
+         */
+        TRANSIENT,
+        /**
          * The response is not yet received, the initial state
          */
         IDLE,
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 2630206..85d4d0b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -65,7 +65,8 @@
     private final IteratingCallback contentCallback = new ContentCallback();
     private final Callback lastCallback = new LastContentCallback();
     private final HttpChannel channel;
-    private volatile HttpContent content;
+    private HttpContent content;
+    private Throwable failure;
 
     protected HttpSender(HttpChannel channel)
     {
@@ -197,34 +198,40 @@
 
     protected boolean queuedToBegin(Request request)
     {
-        if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
+        if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
             return false;
         if (LOG.isDebugEnabled())
             LOG.debug("Request begin {}", request);
         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
         notifier.notifyBegin(request);
+        if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
+            terminateRequest(getHttpExchange(), failure, false);
         return true;
     }
 
     protected boolean beginToHeaders(Request request)
     {
-        if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
+        if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
             return false;
         if (LOG.isDebugEnabled())
             LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
         notifier.notifyHeaders(request);
+        if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
+            terminateRequest(getHttpExchange(), failure, false);
         return true;
     }
 
     protected boolean headersToCommit(Request request)
     {
-        if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
+        if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
             return false;
         if (LOG.isDebugEnabled())
             LOG.debug("Request committed {}", request);
         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
         notifier.notifyCommit(request);
+        if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
+            terminateRequest(getHttpExchange(), failure, true);
         return true;
     }
 
@@ -236,21 +243,19 @@
             case COMMIT:
             case CONTENT:
             {
-                if (!updateRequestState(current, RequestState.CONTENT))
+                if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
                     return false;
                 if (LOG.isDebugEnabled())
                     LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
                 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
                 notifier.notifyContent(request, content);
+                if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
+                    terminateRequest(getHttpExchange(), failure, true);
                 return true;
             }
-            case FAILURE:
-            {
-                return false;
-            }
             default:
             {
-                throw new IllegalStateException(current.toString());
+                return false;
             }
         }
     }
@@ -269,43 +274,28 @@
                 if (!completed)
                     return false;
 
-                // Reset to be ready for another request
+                requestState.set(RequestState.QUEUED);
+
+                // Reset to be ready for another request.
                 reset();
 
                 // Mark atomically the request as terminated and succeeded,
                 // with respect to concurrency between request and response.
                 Result result = exchange.terminateRequest(null);
 
-                // It is important to notify completion *after* we reset because
-                // the notification may trigger another request/response
                 Request request = exchange.getRequest();
                 if (LOG.isDebugEnabled())
                     LOG.debug("Request success {}", request);
                 HttpDestination destination = getHttpChannel().getHttpDestination();
                 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
 
-                if (result != null)
-                {
-                    boolean ordered = destination.getHttpClient().isStrictEventOrdering();
-                    if (!ordered)
-                        channel.exchangeTerminated(result);
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Request/Response succeded {}", request);
-                    HttpConversation conversation = exchange.getConversation();
-                    destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
-                    if (ordered)
-                        channel.exchangeTerminated(result);
-                }
+                terminateRequest(exchange, null, true, result);
 
                 return true;
             }
-            case FAILURE:
-            {
-                return false;
-            }
             default:
             {
-                throw new IllegalStateException(current.toString());
+                return false;
             }
         }
     }
@@ -322,8 +312,22 @@
         if (!completed)
             return false;
 
-        // Dispose to avoid further requests
-        RequestState requestState = dispose();
+        this.failure = failure;
+
+        // Update the state to avoid more request processing.
+        RequestState current;
+        boolean fail;
+        while (true)
+        {
+            current = requestState.get();
+            if (updateRequestState(current, RequestState.FAILURE))
+            {
+                fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
+                break;
+            }
+        }
+
+        dispose();
 
         // Mark atomically the request as terminated and failed,
         // with respect to concurrency between request and response.
@@ -335,8 +339,36 @@
         HttpDestination destination = getHttpChannel().getHttpDestination();
         destination.getRequestNotifier().notifyFailure(request, failure);
 
-        boolean notCommitted = isBeforeCommit(requestState);
-        if (result == null && notCommitted && request.getAbortCause() == null)
+        if (fail)
+        {
+            terminateRequest(exchange, failure, !isBeforeCommit(current), result);
+        }
+        else
+        {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
+        }
+
+        return true;
+    }
+
+    private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
+    {
+        if (exchange != null)
+        {
+            Result result = exchange.terminateRequest(failure);
+            terminateRequest(exchange, failure, committed, result);
+        }
+    }
+
+    private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
+    {
+        Request request = exchange.getRequest();
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Terminating request {}", request);
+
+        if (failure != null && !committed && result == null && request.getAbortCause() == null)
         {
             // Complete the response from here
             if (exchange.responseComplete())
@@ -349,18 +381,17 @@
 
         if (result != null)
         {
+            HttpDestination destination = getHttpChannel().getHttpDestination();
             boolean ordered = destination.getHttpClient().isStrictEventOrdering();
             if (!ordered)
                 channel.exchangeTerminated(result);
             if (LOG.isDebugEnabled())
-                LOG.debug("Request/Response failed {}", request);
+                LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
             HttpConversation conversation = exchange.getConversation();
             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
             if (ordered)
                 channel.exchangeTerminated(result);
         }
-
-        return true;
     }
 
     /**
@@ -398,23 +429,14 @@
     {
         content.close();
         content = null;
-        requestState.set(RequestState.QUEUED);
         senderState.set(SenderState.IDLE);
     }
 
-    protected RequestState dispose()
+    protected void dispose()
     {
-        while (true)
-        {
-            RequestState current = requestState.get();
-            if (updateRequestState(current, RequestState.FAILURE))
-            {
-                HttpContent content = this.content;
-                if (content != null)
-                    content.close();
-                return current;
-            }
-        }
+        HttpContent content = this.content;
+        if (content != null)
+            content.close();
     }
 
     public void proceed(HttpExchange exchange, Throwable failure)
@@ -485,7 +507,7 @@
         return abortable && anyToFailure(failure);
     }
 
-    protected boolean updateRequestState(RequestState from, RequestState to)
+    private boolean updateRequestState(RequestState from, RequestState to)
     {
         boolean updated = requestState.compareAndSet(from, to);
         if (!updated)
@@ -505,6 +527,7 @@
     {
         switch (requestState)
         {
+            case TRANSIENT:
             case QUEUED:
             case BEGIN:
             case HEADERS:
@@ -518,6 +541,7 @@
     {
         switch (requestState)
         {
+            case TRANSIENT_CONTENT:
             case COMMIT:
             case CONTENT:
                 return true;
@@ -534,9 +558,17 @@
     /**
      * The request states {@link HttpSender} goes through when sending a request.
      */
-    protected enum RequestState
+    private enum RequestState
     {
         /**
+         * One of the state transition methods is being executed.
+         */
+        TRANSIENT,
+        /**
+         * The content transition method is being executed.
+         */
+        TRANSIENT_CONTENT,
+        /**
          * The request is queued, the initial state
          */
         QUEUED,
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
index 6b0364d..c8ce041 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
@@ -196,12 +196,11 @@
     }
 
     @Override
-    protected RequestState dispose()
+    protected void dispose()
     {
         generator.abort();
-        RequestState result = super.dispose();
+        super.dispose();
         shutdownOutput();
-        return result;
     }
 
     private void shutdownOutput()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
index 5a68967..789b525 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
@@ -145,27 +145,40 @@
     }
 
     @Override
+    public void onSuccess(Response response)
+    {
+        if (LOG.isDebugEnabled())
+            LOG.debug("Queuing end of content {}{}", EOF, "");
+        queue.offer(EOF);
+        signal();
+    }
+
+    @Override
+    public void onFailure(Response response, Throwable failure)
+    {
+        fail(failure);
+        signal();
+    }
+
+    @Override
     public void onComplete(Result result)
     {
+        if (result.isFailed() && failure == null)
+            fail(result.getFailure());
         this.result = result;
-        if (result.isSucceeded())
-        {
-            if (LOG.isDebugEnabled())
-                LOG.debug("Queuing end of content {}{}", EOF, "");
-            queue.offer(EOF);
-        }
-        else
-        {
-            if (LOG.isDebugEnabled())
-                LOG.debug("Queuing failure {} {}", FAILURE, failure);
-            queue.offer(FAILURE);
-            this.failure = result.getFailure();
-            responseLatch.countDown();
-        }
         resultLatch.countDown();
         signal();
     }
 
+    private void fail(Throwable failure)
+    {
+        if (LOG.isDebugEnabled())
+            LOG.debug("Queuing failure {} {}", FAILURE, failure);
+        queue.offer(FAILURE);
+        this.failure = failure;
+        responseLatch.countDown();
+    }
+
     protected boolean await()
     {
         try
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java
new file mode 100644
index 0000000..a608942
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java
@@ -0,0 +1,198 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpResponseConcurrentAbortTest extends AbstractHttpClientServerTest
+{
+    private final CountDownLatch callbackLatch = new CountDownLatch(1);
+    private final CountDownLatch failureLatch = new CountDownLatch(1);
+    private final CountDownLatch completeLatch = new CountDownLatch(1);
+    private final AtomicBoolean success = new AtomicBoolean();
+
+    public HttpResponseConcurrentAbortTest(SslContextFactory sslContextFactory)
+    {
+        super(sslContextFactory);
+    }
+
+    @Test
+    public void testAbortOnBegin() throws Exception
+    {
+        start(new EmptyServerHandler());
+
+        client.newRequest("localhost", connector.getLocalPort())
+                .scheme(scheme)
+                .onResponseBegin(new Response.BeginListener()
+                {
+                    @Override
+                    public void onBegin(Response response)
+                    {
+                        abort(response);
+                    }
+                })
+                .send(new TestResponseListener());
+        Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(completeLatch.await(6, TimeUnit.SECONDS));
+        Assert.assertTrue(success.get());
+    }
+
+    @Test
+    public void testAbortOnHeader() throws Exception
+    {
+        start(new EmptyServerHandler());
+
+        client.newRequest("localhost", connector.getLocalPort())
+                .scheme(scheme)
+                .onResponseHeader(new Response.HeaderListener()
+                {
+                    @Override
+                    public boolean onHeader(Response response, HttpField field)
+                    {
+                        abort(response);
+                        return true;
+                    }
+                })
+                .send(new TestResponseListener());
+        Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(success.get());
+    }
+
+    @Test
+    public void testAbortOnHeaders() throws Exception
+    {
+        start(new EmptyServerHandler());
+
+        client.newRequest("localhost", connector.getLocalPort())
+                .scheme(scheme)
+                .onResponseHeaders(new Response.HeadersListener()
+                {
+                    @Override
+                    public void onHeaders(Response response)
+                    {
+                        abort(response);
+                    }
+                })
+                .send(new TestResponseListener());
+        Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(success.get());
+    }
+
+    @Test
+    public void testAbortOnContent() throws Exception
+    {
+        start(new AbstractHandler()
+        {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+            {
+                baseRequest.setHandled(true);
+                OutputStream output = response.getOutputStream();
+                output.write(1);
+                output.flush();
+            }
+        });
+
+        client.newRequest("localhost", connector.getLocalPort())
+                .scheme(scheme)
+                .onResponseContent(new Response.ContentListener()
+                {
+                    @Override
+                    public void onContent(Response response, ByteBuffer content)
+                    {
+                        abort(response);
+                    }
+                })
+                .send(new TestResponseListener());
+        Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(success.get());
+    }
+
+    private void abort(final Response response)
+    {
+        Logger logger = Log.getLogger(getClass());
+
+        new Thread("abort")
+        {
+            @Override
+            public void run()
+            {
+                response.abort(new Exception());
+            }
+        }.start();
+
+        try
+        {
+            // The failure callback must be executed asynchronously.
+            boolean latched = failureLatch.await(4, TimeUnit.SECONDS);
+            success.set(latched);
+            logger.info("SIMON - STEP 1");
+
+            // The complete callback must not be executed
+            // until we return from this callback.
+            latched = completeLatch.await(1, TimeUnit.SECONDS);
+            success.set(!latched);
+            logger.info("SIMON - STEP 2");
+
+            callbackLatch.countDown();
+        }
+        catch (InterruptedException x)
+        {
+            throw new RuntimeException(x);
+        }
+    }
+
+    private class TestResponseListener extends Response.Listener.Adapter
+    {
+        @Override
+        public void onFailure(Response response, Throwable failure)
+        {
+            failureLatch.countDown();
+        }
+
+        @Override
+        public void onComplete(Result result)
+        {
+            Assert.assertTrue(result.isFailed());
+            completeLatch.countDown();
+        }
+    }
+}