Implemented HTTP/2 push functionality.
A PushCacheFilter contains the logic to associate secondary resources
to primary resources.
PushCacheFilter calls a Jetty-specific API on the request dispatcher:
Dispatcher.push(ServletRequest). This is a technology preview of the
push functionality slated for Servlet 4.0.
The push() invocation arrives to the transport and it is converted to
HTTP/2 specific PUSH_PROMISE, along with the mechanism to simulate
the request for the secondary resource.
diff --git a/jetty-http2/http2-client/pom.xml b/jetty-http2/http2-client/pom.xml
index 60c4bb9..0f53720 100644
--- a/jetty-http2/http2-client/pom.xml
+++ b/jetty-http2/http2-client/pom.xml
@@ -73,6 +73,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java
index e7644e7..7a47f81 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java
@@ -24,6 +24,7 @@
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.EndPoint;
@@ -44,6 +45,9 @@
@Override
public boolean onHeaders(HeadersFrame frame)
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received {}", frame);
+
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream == null)
@@ -76,4 +80,47 @@
LOG.info("Failure while notifying listener " + listener, x);
}
}
+
+ @Override
+ public boolean onPushPromise(PushPromiseFrame frame)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received {}", frame);
+
+ int streamId = frame.getStreamId();
+ int pushStreamId = frame.getPromisedStreamId();
+ IStream stream = getStream(streamId);
+ if (stream == null)
+ {
+ ResetFrame reset = new ResetFrame(pushStreamId, ErrorCodes.STREAM_CLOSED_ERROR);
+ reset(reset, disconnectOnFailure());
+ }
+ else
+ {
+ IStream pushStream = createRemoteStream(pushStreamId);
+ pushStream.updateClose(true, true);
+ pushStream.process(frame, Callback.Adapter.INSTANCE);
+ Stream.Listener listener = notifyPush(stream, pushStream, frame);
+ pushStream.setListener(listener);
+ if (pushStream.isClosed())
+ removeStream(pushStream, false);
+ }
+ return false;
+ }
+
+ private Stream.Listener notifyPush(IStream stream, IStream pushStream, PushPromiseFrame frame)
+ {
+ Stream.Listener listener = stream.getListener();
+ if (listener == null)
+ return null;
+ try
+ {
+ return listener.onPush(pushStream, frame);
+ }
+ catch (Throwable x)
+ {
+ LOG.info("Failure while notifying listener " + listener, x);
+ return null;
+ }
+ }
}
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
index ab9357e..fd2fcd9 100644
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java
@@ -20,7 +20,6 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
-
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
@@ -45,7 +44,7 @@
public class AbstractTest
{
protected ServerConnector connector;
- private String path = "/test";
+ protected String servletPath = "/test";
protected HTTP2Client client;
private Server server;
@@ -53,8 +52,9 @@
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
- ServletContextHandler context = new ServletContextHandler(server, "/");
- context.addServlet(new ServletHolder(servlet), path);
+ ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
+ context.addServlet(new ServletHolder(servlet), servletPath + "/*");
+ customizeContext(context);
prepareClient();
@@ -62,6 +62,10 @@
client.start();
}
+ protected void customizeContext(ServletContextHandler context)
+ {
+ }
+
protected void startServer(ServerSessionListener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
@@ -96,18 +100,23 @@
return promise.get(5, TimeUnit.SECONDS);
}
+ protected MetaData.Request newRequest(String method, HttpFields fields)
+ {
+ return newRequest(method, "", fields);
+ }
+
+ protected MetaData.Request newRequest(String method, String pathInfo, HttpFields fields)
+ {
+ String host = "localhost";
+ int port = connector.getLocalPort();
+ String authority = host + ":" + port;
+ return new MetaData.Request(method, HttpScheme.HTTP, new HostPortHttpField(authority), servletPath + pathInfo, HttpVersion.HTTP_2, fields);
+ }
+
@After
public void dispose() throws Exception
{
client.stop();
server.stop();
}
-
- protected MetaData.Request newRequest(String method, HttpFields fields)
- {
- String host = "localhost";
- int port = connector.getLocalPort();
- String authority = host + ":" + port;
- return new MetaData.Request(method, HttpScheme.HTTP, new HostPortHttpField(authority), path, HttpVersion.HTTP_2, fields);
- }
}
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushTest.java
new file mode 100644
index 0000000..4f62281
--- /dev/null
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PushTest.java
@@ -0,0 +1,139 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http2.client;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlets.PushCacheFilter;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PushTest extends AbstractTest
+{
+ @Override
+ protected void customizeContext(ServletContextHandler context)
+ {
+ context.addFilter(PushCacheFilter.class, "/*", EnumSet.of(DispatcherType.REQUEST));
+ }
+
+ @Test
+ public void testPush() throws Exception
+ {
+ final String primaryResource = "/primary.html";
+ final String secondaryResource = "/secondary.png";
+ final byte[] secondaryData = "SECONDARY".getBytes("UTF-8");
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
+ {
+ String requestURI = req.getRequestURI();
+ ServletOutputStream output = resp.getOutputStream();
+ if (requestURI.endsWith(primaryResource))
+ output.print("<html><head></head><body>PRIMARY</body></html>");
+ else if (requestURI.endsWith(secondaryResource))
+ output.write(secondaryData);
+ }
+ });
+
+ final Session session = newClient(new Session.Listener.Adapter());
+
+ // Request for the primary and secondary resource to build the cache.
+ final String primaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + primaryResource;
+ HttpFields primaryFields = new HttpFields();
+ MetaData.Request primaryRequest = newRequest("GET", primaryResource, primaryFields);
+ final CountDownLatch secondaryResponseLatch = new CountDownLatch(1);
+ session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ callback.succeeded();
+ if (frame.isEndStream())
+ {
+ // Request for the secondary resource.
+ HttpFields secondaryFields = new HttpFields();
+ secondaryFields.put(HttpHeader.REFERER, primaryURI);
+ MetaData.Request secondaryRequest = newRequest("GET", secondaryResource, secondaryFields);
+ session.newStream(new HeadersFrame(0, secondaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ secondaryResponseLatch.countDown();
+ }
+ });
+ }
+ }
+ });
+ Assert.assertTrue(secondaryResponseLatch.await(5, TimeUnit.SECONDS));
+
+ // Request again the primary resource, we should get the secondary resource pushed.
+ primaryRequest = newRequest("GET", primaryResource, primaryFields);
+ final CountDownLatch primaryResponseLatch = new CountDownLatch(1);
+ final CountDownLatch pushLatch = new CountDownLatch(1);
+ session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
+ {
+ @Override
+ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
+ {
+ return new Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ callback.succeeded();
+ if (frame.isEndStream())
+ pushLatch.countDown();
+ }
+ };
+ }
+
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback)
+ {
+ callback.succeeded();
+ if (frame.isEndStream())
+ primaryResponseLatch.countDown();
+ }
+ });
+ Assert.assertTrue(pushLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
+ }
+}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 6390677..01e768b 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -255,13 +255,6 @@
}
@Override
- public boolean onPushPromise(PushPromiseFrame frame)
- {
- // TODO
- return false;
- }
-
- @Override
public boolean onPing(PingFrame frame)
{
if (LOG.isDebugEnabled())
@@ -332,7 +325,7 @@
priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
- final IStream stream = createLocalStream(frame, promise);
+ final IStream stream = createLocalStream(streamId, promise);
if (stream == null)
return;
stream.updateClose(frame.isEndStream(), true);
@@ -346,6 +339,28 @@
}
@Override
+ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame)
+ {
+ // Synchronization is necessary to atomically create
+ // the stream id and enqueue the frame to be sent.
+ synchronized (this)
+ {
+ int streamId = streamIds.getAndAdd(2);
+ frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
+
+ final IStream pushStream = createLocalStream(streamId, promise);
+ if (pushStream == null)
+ return;
+ pushStream.updateClose(true, false);
+
+ ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
+ flusher.append(entry);
+ }
+ // Iterate outside the synchronized block.
+ flusher.iterate();
+ }
+
+ @Override
public void settings(SettingsFrame frame, Callback callback)
{
control(null, callback, frame);
@@ -414,7 +429,7 @@
flusher.iterate();
}
- protected IStream createLocalStream(HeadersFrame frame, Promise<Stream> promise)
+ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
{
while (true)
{
@@ -429,8 +444,7 @@
break;
}
- IStream stream = newStream(frame);
- int streamId = stream.getId();
+ IStream stream = newStream(streamId);
if (streams.putIfAbsent(streamId, stream) == null)
{
stream.setIdleTimeout(endPoint.getIdleTimeout());
@@ -446,10 +460,8 @@
}
}
- protected IStream createRemoteStream(HeadersFrame frame)
+ protected IStream createRemoteStream(int streamId)
{
- int streamId = frame.getStreamId();
-
// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
@@ -464,7 +476,7 @@
break;
}
- IStream stream = newStream(frame);
+ IStream stream = newStream(streamId);
// SPEC: duplicate stream is treated as connection error.
if (streams.putIfAbsent(streamId, stream) == null)
@@ -483,9 +495,9 @@
}
}
- protected IStream newStream(HeadersFrame frame)
+ protected IStream newStream(int streamId)
{
- return new HTTP2Stream(scheduler, this, frame);
+ return new HTTP2Stream(scheduler, this, streamId);
}
protected void removeStream(IStream stream, boolean local)
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index bd7668e..f22a93c 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -28,9 +28,11 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -52,21 +54,21 @@
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final ISession session;
- private final HeadersFrame frame;
+ private final int streamId;
private volatile Listener listener;
private volatile boolean reset;
- public HTTP2Stream(Scheduler scheduler, ISession session, HeadersFrame frame)
+ public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
{
super(scheduler);
this.session = session;
- this.frame = frame;
+ this.streamId = streamId;
}
@Override
public int getId()
{
- return frame.getStreamId();
+ return streamId;
}
@Override
@@ -82,6 +84,12 @@
}
@Override
+ public void push(PushPromiseFrame frame, Promise<Stream> promise)
+ {
+ session.push(this, promise, frame);
+ }
+
+ @Override
public void data(DataFrame frame, Callback callback)
{
session.data(this, callback, frame);
@@ -198,6 +206,10 @@
reset = true;
return false;
}
+ case PUSH_PROMISE:
+ {
+ return false;
+ }
default:
{
throw new UnsupportedOperationException();
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
index d40ba0b..024922c 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
@@ -19,10 +19,13 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
public interface ISession extends Session
{
@@ -31,6 +34,8 @@
public void control(IStream stream, Callback callback, Frame frame, Frame... frames);
+ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame);
+
public void data(IStream stream, Callback callback, DataFrame frame);
public int updateSendWindow(int delta);
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
index 0139f12..c493fae 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
@@ -24,6 +24,8 @@
public interface IStream extends Stream
{
+ public static final String CHANNEL_ATTRIBUTE = IStream.class.getName() + ".channel";
+
@Override
public ISession getSession();
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
index b6af7e7..c5aa7f2 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
@@ -20,7 +20,9 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
public interface Stream
{
@@ -30,6 +32,8 @@
public void headers(HeadersFrame frame, Callback callback);
+ public void push(PushPromiseFrame frame, Promise<Stream> promise);
+
public void data(DataFrame frame, Callback callback);
public Object getAttribute(String key);
@@ -52,13 +56,12 @@
{
public void onHeaders(Stream stream, HeadersFrame frame);
+ public Listener onPush(Stream stream, PushPromiseFrame frame);
+
public void onData(Stream stream, DataFrame frame, Callback callback);
- // TODO: is this method needed ?
public void onFailure(Stream stream, Throwable x);
- // TODO: See SPDY's StreamFrameListener
-
public static class Adapter implements Listener
{
@Override
@@ -67,6 +70,12 @@
}
@Override
+ public Listener onPush(Stream stream, PushPromiseFrame frame)
+ {
+ return null;
+ }
+
+ @Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
index a1c4412..c4d0b70 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java
@@ -48,4 +48,10 @@
{
return metaData;
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s#%d/%d", super.toString(), streamId, promisedStreamId);
+ }
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java
index 0a17081..a716ccc 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/Parser.java
@@ -97,17 +97,7 @@
{
int type = headerParser.getFrameType();
if (LOG.isDebugEnabled())
- {
- int fl=headerParser.getLength();
- int l=Math.min(16,Math.min(buffer.remaining(),fl));
-
- LOG.debug(String.format("Parsing %s frame %s%s%s",
- FrameType.from(type),
- " ".substring(0,11-FrameType.from(type).toString().length()),
- TypeUtil.toHexString(buffer.array(),buffer.arrayOffset()+buffer.position(),l),
- l<fl?"...":""));
- }
-
+ LOG.debug("Parsing {} frame", FrameType.from(type));
if (type < 0 || type >= bodyParsers.length)
{
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "unknown_frame_type_" + type);
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java
index fb6403c..5c39454 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java
@@ -21,12 +21,16 @@
import java.util.HashMap;
import java.util.Map;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCodes;
+import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.ByteBufferPool;
@@ -41,7 +45,6 @@
public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionFactory
{
private static final Logger LOG = Log.getLogger(HTTP2ServerConnectionFactory.class);
- private static final String CHANNEL_ATTRIBUTE = HttpChannelOverHTTP2.class.getName();
private final HttpConfiguration httpConfiguration;
@@ -93,14 +96,14 @@
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
- HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2((IStream)stream, frame);
+ MetaData.Request request = (MetaData.Request)frame.getMetaData();
+ HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)stream, request);
HttpInputOverHTTP2 input = new HttpInputOverHTTP2();
-
// TODO pool HttpChannels per connection - maybe associate with thread?
HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport, input, stream);
- stream.setAttribute(CHANNEL_ATTRIBUTE, channel);
+ stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
- channel.onHeadersFrame(frame);
+ channel.onRequest(frame);
return frame.isEndStream() ? null : this;
}
@@ -109,6 +112,15 @@
public void onHeaders(Stream stream, HeadersFrame frame)
{
// Servers do not receive responses.
+ close(stream, "response_headers");
+ }
+
+ @Override
+ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
+ {
+ // Servers do not receive pushes.
+ close(stream, "push_promise");
+ return null;
}
@Override
@@ -117,7 +129,7 @@
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
- HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(CHANNEL_ATTRIBUTE);
+ HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.requestContent(frame, callback);
}
@@ -126,5 +138,18 @@
{
// TODO
}
+
+ private void close(Stream stream, String reason)
+ {
+ final Session session = stream.getSession();
+ session.close(ErrorCodes.PROTOCOL_ERROR, reason, new Callback.Adapter()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ ((ISession)session).disconnect();
+ }
+ });
+ }
}
}
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java
index a0df3ce..fced76c 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java
@@ -21,6 +21,8 @@
import java.util.Collections;
import java.util.Map;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
@@ -29,6 +31,7 @@
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ServerParser;
@@ -66,17 +69,32 @@
@Override
public boolean onHeaders(HeadersFrame frame)
{
- IStream stream = createRemoteStream(frame);
- if (stream != null)
+ MetaData metaData = frame.getMetaData();
+ if (metaData.isRequest())
{
- stream.updateClose(frame.isEndStream(), false);
- stream.process(frame, Callback.Adapter.INSTANCE);
- Stream.Listener listener = notifyNewStream(stream, frame);
- stream.setListener(listener);
- // The listener may have sent a frame that closed the stream.
- if (stream.isClosed())
- removeStream(stream, false);
+ IStream stream = createRemoteStream(frame.getStreamId());
+ if (stream != null)
+ {
+ stream.updateClose(frame.isEndStream(), false);
+ stream.process(frame, Callback.Adapter.INSTANCE);
+ Stream.Listener listener = notifyNewStream(stream, frame);
+ stream.setListener(listener);
+ // The listener may have sent a frame that closed the stream.
+ if (stream.isClosed())
+ removeStream(stream, false);
+ }
}
+ else
+ {
+ onConnectionFailure(ErrorCodes.INTERNAL_ERROR, "invalid_request");
+ }
+ return false;
+ }
+
+ @Override
+ public boolean onPushPromise(PushPromiseFrame frame)
+ {
+ onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "push_promise");
return false;
}
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
index 2bd2ef9..1160212 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
@@ -21,13 +21,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@@ -63,16 +63,9 @@
return _expect100Continue;
}
- public void onHeadersFrame(HeadersFrame frame)
+ public void onRequest(HeadersFrame frame)
{
- MetaData metaData = frame.getMetaData();
- if (!metaData.isRequest())
- {
- onBadMessage(400, null);
- return;
- }
-
- MetaData.Request request = (MetaData.Request)metaData;
+ MetaData.Request request = (MetaData.Request)frame.getMetaData();
HttpFields fields = request.getFields();
_expect100Continue = fields.contains(HttpHeader.EXPECT,HttpHeaderValue.CONTINUE.asString());
@@ -91,7 +84,22 @@
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Request #{}:{}{} {} {}{}{}",
- stream.getId(), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(), System.lineSeparator(), fields);
+ stream.getId(), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
+ System.lineSeparator(), fields);
+ }
+
+ execute(this);
+ }
+
+ public void onPushRequest(MetaData.Request request)
+ {
+ onRequest(request);
+
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("HTTP2 Push Request #{}:{}{} {} {}{}{}",
+ stream.getId(), System.lineSeparator(), request.getMethod(), request.getURI(), request.getVersion(),
+ System.lineSeparator(), request.getFields());
}
execute(this);
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
index 86f37c8..b0e5b48 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java
@@ -26,11 +26,17 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.IStream;
+import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PushPromiseFrame;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -40,11 +46,17 @@
private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback();
+ private final Connector connector;
+ private final HttpConfiguration httpConfiguration;
+ private final EndPoint endPoint;
private final IStream stream;
- private final HeadersFrame request;
+ private final MetaData.Request request;
- public HttpTransportOverHTTP2(IStream stream, HeadersFrame request)
+ public HttpTransportOverHTTP2(Connector connector, HttpConfiguration httpConfiguration, EndPoint endPoint, IStream stream, MetaData.Request request)
{
+ this.connector = connector;
+ this.httpConfiguration = httpConfiguration;
+ this.endPoint = endPoint;
this.stream = stream;
this.request = request;
}
@@ -52,8 +64,7 @@
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{
- MetaData.Request metaData = (MetaData.Request)request.getMetaData();
- boolean isHeadRequest = HttpMethod.HEAD.is(metaData.getMethod());
+ boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
// info != null | content != 0 | last = true => commit + send/end
// info != null | content != 0 | last = false => commit + send
@@ -86,14 +97,30 @@
}
}
- /**
- * @see org.eclipse.jetty.server.HttpTransport#push(org.eclipse.jetty.http.MetaData.Request)
- */
@Override
- public void push(org.eclipse.jetty.http.MetaData.Request request)
- {
- // TODO implement push
- LOG.warn("NOT YET IMPLEMENTED push in {}",this);
+ public void push(final MetaData.Request request)
+ {
+ stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
+ {
+ @Override
+ public void succeeded(Stream pushStream)
+ {
+ HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, httpConfiguration, endPoint, (IStream)pushStream, request);
+ HttpInputOverHTTP2 input = new HttpInputOverHTTP2();
+ HttpChannelOverHTTP2 channel = new HttpChannelOverHTTP2(connector, httpConfiguration, endPoint, transport, input, pushStream);
+ pushStream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
+
+ channel.onPushRequest(request);
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not push " + request, x);
+ stream.getSession().disconnect();
+ }
+ });
}
private void commit(HttpGenerator.ResponseInfo info, boolean endStream, Callback callback)
diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/PushCacheFilter.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/PushCacheFilter.java
index 09af845..bdeec79 100644
--- a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/PushCacheFilter.java
+++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/PushCacheFilter.java
@@ -22,7 +22,8 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -30,6 +31,7 @@
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@@ -41,8 +43,6 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-
-/* ------------------------------------------------------------ */
/**
* A filter that builds a cache of associated resources to push
* using the following heuristics:<ul>
@@ -53,135 +53,133 @@
* <li>If the time period between a request and an associated request is small,
* that indicates a possible push resource
* </ul>
- *
*/
public class PushCacheFilter implements Filter
{
private static final Logger LOG = Log.getLogger(PushCacheFilter.class);
- private final ConcurrentMap<String, Target> _cache = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, PrimaryResource> _cache = new ConcurrentHashMap<>();
+ private long _associatePeriod = 2000L;
- private long _associateDelay=2000L;
-
- /* ------------------------------------------------------------ */
- /**
- * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
- */
@Override
public void init(FilterConfig config) throws ServletException
{
- if (config.getInitParameter("associateDelay")!=null)
- _associateDelay=Long.valueOf(config.getInitParameter("associateDelay"));
+ if (config.getInitParameter("associateDelay") != null)
+ _associatePeriod = Long.valueOf(config.getInitParameter("associateDelay"));
}
- /* ------------------------------------------------------------ */
- /**
- * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, javax.servlet.ServletResponse, javax.servlet.FilterChain)
- */
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
- {
- Request baseRequest = Request.getBaseRequest(request);
-
-
+ public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException
+ {
+ HttpServletRequest request = (HttpServletRequest)req;
+
// Iterating over fields is more efficient than multiple gets
- HttpFields fields = baseRequest.getHttpFields();
- boolean conditional=false;
- String referer=null;
- loop: for (int i=0;i<fields.size();i++)
+ HttpFields fields = Request.getBaseRequest(req).getHttpFields();
+ boolean conditional = false;
+ String referrer = null;
+ loop: for (int i = 0; i < fields.size(); i++)
{
- HttpField field=fields.getField(i);
- HttpHeader header=field.getHeader();
- if (header==null)
+ HttpField field = fields.getField(i);
+ HttpHeader header = field.getHeader();
+ if (header == null)
continue;
-
+
switch (header)
{
case IF_MATCH:
case IF_MODIFIED_SINCE:
case IF_NONE_MATCH:
case IF_UNMODIFIED_SINCE:
- conditional=true;
+ conditional = true;
break loop;
-
+
case REFERER:
- referer=field.getValue();
+ referrer = field.getValue();
break;
-
+
default:
break;
}
}
if (LOG.isDebugEnabled())
- LOG.debug("{} {} referer={} conditional={}%n",baseRequest.getMethod(),baseRequest.getRequestURI(),referer,conditional);
+ LOG.debug("{} {} referrer={} conditional={}", request.getMethod(), request.getRequestURI(), referrer, conditional);
- HttpURI uri = null;
if (!conditional)
{
- String session = baseRequest.getSession(true).getId();
- String path = URIUtil.addPaths(baseRequest.getServletPath(),baseRequest.getPathInfo());
-
- if (referer!=null)
+ String path = URIUtil.addPaths(request.getServletPath(), request.getPathInfo());
+
+ if (referrer != null)
{
- uri = new HttpURI(referer);
- if (request.getServerName().equals(uri.getHost()))
+ HttpURI referrerURI = new HttpURI(referrer);
+ if (request.getServerName().equals(referrerURI.getHost()) &&
+ request.getServerPort() == referrerURI.getPort())
{
- String from = uri.getPath();
- if (from.startsWith(baseRequest.getContextPath()))
+ String referrerPath = referrerURI.getPath();
+ if (referrerPath.startsWith(request.getContextPath()))
{
- String from_in_ctx = from.substring(baseRequest.getContextPath().length());
-
- Target target = _cache.get(from_in_ctx);
- if (target!=null)
+ String referrerPathNoContext = referrerPath.substring(request.getContextPath().length());
+ PrimaryResource primaryResource = _cache.get(referrerPathNoContext);
+ if (primaryResource != null)
{
- Long last = target._timestamp.get(session);
- if (last!=null && (System.currentTimeMillis()-last)<_associateDelay && !target._associated.containsKey(path))
+ long primaryTimestamp = primaryResource._timestamp.get();
+ if (primaryTimestamp != 0)
{
- RequestDispatcher dispatcher = baseRequest.getServletContext().getRequestDispatcher(path);
- if (target._associated.putIfAbsent(path,dispatcher)==null)
- LOG.info("ASSOCIATE {}->{}",from_in_ctx,dispatcher);
+ RequestDispatcher dispatcher = request.getServletContext().getRequestDispatcher(path);
+ if (System.nanoTime() - primaryTimestamp < TimeUnit.MILLISECONDS.toNanos(_associatePeriod))
+ {
+ if (primaryResource._associated.putIfAbsent(path, dispatcher) == null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Associated {} -> {}", referrerPathNoContext, dispatcher);
+ }
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Not associated {} -> {}, outside associate period of {}ms", referrerPathNoContext, dispatcher, _associatePeriod);
+ }
}
}
}
}
}
- // push some resources?
- Target target = _cache.get(path);
- if (target == null)
+ // Push some resources?
+ PrimaryResource primaryResource = _cache.get(path);
+ if (primaryResource == null)
{
- Target t=new Target();
- target = _cache.putIfAbsent(path,t);
- target = target==null?t:target;
+ PrimaryResource t = new PrimaryResource();
+ primaryResource = _cache.putIfAbsent(path, t);
+ primaryResource = primaryResource == null ? t : primaryResource;
+ primaryResource._timestamp.compareAndSet(0, System.nanoTime());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Cached {}", path);
}
- target._timestamp.put(session,System.currentTimeMillis());
- if (target._associated.size()>0)
+
+ if (!primaryResource._associated.isEmpty())
{
- for (RequestDispatcher dispatcher : target._associated.values())
+ for (RequestDispatcher dispatcher : primaryResource._associated.values())
{
- LOG.info("PUSH {}->{}",path,dispatcher);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Pushing {} <- {}", dispatcher, path);
((Dispatcher)dispatcher).push(request);
}
}
}
- chain.doFilter(request,response);
-
+ chain.doFilter(req, resp);
}
- /* ------------------------------------------------------------ */
- /**
- * @see javax.servlet.Filter#destroy()
- */
@Override
public void destroy()
- {
+ {
+ _cache.clear();
}
-
- public static class Target
+ private static class PrimaryResource
{
- final ConcurrentMap<String,RequestDispatcher> _associated = new ConcurrentHashMap<>();
- final ConcurrentMap<String,Long> _timestamp = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, RequestDispatcher> _associated = new ConcurrentHashMap<>();
+ private final AtomicLong _timestamp = new AtomicLong();
}
}