439895 - No event callback should be invoked after the "failure" callback.
Fixed HttpSender and HttpReceiver to use a non-blocking collaborative
mechanism to notify callbacks.
Only the "failed" callback can run concurrently with other callbacks.
No other callback can run after the "complete" callback: a failure
concurrent with another callback will notify the "failed" callback,
finish the running callback and only then invoke the "complete" callback.
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
index ec90252..356413a 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
@@ -49,9 +49,8 @@
* is available</li>
* <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
* <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
- * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
- * method that may be invoked multiple times with different buffers containing different content</li>
- * <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
+ * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
+ * <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
* </ol>
* At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
* (for example, because of I/O exceptions).
@@ -69,7 +68,8 @@
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
private final HttpChannel channel;
- private volatile ContentDecoder decoder;
+ private ContentDecoder decoder;
+ private Throwable failure;
protected HttpReceiver(HttpChannel channel)
{
@@ -104,7 +104,7 @@
*/
protected boolean responseBegin(HttpExchange exchange)
{
- if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
+ if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
return false;
HttpConversation conversation = exchange.getConversation();
@@ -127,6 +127,9 @@
ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
+ if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
+ terminateResponse(exchange, failure);
+
return true;
}
@@ -152,7 +155,7 @@
case BEGIN:
case HEADER:
{
- if (updateResponseState(current, ResponseState.HEADER))
+ if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@@ -188,6 +191,9 @@
}
}
+ if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
+ terminateResponse(exchange, failure);
+
return true;
}
@@ -228,7 +234,7 @@
case BEGIN:
case HEADER:
{
- if (updateResponseState(current, ResponseState.HEADERS))
+ if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@@ -261,6 +267,9 @@
}
}
+ if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
+ terminateResponse(exchange, failure);
+
return true;
}
@@ -283,7 +292,7 @@
case HEADERS:
case CONTENT:
{
- if (updateResponseState(current, ResponseState.CONTENT))
+ if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@@ -312,6 +321,9 @@
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
+ if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
+ terminateResponse(exchange, failure);
+
return true;
}
@@ -332,16 +344,17 @@
if (!completed)
return false;
- // Reset to be ready for another response
+ responseState.set(ResponseState.IDLE);
+
+ // Reset to be ready for another response.
reset();
// Mark atomically the response as terminated and succeeded,
// with respect to concurrency between request and response.
- // If there is a non-null result, then both sender and
- // receiver are reset and ready to be reused, and the
- // connection closed/pooled (depending on the transport).
Result result = exchange.terminateResponse(null);
+ // It is important to notify *after* we reset and terminate
+ // because the notification may trigger another request/response.
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response);
@@ -349,17 +362,7 @@
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifySuccess(listeners, response);
- if (result != null)
- {
- boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
- if (!ordered)
- channel.exchangeTerminated(result);
- if (LOG.isDebugEnabled())
- LOG.debug("Request/Response succeeded {}", response);
- notifier.notifyComplete(listeners, result);
- if (ordered)
- channel.exchangeTerminated(result);
- }
+ terminateResponse(exchange, result);
return true;
}
@@ -388,7 +391,20 @@
if (!completed)
return false;
- // Dispose to avoid further responses
+ this.failure = failure;
+
+ // Update the state to avoid more response processing.
+ boolean fail;
+ while (true)
+ {
+ ResponseState current = responseState.get();
+ if (updateResponseState(current, ResponseState.FAILURE))
+ {
+ fail = current != ResponseState.TRANSIENT;
+ break;
+ }
+ }
+
dispose();
// Mark atomically the response as terminated and failed,
@@ -402,19 +418,45 @@
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
+ if (fail)
+ {
+ terminateResponse(exchange, result);
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
+ }
+
+ return true;
+ }
+
+ private void terminateResponse(HttpExchange exchange, Throwable failure)
+ {
+ Result result = exchange.terminateResponse(failure);
+ terminateResponse(exchange, result);
+ }
+
+ private void terminateResponse(HttpExchange exchange, Result result)
+ {
+ HttpResponse response = exchange.getResponse();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Response complete {}", response);
+
if (result != null)
{
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
- LOG.debug("Request/Response failed {}", response);
+ LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
+ List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
+ ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
channel.exchangeTerminated(result);
}
-
- return true;
}
/**
@@ -427,7 +469,6 @@
protected void reset()
{
decoder = null;
- responseState.set(ResponseState.IDLE);
}
/**
@@ -440,7 +481,6 @@
protected void dispose()
{
decoder = null;
- responseState.set(ResponseState.FAILURE);
}
public boolean abort(Throwable cause)
@@ -465,6 +505,10 @@
private enum ResponseState
{
/**
+ * One of the response*() methods is being executed.
+ */
+ TRANSIENT,
+ /**
* The response is not yet received, the initial state
*/
IDLE,
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 2630206..85d4d0b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -65,7 +65,8 @@
private final IteratingCallback contentCallback = new ContentCallback();
private final Callback lastCallback = new LastContentCallback();
private final HttpChannel channel;
- private volatile HttpContent content;
+ private HttpContent content;
+ private Throwable failure;
protected HttpSender(HttpChannel channel)
{
@@ -197,34 +198,40 @@
protected boolean queuedToBegin(Request request)
{
- if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
+ if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request begin {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
+ if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
+ terminateRequest(getHttpExchange(), failure, false);
return true;
}
protected boolean beginToHeaders(Request request)
{
- if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
+ if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request);
+ if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
+ terminateRequest(getHttpExchange(), failure, false);
return true;
}
protected boolean headersToCommit(Request request)
{
- if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
+ if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request committed {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request);
+ if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
+ terminateRequest(getHttpExchange(), failure, true);
return true;
}
@@ -236,21 +243,19 @@
case COMMIT:
case CONTENT:
{
- if (!updateRequestState(current, RequestState.CONTENT))
+ if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content);
+ if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
+ terminateRequest(getHttpExchange(), failure, true);
return true;
}
- case FAILURE:
- {
- return false;
- }
default:
{
- throw new IllegalStateException(current.toString());
+ return false;
}
}
}
@@ -269,43 +274,28 @@
if (!completed)
return false;
- // Reset to be ready for another request
+ requestState.set(RequestState.QUEUED);
+
+ // Reset to be ready for another request.
reset();
// Mark atomically the request as terminated and succeeded,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(null);
- // It is important to notify completion *after* we reset because
- // the notification may trigger another request/response
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request success {}", request);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
- if (result != null)
- {
- boolean ordered = destination.getHttpClient().isStrictEventOrdering();
- if (!ordered)
- channel.exchangeTerminated(result);
- if (LOG.isDebugEnabled())
- LOG.debug("Request/Response succeded {}", request);
- HttpConversation conversation = exchange.getConversation();
- destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
- if (ordered)
- channel.exchangeTerminated(result);
- }
+ terminateRequest(exchange, null, true, result);
return true;
}
- case FAILURE:
- {
- return false;
- }
default:
{
- throw new IllegalStateException(current.toString());
+ return false;
}
}
}
@@ -322,8 +312,22 @@
if (!completed)
return false;
- // Dispose to avoid further requests
- RequestState requestState = dispose();
+ this.failure = failure;
+
+ // Update the state to avoid more request processing.
+ RequestState current;
+ boolean fail;
+ while (true)
+ {
+ current = requestState.get();
+ if (updateRequestState(current, RequestState.FAILURE))
+ {
+ fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
+ break;
+ }
+ }
+
+ dispose();
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
@@ -335,8 +339,36 @@
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
- boolean notCommitted = isBeforeCommit(requestState);
- if (result == null && notCommitted && request.getAbortCause() == null)
+ if (fail)
+ {
+ terminateRequest(exchange, failure, !isBeforeCommit(current), result);
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
+ }
+
+ return true;
+ }
+
+ private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
+ {
+ if (exchange != null)
+ {
+ Result result = exchange.terminateRequest(failure);
+ terminateRequest(exchange, failure, committed, result);
+ }
+ }
+
+ private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
+ {
+ Request request = exchange.getRequest();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Terminating request {}", request);
+
+ if (failure != null && !committed && result == null && request.getAbortCause() == null)
{
// Complete the response from here
if (exchange.responseComplete())
@@ -349,18 +381,17 @@
if (result != null)
{
+ HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
- LOG.debug("Request/Response failed {}", request);
+ LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)
channel.exchangeTerminated(result);
}
-
- return true;
}
/**
@@ -398,23 +429,14 @@
{
content.close();
content = null;
- requestState.set(RequestState.QUEUED);
senderState.set(SenderState.IDLE);
}
- protected RequestState dispose()
+ protected void dispose()
{
- while (true)
- {
- RequestState current = requestState.get();
- if (updateRequestState(current, RequestState.FAILURE))
- {
- HttpContent content = this.content;
- if (content != null)
- content.close();
- return current;
- }
- }
+ HttpContent content = this.content;
+ if (content != null)
+ content.close();
}
public void proceed(HttpExchange exchange, Throwable failure)
@@ -485,7 +507,7 @@
return abortable && anyToFailure(failure);
}
- protected boolean updateRequestState(RequestState from, RequestState to)
+ private boolean updateRequestState(RequestState from, RequestState to)
{
boolean updated = requestState.compareAndSet(from, to);
if (!updated)
@@ -505,6 +527,7 @@
{
switch (requestState)
{
+ case TRANSIENT:
case QUEUED:
case BEGIN:
case HEADERS:
@@ -518,6 +541,7 @@
{
switch (requestState)
{
+ case TRANSIENT_CONTENT:
case COMMIT:
case CONTENT:
return true;
@@ -534,9 +558,17 @@
/**
* The request states {@link HttpSender} goes through when sending a request.
*/
- protected enum RequestState
+ private enum RequestState
{
/**
+ * One of the state transition methods is being executed.
+ */
+ TRANSIENT,
+ /**
+ * The content transition method is being executed.
+ */
+ TRANSIENT_CONTENT,
+ /**
* The request is queued, the initial state
*/
QUEUED,
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
index 6b0364d..c8ce041 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
@@ -196,12 +196,11 @@
}
@Override
- protected RequestState dispose()
+ protected void dispose()
{
generator.abort();
- RequestState result = super.dispose();
+ super.dispose();
shutdownOutput();
- return result;
}
private void shutdownOutput()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
index 5a68967..789b525 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java
@@ -145,27 +145,40 @@
}
@Override
+ public void onSuccess(Response response)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queuing end of content {}{}", EOF, "");
+ queue.offer(EOF);
+ signal();
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure)
+ {
+ fail(failure);
+ signal();
+ }
+
+ @Override
public void onComplete(Result result)
{
+ if (result.isFailed() && failure == null)
+ fail(result.getFailure());
this.result = result;
- if (result.isSucceeded())
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Queuing end of content {}{}", EOF, "");
- queue.offer(EOF);
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Queuing failure {} {}", FAILURE, failure);
- queue.offer(FAILURE);
- this.failure = result.getFailure();
- responseLatch.countDown();
- }
resultLatch.countDown();
signal();
}
+ private void fail(Throwable failure)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queuing failure {} {}", FAILURE, failure);
+ queue.offer(FAILURE);
+ this.failure = failure;
+ responseLatch.countDown();
+ }
+
protected boolean await()
{
try
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java
new file mode 100644
index 0000000..a608942
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseConcurrentAbortTest.java
@@ -0,0 +1,198 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpResponseConcurrentAbortTest extends AbstractHttpClientServerTest
+{
+ private final CountDownLatch callbackLatch = new CountDownLatch(1);
+ private final CountDownLatch failureLatch = new CountDownLatch(1);
+ private final CountDownLatch completeLatch = new CountDownLatch(1);
+ private final AtomicBoolean success = new AtomicBoolean();
+
+ public HttpResponseConcurrentAbortTest(SslContextFactory sslContextFactory)
+ {
+ super(sslContextFactory);
+ }
+
+ @Test
+ public void testAbortOnBegin() throws Exception
+ {
+ start(new EmptyServerHandler());
+
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .onResponseBegin(new Response.BeginListener()
+ {
+ @Override
+ public void onBegin(Response response)
+ {
+ abort(response);
+ }
+ })
+ .send(new TestResponseListener());
+ Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(completeLatch.await(6, TimeUnit.SECONDS));
+ Assert.assertTrue(success.get());
+ }
+
+ @Test
+ public void testAbortOnHeader() throws Exception
+ {
+ start(new EmptyServerHandler());
+
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .onResponseHeader(new Response.HeaderListener()
+ {
+ @Override
+ public boolean onHeader(Response response, HttpField field)
+ {
+ abort(response);
+ return true;
+ }
+ })
+ .send(new TestResponseListener());
+ Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(success.get());
+ }
+
+ @Test
+ public void testAbortOnHeaders() throws Exception
+ {
+ start(new EmptyServerHandler());
+
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .onResponseHeaders(new Response.HeadersListener()
+ {
+ @Override
+ public void onHeaders(Response response)
+ {
+ abort(response);
+ }
+ })
+ .send(new TestResponseListener());
+ Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(success.get());
+ }
+
+ @Test
+ public void testAbortOnContent() throws Exception
+ {
+ start(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ OutputStream output = response.getOutputStream();
+ output.write(1);
+ output.flush();
+ }
+ });
+
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .onResponseContent(new Response.ContentListener()
+ {
+ @Override
+ public void onContent(Response response, ByteBuffer content)
+ {
+ abort(response);
+ }
+ })
+ .send(new TestResponseListener());
+ Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(success.get());
+ }
+
+ private void abort(final Response response)
+ {
+ Logger logger = Log.getLogger(getClass());
+
+ new Thread("abort")
+ {
+ @Override
+ public void run()
+ {
+ response.abort(new Exception());
+ }
+ }.start();
+
+ try
+ {
+ // The failure callback must be executed asynchronously.
+ boolean latched = failureLatch.await(4, TimeUnit.SECONDS);
+ success.set(latched);
+ logger.info("SIMON - STEP 1");
+
+ // The complete callback must not be executed
+ // until we return from this callback.
+ latched = completeLatch.await(1, TimeUnit.SECONDS);
+ success.set(!latched);
+ logger.info("SIMON - STEP 2");
+
+ callbackLatch.countDown();
+ }
+ catch (InterruptedException x)
+ {
+ throw new RuntimeException(x);
+ }
+ }
+
+ private class TestResponseListener extends Response.Listener.Adapter
+ {
+ @Override
+ public void onFailure(Response response, Throwable failure)
+ {
+ failureLatch.countDown();
+ }
+
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertTrue(result.isFailed());
+ completeLatch.countDown();
+ }
+ }
+}