blob: 9fa560b9829848d473e448bf2e8baccef2b305b0 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* <p>A servlet that implements the <a href="http://www.w3.org/TR/eventsource/">event source protocol</a>,
* also known as "server sent events".</p>
* <p>This servlet must be subclassed to implement abstract method {@link #newEventSource(HttpServletRequest)}
* to return an instance of {@link EventSource} that allows application to listen for event source events
* and to emit event source events.</p>
* <p>This servlet supports the following configuration parameters:</p>
* <ul>
* <li><code>heartBeatPeriod</code>, that specifies the heartbeat period, in seconds, used to check
* whether the connection has been closed by the client; defaults to 10 seconds.</li>
* </ul>
*
* <p>NOTE: there is currently no support for <code>last-event-id</code>.</p>
*/
public abstract class EventSourceServlet extends HttpServlet
{
private static final byte[] CRLF = new byte[]{'\r', '\n'};
private static final byte[] EVENT_FIELD = "event: ".getBytes(StandardCharsets.UTF_8);
private static final byte[] DATA_FIELD = "data: ".getBytes(StandardCharsets.UTF_8);
private static final byte[] COMMENT_FIELD = ": ".getBytes(StandardCharsets.UTF_8);
private ScheduledExecutorService scheduler;
private int heartBeatPeriod = 10;
@Override
public void init() throws ServletException
{
String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
if (heartBeatPeriodParam != null)
heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
scheduler = Executors.newSingleThreadScheduledExecutor();
}
@Override
public void destroy()
{
if (scheduler != null)
scheduler.shutdown();
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
@SuppressWarnings("unchecked")
Enumeration<String> acceptValues = request.getHeaders("Accept");
while (acceptValues.hasMoreElements())
{
String accept = acceptValues.nextElement();
if (accept.equals("text/event-stream"))
{
EventSource eventSource = newEventSource(request);
if (eventSource == null)
{
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
else
{
respond(request, response);
AsyncContext async = request.startAsync();
// Infinite timeout because the continuation is never resumed,
// but only completed on close
async.setTimeout(0);
EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
emitter.scheduleHeartBeat();
open(eventSource, emitter);
}
return;
}
}
super.doGet(request, response);
}
protected abstract EventSource newEventSource(HttpServletRequest request);
protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setStatus(HttpServletResponse.SC_OK);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setContentType("text/event-stream");
// By adding this header, and not closing the connection,
// we disable HTTP chunking, and we can use write()+flush()
// to send data in the text/event-stream protocol
response.addHeader("Connection", "close");
response.flushBuffer();
}
protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
{
eventSource.onOpen(emitter);
}
protected class EventSourceEmitter implements EventSource.Emitter, Runnable
{
private final EventSource eventSource;
private final AsyncContext async;
private final ServletOutputStream output;
private Future<?> heartBeat;
private boolean closed;
public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
{
this.eventSource = eventSource;
this.async = async;
this.output = async.getResponse().getOutputStream();
}
@Override
public void event(String name, String data) throws IOException
{
synchronized (this)
{
output.write(EVENT_FIELD);
output.write(name.getBytes(StandardCharsets.UTF_8));
output.write(CRLF);
data(data);
}
}
@Override
public void data(String data) throws IOException
{
synchronized (this)
{
BufferedReader reader = new BufferedReader(new StringReader(data));
String line;
while ((line = reader.readLine()) != null)
{
output.write(DATA_FIELD);
output.write(line.getBytes(StandardCharsets.UTF_8));
output.write(CRLF);
}
output.write(CRLF);
flush();
}
}
@Override
public void comment(String comment) throws IOException
{
synchronized (this)
{
output.write(COMMENT_FIELD);
output.write(comment.getBytes(StandardCharsets.UTF_8));
output.write(CRLF);
output.write(CRLF);
flush();
}
}
@Override
public void run()
{
// If the other peer closes the connection, the first
// flush() should generate a TCP reset that is detected
// on the second flush()
try
{
synchronized (this)
{
output.write('\r');
flush();
output.write('\n');
flush();
}
// We could write, reschedule heartbeat
scheduleHeartBeat();
}
catch (IOException x)
{
// The other peer closed the connection
close();
eventSource.onClose();
}
}
protected void flush() throws IOException
{
async.getResponse().flushBuffer();
}
@Override
public void close()
{
synchronized (this)
{
closed = true;
heartBeat.cancel(false);
}
async.complete();
}
private void scheduleHeartBeat()
{
synchronized (this)
{
if (!closed)
heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
}
}
}
}