Bug 493583 - [http] HttpServletResponseWrapperImpl.InternalOutputStream does not forward flush() to originalOutputStream

Signed-off-by: Raymond Auge <raymond.auge@liferay.com>
diff --git a/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/DispatchingTest.java b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/DispatchingTest.java
index 6f8340b..e1f89a1 100644
--- a/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/DispatchingTest.java
+++ b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/DispatchingTest.java
@@ -23,6 +23,9 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.DispatcherType;
@@ -31,6 +34,7 @@
 import javax.servlet.RequestDispatcher;
 import javax.servlet.Servlet;
 import javax.servlet.ServletContext;
+import javax.servlet.ServletOutputStream;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
@@ -42,6 +46,8 @@
 import org.eclipse.equinox.http.servlet.testbase.BaseTest;
 import org.eclipse.equinox.http.servlet.tests.util.BaseServlet;
 import org.eclipse.equinox.http.servlet.tests.util.DispatchResultServlet;
+import org.eclipse.equinox.http.servlet.tests.util.EventHandler;
+import org.eclipse.equinox.http.servlet.tests.util.ServletRequestAdvisor;
 import org.junit.Assert;
 import org.junit.Test;
 import org.osgi.service.http.HttpContext;
@@ -1419,4 +1425,156 @@
 		Assert.assertEquals("dog", response.get("X-animal").get(0));
 	}
 
+	// Bug 493583
+	@Test
+	public void test_streamed_response_outputstream() throws Exception {
+		final long interval = 100L;
+
+		Servlet servlet1 = new BaseServlet() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			protected void service(HttpServletRequest request, HttpServletResponse response)
+				throws ServletException, IOException {
+
+		        response.setContentType("text/event-stream");
+		        response.setCharacterEncoding("UTF-8");
+
+				try (ServletOutputStream out = response.getOutputStream()) {
+					for (int i = 1; i <= 10; ++i) {
+						try {
+							Thread.sleep(interval);
+						}
+						catch (InterruptedException e) {
+							throw new ServletException(e);
+						}
+
+						out.print("data: ");
+						out.print(System.currentTimeMillis());
+						out.print("\n\n");
+						out.flush();
+					}
+				}
+			}
+		};
+
+		Dictionary<String, Object> props = new Hashtable<String, Object>();
+		props.put(HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_PATTERN, "/s1/*");
+		registrations.add(getBundleContext().registerService(Servlet.class, servlet1, props));
+
+		final AtomicLong previousTime = new AtomicLong(System.currentTimeMillis());
+		final AtomicInteger counter = new AtomicInteger();
+		final AtomicBoolean result = new AtomicBoolean(true);
+
+		EventHandler handler = new EventHandler() {
+
+			@Override
+			public void handle(Map<String, String> eventMap) {
+				super.handle(eventMap);
+
+				long currentTime = System.currentTimeMillis();
+
+				long diff = (currentTime - previousTime.get());
+
+				System.out.println("Differential: " + diff);
+
+				// check that there is at least a differential of half the interval
+				// because we can't really guarantee that machine time will accurately
+				// reflect the timeouts we've set
+				if (diff < (interval / 2)) {
+					result.set(false);
+				}
+
+				previousTime.set(currentTime);
+				counter.incrementAndGet();
+			}
+
+		};
+
+		requestAdvisor.eventSource("s1", null, handler);
+
+		handler.close();
+
+		Assert.assertTrue(
+			"The interval between events was too short. It means that the response was not properly streamed.",
+			result.get());
+		Assert.assertEquals(10, counter.get());
+	}
+
+	// Bug 493583
+	@Test
+	public void test_streamed_response_writer() throws Exception {
+		final long interval = 100L;
+
+		Servlet servlet1 = new BaseServlet() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			protected void service(HttpServletRequest request, HttpServletResponse response)
+				throws ServletException, IOException {
+
+		        response.setContentType("text/event-stream");
+		        response.setCharacterEncoding("UTF-8");
+
+				try (PrintWriter writer = response.getWriter()) {
+					for (int i = 1; i <= 10; ++i) {
+						try {
+							Thread.sleep(interval);
+						}
+						catch (InterruptedException e) {
+							throw new ServletException(e);
+						}
+
+						writer.print("data: ");
+						writer.print(System.currentTimeMillis());
+						writer.print("\n\n");
+						writer.flush();
+					}
+				}
+			}
+		};
+
+		Dictionary<String, Object> props = new Hashtable<String, Object>();
+		props.put(HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_PATTERN, "/s1/*");
+		registrations.add(getBundleContext().registerService(Servlet.class, servlet1, props));
+
+		final AtomicLong previousTime = new AtomicLong(System.currentTimeMillis());
+		final AtomicInteger counter = new AtomicInteger();
+		final AtomicBoolean result = new AtomicBoolean(true);
+
+		EventHandler handler = new EventHandler() {
+
+			@Override
+			public void handle(Map<String, String> eventMap) {
+				super.handle(eventMap);
+
+				long currentTime = System.currentTimeMillis();
+
+				long diff = (currentTime - previousTime.get());
+
+				System.out.println("Differential: " + diff);
+
+				// check that there is at least a differential of half the interval
+				// because we can't really guarantee that machine time will accurately
+				// reflect the timeouts we've set
+				if (diff < (interval / 2)) {
+					result.set(false);
+				}
+
+				previousTime.set(currentTime);
+				counter.incrementAndGet();
+			}
+
+		};
+
+		requestAdvisor.eventSource("s1", null, handler);
+
+		handler.close();
+
+		Assert.assertTrue(
+			"The interval between events was too short. It means that the response was not properly streamed.",
+			result.get());
+		Assert.assertEquals(10, counter.get());
+	}
+
 }
