Initial implementation of 431642 (async proxy servlet).
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
index afe214f..0002c6b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
@@ -50,6 +50,7 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
public class HttpRequest implements Request
@@ -461,6 +462,20 @@
}
@Override
+ public Request onResponseContent(final Response.AsyncContentListener listener)
+ {
+ this.responseListeners.add(new Response.AsyncContentListener()
+ {
+ @Override
+ public void onContent(Response response, ByteBuffer content, Callback callback)
+ {
+ listener.onContent(response, content, callback);
+ }
+ });
+ return this;
+ }
+
+ @Override
public Request onResponseSuccess(final Response.SuccessListener listener)
{
this.responseListeners.add(new Response.SuccessListener()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
index cc3b5f8..6c9ae20 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
@@ -340,9 +340,14 @@
/**
* @param listener a listener for response content events
* @return this request object
+ * @deprecated Use {@link #onResponseContent(Response.AsyncContentListener)} instead.
*/
+ @Deprecated
Request onResponseContent(Response.ContentListener listener);
+ // TODO: JAVADOCS
+ Request onResponseContent(Response.AsyncContentListener listener);
+
/**
* @param listener a listener for response success event
* @return this request object
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java
index 5f60ef8..e92dcaa 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java
@@ -26,6 +26,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.util.Callback;
/**
* <p>{@link Response} represents a HTTP response and offers methods to retrieve status code, HTTP version
@@ -152,6 +153,11 @@
public void onContent(Response response, ByteBuffer content);
}
+ public interface AsyncContentListener extends ResponseListener
+ {
+ public void onContent(Response response, ByteBuffer content, Callback callback);
+ }
+
/**
* Listener for the response succeeded event.
*/
@@ -204,7 +210,7 @@
/**
* Listener for all response events.
*/
- public interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
+ public interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, SuccessListener, FailureListener, CompleteListener
{
/**
* An empty implementation of {@link Listener}
@@ -233,6 +239,12 @@
}
@Override
+ public void onContent(Response response, ByteBuffer content, Callback callback)
+ {
+ callback.succeeded();
+ }
+
+ @Override
public void onSuccess(Response response)
{
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
index a0af295..73ad961 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,6 +34,7 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.ArrayQueue;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
/**
@@ -83,10 +85,11 @@
*/
public class DeferredContentProvider implements AsyncContentProvider, Closeable
{
- private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
+ private static final Callback EMPTY_CALLBACK = new Callback.Adapter();
+ private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, EMPTY_CALLBACK);
private final Object lock = this;
- private final Queue<ByteBuffer> chunks = new ArrayQueue<>(4, 64, lock);
+ private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -127,11 +130,21 @@
*/
public boolean offer(ByteBuffer buffer)
{
+ return offer(buffer, EMPTY_CALLBACK);
+ }
+
+ public boolean offer(ByteBuffer buffer, Callback callback)
+ {
+ return offer(new AsyncChunk(buffer, callback));
+ }
+
+ private boolean offer(AsyncChunk chunk)
+ {
boolean result;
synchronized (lock)
{
- result = chunks.offer(buffer);
- if (result && buffer != CLOSE)
+ result = chunks.offer(chunk);
+ if (result && chunk != CLOSE)
++size;
}
if (result)
@@ -186,7 +199,7 @@
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
{
- private ByteBuffer current;
+ private AsyncChunk current;
@Override
public boolean hasNext()
@@ -202,10 +215,10 @@
{
synchronized (lock)
{
- ByteBuffer element = current = chunks.poll();
- if (element == CLOSE)
+ AsyncChunk chunk = current = chunks.poll();
+ if (chunk == CLOSE)
throw new NoSuchElementException();
- return element;
+ return chunk == null ? null : chunk.buffer;
}
}
@@ -218,24 +231,39 @@
@Override
public void succeeded()
{
+ AsyncChunk chunk;
synchronized (lock)
{
- if (current != null)
- {
- --size;
- lock.notify();
- }
+ chunk = current;
+ --size;
+ lock.notify();
}
+ chunk.callback.succeeded();
}
@Override
public void failed(Throwable x)
{
+ AsyncChunk chunk;
synchronized (lock)
{
+ chunk = current;
failure = x;
lock.notify();
}
+ chunk.callback.failed(x);
+ }
+ }
+
+ private static class AsyncChunk
+ {
+ private final ByteBuffer buffer;
+ private final Callback callback;
+
+ private AsyncChunk(ByteBuffer buffer, Callback callback)
+ {
+ this.buffer = Objects.requireNonNull(buffer);
+ this.callback = Objects.requireNonNull(callback);
}
}
}
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java
new file mode 100644
index 0000000..e9915a4
--- /dev/null
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java
@@ -0,0 +1,106 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.proxy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import javax.servlet.AsyncContext;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.util.DeferredContentProvider;
+import org.eclipse.jetty.util.Callback;
+
+public class AsyncProxyServlet extends ProxyServlet
+{
+ @Override
+ protected ContentProvider proxyRequestContent(AsyncContext asyncContext, final int requestId) throws IOException
+ {
+ ServletInputStream input = asyncContext.getRequest().getInputStream();
+ DeferredContentProvider provider = new DeferredContentProvider();
+ input.setReadListener(new StreamReader(input, requestId, provider));
+ return provider;
+ }
+
+ private class StreamReader implements ReadListener, Callback
+ {
+ private final byte[] buffer = new byte[512];
+ private final ServletInputStream input;
+ private final int requestId;
+ private final DeferredContentProvider provider;
+
+ public StreamReader(ServletInputStream input, int requestId, DeferredContentProvider provider)
+ {
+ this.input = input;
+ this.requestId = requestId;
+ this.provider = provider;
+ }
+
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ _log.debug("Asynchronous read start from {}", input);
+
+ // First check for isReady() because it has
+ // side effects, and then for isFinished().
+ while (input.isReady() && !input.isFinished())
+ {
+ int read = input.read(buffer);
+ _log.debug("Asynchronous read {} bytes from {}", read, input);
+ if (read >= 0)
+ {
+ _log.debug("{} proxying content to upstream: {} bytes", requestId, read);
+ provider.offer(ByteBuffer.wrap(buffer, 0, read), this);
+ // Do not call isReady() so that we can apply backpressure.
+ break;
+ }
+ }
+ if (!input.isFinished())
+ _log.debug("Asynchronous read pending from {}", input);
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ _log.debug("{} proxying content to upstream completed", requestId);
+ provider.close();
+ }
+
+ @Override
+ public void onError(Throwable x)
+ {
+ failed(x);
+ }
+
+ @Override
+ public void succeeded()
+ {
+ // Notify the container that it may call onDataAvailable() again.
+ input.isReady();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ // TODO: send a response error ?
+ // complete the async context since we cannot throw an exception from here.
+ }
+ }
+}
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java
index 8207b98..de40c8e 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java
@@ -19,6 +19,7 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
@@ -41,6 +42,7 @@
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@@ -385,7 +387,7 @@
if (rewrittenURI == null)
{
- response.sendError(HttpServletResponse.SC_FORBIDDEN);
+ onRewriteFailed(request, response);
return;
}
@@ -427,25 +429,6 @@
addViaHeader(proxyRequest);
addXForwardedHeaders(proxyRequest, request);
- if (hasContent)
- {
- proxyRequest.content(new InputStreamContentProvider(request.getInputStream())
- {
- @Override
- public long getLength()
- {
- return request.getContentLength();
- }
-
- @Override
- protected ByteBuffer onRead(byte[] buffer, int offset, int length)
- {
- _log.debug("{} proxying content to upstream: {} bytes", requestId, length);
- return super.onRead(buffer, offset, length);
- }
- });
- }
-
final AsyncContext asyncContext = request.startAsync();
// We do not timeout the continuation, but the proxy request
asyncContext.setTimeout(0);
@@ -453,6 +436,9 @@
customizeProxyRequest(proxyRequest, request);
+ if (hasContent)
+ proxyRequest.content(proxyRequestContent(asyncContext, requestId));
+
if (_log.isDebugEnabled())
{
StringBuilder builder = new StringBuilder(request.getMethod());
@@ -490,6 +476,31 @@
proxyRequest.send(new ProxyResponseListener(request, response));
}
+ protected ContentProvider proxyRequestContent(final AsyncContext asyncContext, final int requestId) throws IOException
+ {
+ final HttpServletRequest request = (HttpServletRequest)asyncContext.getRequest();
+ return new InputStreamContentProvider(request.getInputStream())
+ {
+ @Override
+ public long getLength()
+ {
+ return request.getContentLength();
+ }
+
+ @Override
+ protected ByteBuffer onRead(byte[] buffer, int offset, int length)
+ {
+ _log.debug("{} proxying content to upstream: {} bytes", requestId, length);
+ return super.onRead(buffer, offset, length);
+ }
+ };
+ }
+
+ protected void onRewriteFailed(HttpServletRequest request, HttpServletResponse response) throws IOException
+ {
+ response.sendError(HttpServletResponse.SC_FORBIDDEN);
+ }
+
protected Request addViaHeader(Request proxyRequest)
{
return proxyRequest.header(HttpHeader.VIA, "http/1.1 " + getViaHost());
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
index ec32b8b..2e2f3c6 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java
@@ -18,8 +18,6 @@
package org.eclipse.jetty.proxy;
-import static java.nio.file.StandardOpenOption.CREATE;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +28,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,7 +38,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
-
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@@ -64,7 +62,6 @@
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
@@ -78,11 +75,24 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-@RunWith(AdvancedRunner.class)
+import static java.nio.file.StandardOpenOption.CREATE;
+
+@RunWith(Parameterized.class)
public class ProxyServletTest
{
private static final String PROXIED_HEADER = "X-Proxied";
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data()
+ {
+ return Arrays.asList(new Object[][]{
+ {new ProxyServlet()},
+ {new AsyncProxyServlet()}
+ });
+ }
+
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
@@ -92,6 +102,16 @@
private Server server;
private ServerConnector serverConnector;
+ public ProxyServletTest(ProxyServlet proxyServlet)
+ {
+ this.proxyServlet = proxyServlet;
+ }
+
+ private void prepareProxy() throws Exception
+ {
+ prepareProxy(proxyServlet);
+ }
+
private void prepareProxy(ProxyServlet proxyServlet) throws Exception
{
proxy = new Server();
@@ -145,7 +165,7 @@
@Test
public void testProxyDown() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new EmptyHttpServlet());
// Shutdown the proxy
@@ -167,7 +187,7 @@
@Test
public void testServerDown() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new EmptyHttpServlet());
// Shutdown the server
@@ -187,7 +207,7 @@
((StdErrLog)Log.getLogger(ServletHandler.class)).setHideStacks(true);
try
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -198,8 +218,8 @@
});
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
- .timeout(5, TimeUnit.SECONDS)
- .send();
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
Assert.assertEquals(500, response.getStatus());
}
@@ -212,7 +232,7 @@
@Test
public void testProxyWithoutContent() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -234,7 +254,7 @@
@Test
public void testProxyWithResponseContent() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
HttpClient result = new HttpClient();
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
@@ -259,16 +279,16 @@
}
});
- for ( int i = 0; i < 10; ++i )
+ for (int i = 0; i < 10; ++i)
{
- // Request is for the target server
+ // Request is for the target server
responses[i] = result.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
}
- for ( int i = 0; i < 10; ++i )
+ for (int i = 0; i < 10; ++i)
{
Assert.assertEquals(200, responses[i].getStatus());
Assert.assertTrue(responses[i].getHeaders().containsKey(PROXIED_HEADER));
@@ -279,7 +299,7 @@
@Test
public void testProxyWithRequestContentAndResponseContent() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -307,7 +327,7 @@
@Test
public void testProxyWithBigRequestContentIgnored() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -335,7 +355,7 @@
final byte[] content = new byte[128 * 1024];
new Random().nextBytes(content);
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -370,7 +390,7 @@
@Test
public void testProxyWithBigResponseContentWithSlowReader() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
// Create a 6 MiB file
final int length = 6 * 1024;
@@ -431,7 +451,7 @@
@Test
public void testProxyWithQueryString() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
String query = "a=1&b=%E2%82%AC";
prepareServer(new HttpServlet()
{
@@ -453,7 +473,7 @@
@Test
public void testProxyLongPoll() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
final long timeout = 1000;
prepareServer(new HttpServlet()
{
@@ -504,7 +524,7 @@
@Test
public void testProxyRequestExpired() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
final long timeout = 1000;
proxyServlet.setTimeout(timeout);
prepareServer(new HttpServlet()
@@ -536,7 +556,7 @@
@Test(expected = TimeoutException.class)
public void testClientRequestExpired() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
final long timeout = 1000;
proxyServlet.setTimeout(3 * timeout);
prepareServer(new HttpServlet()
@@ -566,7 +586,7 @@
@Test
public void testProxyXForwardedHostHeaderIsPresent() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -587,7 +607,7 @@
@Test
public void testProxyWhiteList() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new EmptyHttpServlet());
int port = serverConnector.getLocalPort();
proxyServlet.getWhiteListHosts().add("127.0.0.1:" + port);
@@ -608,7 +628,7 @@
@Test
public void testProxyBlackList() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new EmptyHttpServlet());
int port = serverConnector.getLocalPort();
proxyServlet.getBlackListHosts().add("localhost:" + port);
@@ -629,7 +649,7 @@
@Test
public void testClientExcludedHosts() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -821,7 +841,7 @@
@Test
public void testRedirectsAreProxied() throws Exception
{
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -846,7 +866,7 @@
public void testGZIPContentIsProxied() throws Exception
{
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override
@@ -873,13 +893,15 @@
@Test(expected = TimeoutException.class)
public void shouldHandleWrongContentLength() throws Exception
{
- prepareProxy(new ProxyServlet());
- prepareServer(new HttpServlet() {
+ prepareProxy();
+ prepareServer(new HttpServlet()
+ {
@Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
+ {
byte[] message = "tooshort".getBytes("ascii");
resp.setContentType("text/plain;charset=ascii");
- resp.setHeader("Content-Length", Long.toString(message.length+1));
+ resp.setHeader("Content-Length", Long.toString(message.length + 1));
resp.getOutputStream().write(message);
}
});
@@ -895,7 +917,7 @@
public void testCookiesFromDifferentClientsAreNotMixed() throws Exception
{
final String name = "biscuit";
- prepareProxy(new ProxyServlet());
+ prepareProxy();
prepareServer(new HttpServlet()
{
@Override