\ No newline at end of file
diff --git a/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/EventHandler.java b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/EventHandler.java
new file mode 100644
index 0000000..47e6813
--- /dev/null
+++ b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/EventHandler.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Copyright (c) 2016 Raymond Augé and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     Raymond Augé <raymond.auge@liferay.com> - initial implementation
+ *******************************************************************************/
+package org.eclipse.equinox.http.servlet.tests.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventHandler {
+
+	public void close() {
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+
+	public void handle(Map<String, String> eventMap) {
+		System.out.println("==event==\n" + eventMap.get("data"));
+	}
+
+	public void open(final InputStream inputStream) {
+		Runnable streamProcessorThread = new Runnable() {
+
+			@Override
+			public void run() {
+				System.out.println("==event stream opened==");
+
+				// Ref: https://html.spec.whatwg.org/multipage/comms.html#server-sent-events
+
+				Map<String, String> eventMap = new HashMap<String, String>();
+
+				BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+
+				String current;
+
+				try {
+					while ((current = reader.readLine()) != null) {
+						if (current.length() == 0) {
+							handle(eventMap);
+
+							eventMap = new HashMap<String, String>();
+
+							continue;
+						}
+
+						int colon = current.indexOf('\u003A');
+
+						if (colon == 0) {
+							// ignore comment lines
+
+							continue;
+						}
+						else if (colon < 0) {
+							// No colon? Entire line must be treated as the key with blank value
+
+							eventMap.put(current, "");
+
+							continue;
+						}
+
+						String key = current.substring(0, colon);
+						String value = current.substring(colon + 1);
+
+						if (value.startsWith("\u0020")) {
+							value = value.substring(1);
+						}
+
+						if (eventMap.containsKey(key)) {
+							String currentValue = eventMap.get(key);
+
+							value = currentValue + '\n' + value;
+						}
+
+						eventMap.put(key, value);
+					}
+				}
+				catch (IOException e) {
+					e.printStackTrace();
+				}
+				finally {
+					try {
+						inputStream.close();
+					}
+					catch (IOException e) {
+						e.printStackTrace();
+					}
+				}
+
+				// Ignore remaining content which is not a well formed event
+
+				System.out.println("==event stream closed==");
+			}
+
+		};
+
+		thread = new Thread(streamProcessorThread);
+
+		thread.start();
+	}
+
+	private Thread thread;
+
+}
\ No newline at end of file
diff --git a/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/ServletRequestAdvisor.java b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/ServletRequestAdvisor.java
index 9fdc89c..cf5b4a9 100644
--- a/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/ServletRequestAdvisor.java
+++ b/bundles/org.eclipse.equinox.http.servlet.tests/src/org/eclipse/equinox/http/servlet/tests/util/ServletRequestAdvisor.java
@@ -137,6 +137,46 @@
 		}
 	}
 
+	public Map<String, List<String>> eventSource(String value, Map<String, List<String>> headers, final EventHandler handler) throws IOException {
+		String spec = createUrlSpec(value);
+		log("Requesting " + spec); //$NON-NLS-1$
+		URL url = new URL(spec);
+		HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+
+		connection.setChunkedStreamingMode(0);
+		connection.setDoOutput(true);
+		//connection.setRequestProperty("Connection", "Close");
+		connection.setInstanceFollowRedirects(false);
+		connection.setConnectTimeout(150 * 1000);
+		connection.setReadTimeout(150 * 1000);
+
+		if (headers != null) {
+			for(Map.Entry<String, List<String>> entry : headers.entrySet()) {
+				for(String entryValue : entry.getValue()) {
+					connection.setRequestProperty(entry.getKey(), entryValue);
+				}
+			}
+		}
+
+		int responseCode = connection.getResponseCode();
+
+		Map<String, List<String>> map = new HashMap<String, List<String>>(connection.getHeaderFields());
+		map.put("responseCode", Collections.singletonList(String.valueOf(responseCode)));
+
+		InputStream stream;
+
+		if (responseCode >= 400) {
+			stream = connection.getErrorStream();
+		}
+		else {
+			stream = connection.getInputStream();
+		}
+
+		handler.open(stream);
+
+		return map;
+	}
+
 	public Map<String, List<String>> upload(String value, Map<String, List<Object>> headers) throws IOException {
 		String spec = createUrlSpec(value);
 		log("Requesting " + spec); //$NON-NLS-1$
diff --git a/bundles/org.eclipse.equinox.http.servlet/src/org/eclipse/equinox/http/servlet/internal/servlet/HttpServletResponseWrapperImpl.java b/bundles/org.eclipse.equinox.http.servlet/src/org/eclipse/equinox/http/servlet/internal/servlet/HttpServletResponseWrapperImpl.java
index 0b7518d..a7c5f5f 100644
--- a/bundles/org.eclipse.equinox.http.servlet/src/org/eclipse/equinox/http/servlet/internal/servlet/HttpServletResponseWrapperImpl.java
+++ b/bundles/org.eclipse.equinox.http.servlet/src/org/eclipse/equinox/http/servlet/internal/servlet/HttpServletResponseWrapperImpl.java
@@ -99,6 +99,16 @@
 		}
 
 		@Override
+		public void close() throws IOException {
+			originalOutputStream.close();
+		}
+
+		@Override
+		public void flush() throws IOException {
+			originalOutputStream.flush();
+		}
+
+		@Override
 		public boolean isReady() {
 			return originalOutputStream.isReady();
 		}