433262 - WebSocket / Advanced close use cases
+ ClientCloseTest implementation of various outlined use cases.
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
index 3b91b38..d7b9ced 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
@@ -31,11 +31,10 @@
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
-public abstract class AbstractJsrEventDriver extends AbstractEventDriver implements EventDriver
+public abstract class AbstractJsrEventDriver extends AbstractEventDriver
{
protected final EndpointMetadata metadata;
protected final EndpointConfig config;
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
index a550c20..0cf9df7 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
@@ -23,6 +23,7 @@
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
+
import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
@@ -31,7 +32,6 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@@ -44,7 +44,7 @@
/**
* Base implementation for JSR-356 Annotated event drivers.
*/
-public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
+public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
{
private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
private final JsrEvents<?, ?> events;
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
index b977147..9ac4906 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
@@ -23,6 +23,7 @@
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
+
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
@@ -34,7 +35,6 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.jsr356.JsrPongMessage;
@@ -49,7 +49,7 @@
/**
* EventDriver for websocket that extend from {@link javax.websocket.Endpoint}
*/
-public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements EventDriver
+public class JsrEndpointEventDriver extends AbstractJsrEventDriver
{
private static final Logger LOG = Log.getLogger(JsrEndpointEventDriver.class);
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
index da112f9..ab20d77 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
@@ -32,6 +32,7 @@
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@@ -158,7 +159,7 @@
sessions.add(session);
}
- private void closeAllConnections()
+ private void shutdownAllConnections()
{
for (WebSocketSession session : sessions)
{
@@ -166,11 +167,13 @@
{
try
{
- session.getConnection().close();
+ session.getConnection().close(
+ StatusCode.SHUTDOWN,
+ "Shutdown");
}
catch (Throwable t)
{
- LOG.debug("During Close All Connections",t);
+ LOG.debug("During Shutdown All Connections",t);
}
}
}
@@ -203,7 +206,7 @@
@Override
protected void doStop() throws Exception
{
- closeAllConnections();
+ shutdownAllConnections();
sessions.clear();
super.doStop();
removeBean(selector);
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
index 928e911..6616f54 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
@@ -98,7 +98,7 @@
else
{
// Standard "ws://"
- endPoint.setIdleTimeout(connectPromise.getClient().getMaxIdleTimeout());
+ endPoint.setIdleTimeout(connectPromise.getDriver().getPolicy().getIdleTimeout());
return newUpgradeConnection(channel,endPoint,connectPromise);
}
}
@@ -139,4 +139,9 @@
{
this.sslContextFactory = sslContextFactory;
}
+
+ public WebSocketPolicy getPolicy()
+ {
+ return policy;
+ }
}
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java
new file mode 100644
index 0000000..3a7f338
--- /dev/null
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java
@@ -0,0 +1,626 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.client;
+
+import static org.hamcrest.Matchers.*;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.EofException;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
+import org.eclipse.jetty.toolchain.test.EventQueue;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
+import org.eclipse.jetty.websocket.api.ProtocolException;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.client.io.ConnectionManager;
+import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager;
+import org.eclipse.jetty.websocket.common.CloseInfo;
+import org.eclipse.jetty.websocket.common.OpCode;
+import org.eclipse.jetty.websocket.common.WebSocketFrame;
+import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.frames.TextFrame;
+import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
+import org.eclipse.jetty.websocket.common.test.BlockheadServer;
+import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
+import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
+import org.eclipse.jetty.websocket.common.test.RawFrameBuilder;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ClientCloseTest
+{
+ private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
+
+ private static class CloseTrackingSocket extends WebSocketAdapter
+ {
+ private static final Logger LOG = Log.getLogger(ClientCloseTest.CloseTrackingSocket.class);
+
+ public int closeCode = -1;
+ public String closeReason = null;
+ public CountDownLatch closeLatch = new CountDownLatch(1);
+ public CountDownLatch openLatch = new CountDownLatch(1);
+
+ public EventQueue<String> messageQueue = new EventQueue<>();
+ public EventQueue<Throwable> errorQueue = new EventQueue<>();
+
+ public void assertNoCloseEvent()
+ {
+ Assert.assertThat("Client Close Event",closeLatch.getCount(),is(1L));
+ Assert.assertThat("Client Close Event Status Code ",closeCode,is(-1));
+ }
+
+ public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher, Matcher<String> reasonMatcher)
+ throws InterruptedException
+ {
+ long maxTimeout = clientTimeoutMs * 2;
+
+ Assert.assertThat("Client Close Event Occurred",closeLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true));
+ Assert.assertThat("Client Close Event Status Code",closeCode,statusCodeMatcher);
+ if (reasonMatcher == null)
+ {
+ Assert.assertThat("Client Close Event Reason",closeReason,nullValue());
+ }
+ else
+ {
+ Assert.assertThat("Client Close Event Reason",closeReason,reasonMatcher);
+ }
+ }
+
+ public void assertReceivedError(Class<? extends Throwable> expectedThrownClass, Matcher<String> messageMatcher) throws TimeoutException,
+ InterruptedException
+ {
+ errorQueue.awaitEventCount(1,500,TimeUnit.MILLISECONDS);
+ Throwable actual = errorQueue.poll();
+ Assert.assertThat("Client Error Event",actual,instanceOf(expectedThrownClass));
+ if (messageMatcher == null)
+ {
+ Assert.assertThat("Client Error Event Message",actual.getMessage(),nullValue());
+ }
+ else
+ {
+ Assert.assertThat("Client Error Event Message",actual.getMessage(),messageMatcher);
+ }
+ }
+
+ public void clearQueues()
+ {
+ messageQueue.clear();
+ errorQueue.clear();
+ }
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason)
+ {
+ LOG.debug("onWebSocketClose({},{})",statusCode,reason);
+ super.onWebSocketClose(statusCode,reason);
+ closeCode = statusCode;
+ closeReason = reason;
+ closeLatch.countDown();
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session)
+ {
+ super.onWebSocketConnect(session);
+ openLatch.countDown();
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause)
+ {
+ LOG.debug("onWebSocketError",cause);
+ Assert.assertThat("Error capture",errorQueue.offer(cause),is(true));
+ }
+
+ @Override
+ public void onWebSocketText(String message)
+ {
+ LOG.debug("onWebSocketText({})",message);
+ messageQueue.offer(message);
+ }
+
+ public EndPoint getEndPoint() throws Exception
+ {
+ Session session = getSession();
+ Assert.assertThat("Session type",session,instanceOf(WebSocketSession.class));
+
+ WebSocketSession wssession = (WebSocketSession)session;
+ Field fld = wssession.getClass().getDeclaredField("connection");
+ fld.setAccessible(true);
+ Assert.assertThat("Field: connection",fld,notNullValue());
+
+ Object val = fld.get(wssession);
+ Assert.assertThat("Connection type",val,instanceOf(AbstractWebSocketConnection.class));
+ @SuppressWarnings("resource")
+ AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection)val;
+ return wsconn.getEndPoint();
+ }
+ }
+
+ @Rule
+ public TestTracker tt = new TestTracker();
+
+ private BlockheadServer server;
+ private WebSocketClient client;
+
+ private void confirmConnection(CloseTrackingSocket clientSocket, Future<Session> clientFuture, ServerConnection serverConn) throws Exception
+ {
+ // Wait for client connect on via future
+ clientFuture.get(500,TimeUnit.MILLISECONDS);
+
+ // Wait for client connect via client websocket
+ Assert.assertThat("Client WebSocket is Open",clientSocket.openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
+
+ try
+ {
+ // Send message from client to server
+ final String echoMsg = "echo-test";
+ Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
+
+ // Wait for send future
+ testFut.get(500,TimeUnit.MILLISECONDS);
+
+ // Read Frame on server side
+ IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
+ serverCapture.assertNoErrors();
+ serverCapture.assertFrameCount(1);
+ WebSocketFrame frame = serverCapture.getFrames().poll();
+ Assert.assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT));
+ Assert.assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg));
+
+ // Server send echo reply
+ serverConn.write(new TextFrame().setPayload(echoMsg));
+
+ // Wait for received echo
+ clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS);
+
+ // Verify received message
+ String recvMsg = clientSocket.messageQueue.poll();
+ Assert.assertThat("Received message",recvMsg,is(echoMsg));
+
+ // Verify that there are no errors
+ Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+ }
+ finally
+ {
+ clientSocket.clearQueues();
+ }
+ }
+
+ private void confirmServerReceivedCloseFrame(ServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws IOException,
+ TimeoutException
+ {
+ IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
+ serverCapture.assertNoErrors();
+ serverCapture.assertFrameCount(1);
+ serverCapture.assertHasFrame(OpCode.CLOSE,1);
+ WebSocketFrame frame = serverCapture.getFrames().poll();
+ Assert.assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo closeInfo = new CloseInfo(frame);
+ Assert.assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode));
+ if (closeReasonMatcher == null)
+ {
+ Assert.assertThat("Server received close reason",closeInfo.getReason(),nullValue());
+ }
+ else
+ {
+ Assert.assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher);
+ }
+ }
+
+ public static class TestWebSocketClient extends WebSocketClient
+ {
+ @Override
+ protected ConnectionManager newConnectionManager()
+ {
+ return new TestConnectionManager(this);
+ }
+ }
+
+ public static class TestConnectionManager extends ConnectionManager
+ {
+ public TestConnectionManager(WebSocketClient client)
+ {
+ super(client);
+ }
+
+ @Override
+ protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client)
+ {
+ return new TestSelectorManager(client);
+ }
+ }
+
+ public static class TestSelectorManager extends WebSocketClientSelectorManager
+ {
+ public TestSelectorManager(WebSocketClient client)
+ {
+ super(client);
+ }
+
+ @Override
+ protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
+ {
+ return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout());
+ }
+ }
+
+ public static class TestEndPoint extends SelectChannelEndPoint
+ {
+ public AtomicBoolean congestedFlush = new AtomicBoolean(false);
+
+ public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
+ {
+ super(channel,selector,key,scheduler,idleTimeout);
+ }
+
+ @Override
+ public boolean flush(ByteBuffer... buffers) throws IOException
+ {
+ boolean flushed = super.flush(buffers);
+ congestedFlush.set(!flushed);
+ return flushed;
+ }
+ }
+
+ @Before
+ public void startClient() throws Exception
+ {
+ client = new TestWebSocketClient();
+ client.start();
+ }
+
+ @Before
+ public void startServer() throws Exception
+ {
+ server = new BlockheadServer();
+ server.start();
+ }
+
+ @After
+ public void stopClient() throws Exception
+ {
+ if (client.isRunning())
+ {
+ client.stop();
+ }
+ }
+
+ @After
+ public void stopServer() throws Exception
+ {
+ server.stop();
+ }
+
+ @Test
+ public void testHalfClose() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // client sends close frame (code 1000, normal)
+ final String origCloseReason = "Normal Close";
+ clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+ // server receives close frame
+ confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+ // server sends 2 messages
+ serverConn.write(new TextFrame().setPayload("Hello"));
+ serverConn.write(new TextFrame().setPayload("World"));
+
+ // server sends close frame (code 1000, no reason)
+ CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server");
+ serverConn.write(sclose.asFrame());
+
+ // client receives 2 messages
+ clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS);
+
+ // Verify received messages
+ String recvMsg = clientSocket.messageQueue.poll();
+ Assert.assertThat("Received message 1",recvMsg,is("Hello"));
+ recvMsg = clientSocket.messageQueue.poll();
+ Assert.assertThat("Received message 2",recvMsg,is("World"));
+
+ // Verify that there are no errors
+ Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+
+ // client close event on ws-endpoint
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server"));
+ }
+
+ @Test
+ public void testNetworkCongestion() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // client sends BIG frames (until it cannot write anymore)
+ // server must not read (for test purpose, in order to congest connection)
+ // when write is congested, client enqueue close frame
+ // client initiate write, but write never completes
+ EndPoint endp = clientSocket.getEndPoint();
+ Assert.assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class));
+ TestEndPoint testendp = (TestEndPoint)endp;
+
+ char msg[] = new char[10240];
+ int writeCount = 0;
+ long writeSize = 0;
+ int i = 0;
+ while (!testendp.congestedFlush.get())
+ {
+ int z = i - ((i / 26) * 26);
+ char c = (char)('a' + z);
+ Arrays.fill(msg,c);
+ clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
+ writeCount++;
+ writeSize += msg.length;
+ }
+ LOG.debug("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize);
+
+ // Verify that there are no errors
+ Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+
+ // client idle timeout triggers close event on client ws-endpoint
+ // client close event on ws-endpoint
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
+ }
+
+ @Test
+ public void testProtocolException() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // client should not have received close message (yet)
+ clientSocket.assertNoCloseEvent();
+
+ // server sends bad close frame (too big of a reason message)
+ byte msg[] = new byte[400];
+ Arrays.fill(msg,(byte)'x');
+ ByteBuffer bad = ByteBuffer.allocate(500);
+ RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true);
+ RawFrameBuilder.putLength(bad,msg.length + 2,false);
+ bad.putShort((short)StatusCode.NORMAL);
+ bad.put(msg);
+ BufferUtil.flipToFlush(bad,0);
+ serverConn.write(bad);
+
+ // client should have noticed the error
+ clientSocket.assertReceivedError(ProtocolException.class,containsString("Invalid control frame"));
+
+ // client parse invalid frame, notifies server of close (protocol error)
+ confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length")));
+
+ // server disconnects
+ serverConn.disconnect();
+
+ // client triggers close event on client ws-endpoint
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
+ }
+
+ @Test
+ public void testReadEOF() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // client sends close frame
+ final String origCloseReason = "Normal Close";
+ clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+ // server receives close frame
+ confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+ // client should not have received close message (yet)
+ clientSocket.assertNoCloseEvent();
+
+ // server shuts down connection (no frame reply)
+ serverConn.disconnect();
+
+ // client reads -1 (EOF)
+ // client triggers close event on client ws-endpoint
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
+ }
+
+ @Test
+ public void testServerNoCloseHandshake() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // client sends close frame
+ final String origCloseReason = "Normal Close";
+ clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+ // server receives close frame
+ confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+ // client should not have received close message (yet)
+ clientSocket.assertNoCloseEvent();
+
+ // server never sends close frame handshake
+ // server sits idle
+
+ // client idle timeout triggers close event on client ws-endpoint
+ // assert - close code==1006 (abnormal)
+ // assert - close reason message contains (timeout)
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
+ }
+
+ @Test
+ public void testStopLifecycle() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ int clientCount = 3;
+ CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount];
+ ServerConnection serverConns[] = new ServerConnection[clientCount];
+
+ // Connect Multiple Clients
+ for (int i = 0; i < clientCount; i++)
+ {
+ // Client Request Upgrade
+ clientSockets[i] = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSockets[i],server.getWsUri());
+
+ // Server accepts connection
+ serverConns[i] = server.accept();
+ serverConns[i].upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]);
+ }
+
+ // client lifecycle stop
+ client.stop();
+
+ // clients send close frames (code 1001, shutdown)
+ for (int i = 0; i < clientCount; i++)
+ {
+ // server receives close frame
+ confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown"));
+ }
+
+ // clients disconnect
+ for (int i = 0; i < clientCount; i++)
+ {
+ clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown"));
+ }
+ }
+
+ @Test
+ public void testWriteException() throws Exception
+ {
+ // Set client timeout
+ final int timeout = 1000;
+ client.setMaxIdleTimeout(timeout);
+
+ // Client connects
+ CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+ Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+ // Server accepts connect
+ ServerConnection serverConn = server.accept();
+ serverConn.upgrade();
+
+ // client confirms connection via echo
+ confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+ // setup client endpoint for write failure (test only)
+ EndPoint endp = clientSocket.getEndPoint();
+ endp.shutdownOutput();
+
+ // client enqueue close frame
+ // client write failure
+ final String origCloseReason = "Normal Close";
+ clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+ clientSocket.assertReceivedError(EofException.class,null);
+
+ // client triggers close event on client ws-endpoint
+ // assert - close code==1006 (abnormal)
+ // assert - close reason message contains (write failure)
+ clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
+ }
+}
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
index e06e883..2cea2fd 100644
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
@@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
-import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,7 +31,6 @@
{
private static final Logger LOG = Log.getLogger(ServerWriteThread.class);
private final ServerConnection conn;
- private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
@@ -42,11 +40,6 @@
this.conn = conn;
}
- public Exchanger<String> getExchanger()
- {
- return exchanger;
- }
-
public String getMessage()
{
return message;
@@ -73,12 +66,6 @@
{
conn.write(new TextFrame().setPayload(message));
- if (exchanger != null)
- {
- // synchronized on exchange
- exchanger.exchange(message);
- }
-
m.incrementAndGet();
if (slowness > 0)
@@ -93,11 +80,6 @@
}
}
- public void setExchanger(Exchanger<String> exchanger)
- {
- this.exchanger = exchanger;
- }
-
public void setMessage(String message)
{
this.message = message;
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
index 80720f3..4f56ec6 100644
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
@@ -49,7 +49,7 @@
public void startClient() throws Exception
{
client = new WebSocketClient();
- client.getPolicy().setIdleTimeout(60000);
+ client.setMaxIdleTimeout(60000);
client.start();
}
@@ -78,7 +78,7 @@
{
JettyTrackingSocket tsocket = new JettyTrackingSocket();
client.setMasker(new ZeroMasker());
- client.getPolicy().setIdleTimeout(60000);
+ client.setMaxIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<Session> future = client.connect(tsocket,wsUri);
@@ -123,41 +123,38 @@
@Slow
public void testServerSlowToSend() throws Exception
{
- // final Exchanger<String> exchanger = new Exchanger<String>();
- JettyTrackingSocket tsocket = new JettyTrackingSocket();
- // tsocket.messageExchanger = exchanger;
+ JettyTrackingSocket clientSocket = new JettyTrackingSocket();
client.setMasker(new ZeroMasker());
- client.getPolicy().setIdleTimeout(60000);
+ client.setMaxIdleTimeout(60000);
URI wsUri = server.getWsUri();
- Future<Session> future = client.connect(tsocket,wsUri);
+ Future<Session> clientConnectFuture = client.connect(clientSocket,wsUri);
- ServerConnection sconnection = server.accept();
- sconnection.setSoTimeout(60000);
- sconnection.upgrade();
+ ServerConnection serverConn = server.accept();
+ serverConn.setSoTimeout(60000);
+ serverConn.upgrade();
// Confirm connected
- future.get(500,TimeUnit.MILLISECONDS);
- tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
+ clientConnectFuture.get(500,TimeUnit.MILLISECONDS);
+ clientSocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server write slowly.
int messageCount = 1000;
- ServerWriteThread writer = new ServerWriteThread(sconnection);
+ ServerWriteThread writer = new ServerWriteThread(serverConn);
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
- // writer.setExchanger(exchanger);
writer.setSlowness(10);
writer.start();
writer.join();
// Verify receive
- Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount));
+ Assert.assertThat("Message Receive Count",clientSocket.messageQueue.size(),is(messageCount));
// Close
- sconnection.close(StatusCode.NORMAL);
+ serverConn.close(StatusCode.NORMAL);
- Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
- tsocket.assertCloseCode(StatusCode.NORMAL);
+ Assert.assertTrue("Client Socket Closed",clientSocket.closeLatch.await(10,TimeUnit.SECONDS));
+ clientSocket.assertCloseCode(StatusCode.NORMAL);
}
}
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java
deleted file mode 100644
index 72c1fc1..0000000
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.client;
-
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-import java.net.URI;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.toolchain.test.TestTracker;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.StatusCode;
-import org.eclipse.jetty.websocket.common.test.BlockheadServer;
-import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Various tests for Timeout handling
- */
-public class TimeoutTest
-{
- @Rule
- public TestTracker tt = new TestTracker();
-
- private BlockheadServer server;
- private WebSocketClient client;
-
- @Before
- public void startClient() throws Exception
- {
- client = new WebSocketClient();
- client.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
- client.start();
- }
-
- @Before
- public void startServer() throws Exception
- {
- server = new BlockheadServer();
- server.start();
- }
-
- @After
- public void stopClient() throws Exception
- {
- client.stop();
- }
-
- @After
- public void stopServer() throws Exception
- {
- server.stop();
- }
-
- /**
- * In a situation where the upgrade/connection is successful, and there is no activity for a while, the idle timeout triggers on the client side and
- * automatically initiates a close handshake.
- */
- @Test
- public void testIdleDetectedByClient() throws Exception
- {
- JettyTrackingSocket wsocket = new JettyTrackingSocket();
-
- URI wsUri = server.getWsUri();
- client.setMaxIdleTimeout(1000);
- Future<Session> future = client.connect(wsocket,wsUri);
-
- ServerConnection ssocket = server.accept();
- ssocket.upgrade();
-
- try
- {
- ssocket.startEcho();
- // Validate that connect occurred
- future.get(500,TimeUnit.MILLISECONDS);
- wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
-
- // Wait for inactivity idle timeout.
- long start = System.currentTimeMillis();
- wsocket.waitForClose(2,TimeUnit.SECONDS);
- long end = System.currentTimeMillis();
- long dur = (end - start);
- // Make sure idle timeout takes less than 5 total seconds
- Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(3000L));
-
- // Client should see a close event, with abnormal status NO_CLOSE
- wsocket.assertCloseCode(StatusCode.ABNORMAL);
- }
- finally
- {
- ssocket.stopEcho();
- }
- }
-}
diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
index 7c9bd36..9668b13 100644
--- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
+++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
@@ -1,12 +1,17 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.LEVEL=DEBUG
-# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
+# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG
+# org.eclipse.jetty.io.SelectChannelEndPoint.LEVEL=DEBUG
+# org.eclipse.jetty.io.IdleTimeout.LEVEL=DEBUG
+# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
+# org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.common.io.WriteBytesProvider.LEVEL=DEBUG
+# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
+
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
index 03a737c..f14c252 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
@@ -65,7 +66,8 @@
statusCode |= (data.get() & 0xFF) << 8;
statusCode |= (data.get() & 0xFF);
- if(validate) {
+ if (validate)
+ {
if ((statusCode < StatusCode.NORMAL) || (statusCode == StatusCode.UNDEFINED) || (statusCode == StatusCode.NO_CLOSE)
|| (statusCode == StatusCode.NO_CODE) || ((statusCode > 1011) && (statusCode <= 2999)) || (statusCode >= 5000))
{
@@ -120,7 +122,7 @@
public CloseInfo(int statusCode)
{
- this(statusCode, null);
+ this(statusCode,null);
}
public CloseInfo(int statusCode, String reason)
@@ -144,8 +146,9 @@
utf = StringUtil.getUtf8Bytes(reason);
len += utf.length;
}
-
- ByteBuffer buf = ByteBuffer.allocate(len);
+
+ ByteBuffer buf = BufferUtil.allocate(len);
+ BufferUtil.flipToFill(buf);
buf.put((byte)((statusCode >>> 8) & 0xFF));
buf.put((byte)((statusCode >>> 0) & 0xFF));
@@ -153,7 +156,7 @@
{
buf.put(utf,0,utf.length);
}
- buf.flip();
+ BufferUtil.flipToFlush(buf,0);
return buf;
}
@@ -162,7 +165,14 @@
{
CloseFrame frame = new CloseFrame();
frame.setFin(true);
- frame.setPayload(asByteBuffer());
+ if ((statusCode >= 1000) && (statusCode != StatusCode.NO_CLOSE) && (statusCode != StatusCode.NO_CODE))
+ {
+ if (statusCode == StatusCode.FAILED_TLS_HANDSHAKE)
+ {
+ throw new ProtocolException("Close Frame with status code " + statusCode + " not allowed (per RFC6455)");
+ }
+ frame.setPayload(asByteBuffer());
+ }
return frame;
}
@@ -180,10 +190,10 @@
{
return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE));
}
-
+
public boolean isAbnormal()
{
- return (statusCode == StatusCode.ABNORMAL);
+ return (statusCode != StatusCode.NORMAL);
}
@Override
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
index 0cf7e5f..86195a0 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
@@ -96,8 +96,9 @@
private void assertSanePayloadLength(long len)
{
- if (LOG.isDebugEnabled())
- LOG.debug("Payload Length: {} - {}",len,this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this);
+ }
// Since we use ByteBuffer so often, having lengths over Integer.MAX_VALUE is really impossible.
if (len > Integer.MAX_VALUE)
@@ -239,7 +240,7 @@
incomingFramesHandler.incomingError(e);
}
- public void parse(ByteBuffer buffer)
+ public void parse(ByteBuffer buffer) throws WebSocketException
{
if (buffer.remaining() <= 0)
{
@@ -266,13 +267,20 @@
{
buffer.position(buffer.limit()); // consume remaining
reset();
+ // let session know
notifyWebSocketException(e);
+ // need to throw for proper close behavior in connection
+ throw e;
}
catch (Throwable t)
{
buffer.position(buffer.limit()); // consume remaining
reset();
- notifyWebSocketException(new WebSocketException(t));
+ // let session know
+ WebSocketException e = new WebSocketException(t);
+ notifyWebSocketException(e);
+ // need to throw for proper close behavior in connection
+ throw e;
}
}
@@ -299,7 +307,9 @@
private boolean parseFrame(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
+ {
LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
+ }
while (buffer.hasRemaining())
{
switch (state)
@@ -318,7 +328,8 @@
}
if (LOG.isDebugEnabled())
- LOG.debug("OpCode {}, fin={} rsv={}{}{}",
+ LOG.debug("{} OpCode {}, fin={} rsv={}{}{}",
+ policy.getBehavior(),
OpCode.name(opcode),
fin,
(isRsv1InUse()?'1':'.'),
@@ -412,11 +423,6 @@
throw new ProtocolException("RSV3 not allowed to be set");
}
}
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin);
- }
state = State.PAYLOAD_LEN;
break;
@@ -591,8 +597,9 @@
buffer.limit(limit);
buffer.position(buffer.position() + window.remaining());
- if (LOG.isDebugEnabled())
- LOG.debug("Window: {}",BufferUtil.toDetailString(window));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window));
+ }
maskProcessor.process(window);
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
index 460d1d7..04637e3 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
@@ -41,6 +41,7 @@
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
+import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@@ -90,20 +91,19 @@
@Override
public void close()
{
- this.close(StatusCode.NORMAL, null);
+ this.close(StatusCode.NORMAL,null);
}
@Override
public void close(CloseStatus closeStatus)
{
- this.close(closeStatus.getCode(), closeStatus.getPhrase());
+ this.close(closeStatus.getCode(),closeStatus.getPhrase());
}
@Override
public void close(int statusCode, String reason)
{
- connection.close(statusCode, reason);
- notifyClose(statusCode, reason);
+ connection.close(statusCode,reason);
}
/**
@@ -115,7 +115,7 @@
connection.disconnect();
// notify of harsh disconnect
- notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect");
+ notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
}
public void dispatch(Runnable runnable)
@@ -130,7 +130,7 @@
out.append(indent).append(" +- incomingHandler : ");
if (incomingHandler instanceof Dumpable)
{
- ((Dumpable)incomingHandler).dump(out, indent + " ");
+ ((Dumpable)incomingHandler).dump(out,indent + " ");
}
else
{
@@ -140,7 +140,7 @@
out.append(indent).append(" +- outgoingHandler : ");
if (outgoingHandler instanceof Dumpable)
{
- ((Dumpable)outgoingHandler).dump(out, indent + " ");
+ ((Dumpable)outgoingHandler).dump(out,indent + " ");
}
else
{
@@ -273,7 +273,7 @@
{
final int prime = 31;
int result = 1;
- result = (prime * result) + ((connection == null) ? 0 : connection.hashCode());
+ result = (prime * result) + ((connection == null)?0:connection.hashCode());
return result;
}
@@ -328,7 +328,11 @@
public void notifyClose(int statusCode, String reason)
{
- websocket.onClose(new CloseInfo(statusCode, reason));
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("notifyClose({},{})",statusCode,reason);
+ }
+ websocket.onClose(new CloseInfo(statusCode,reason));
}
public void notifyError(Throwable cause)
@@ -342,12 +346,13 @@
{
switch (state)
{
- case CLOSING:
+ case CLOSED:
// notify session listeners
for (SessionListener listener : sessionListeners)
{
try
{
+ LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName());
listener.onSessionClosed(this);
}
catch (Throwable t)
@@ -355,12 +360,10 @@
LOG.ignore(t);
}
}
- break;
- case CLOSED:
IOState ioState = this.connection.getIOState();
CloseInfo close = ioState.getCloseInfo();
// confirmed close of local endpoint
- notifyClose(close.getStatusCode(), close.getReason());
+ notifyClose(close.getStatusCode(),close.getReason());
break;
case OPEN:
// notify session listeners
@@ -394,17 +397,32 @@
connection.getIOState().onConnected();
// Connect remote
- remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode());
+ remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
- // Open WebSocket
- websocket.openSession(this);
-
- // Open connection
- connection.getIOState().onOpened();
-
- if (LOG.isDebugEnabled())
+ try
{
- LOG.debug("open -> {}", dump());
+ // Open WebSocket
+ websocket.openSession(this);
+
+ // Open connection
+ connection.getIOState().onOpened();
+
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("open -> {}",dump());
+ }
+ }
+ catch (Throwable t)
+ {
+ // Exception on end-user WS-Endpoint.
+ // Fast-fail & close connection with reason.
+ int statusCode = StatusCode.SERVER_ERROR;
+ if(policy.getBehavior() == WebSocketBehavior.CLIENT)
+ {
+ statusCode = StatusCode.POLICY_VIOLATION;
+ }
+
+ close(statusCode,t.getMessage());
}
}
@@ -444,11 +462,11 @@
List<String> values = entry.getValue();
if (values != null)
{
- this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()]));
+ this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
}
else
{
- this.parameterMap.put(entry.getKey(), new String[0]);
+ this.parameterMap.put(entry.getKey(),new String[0]);
}
}
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
index 8cc60f0..7a55b2d 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
@@ -88,13 +88,7 @@
{
if (LOG.isDebugEnabled())
{
- LOG.debug("incoming(WebSocketException)",e);
- }
-
- if (e instanceof CloseException)
- {
- CloseException close = (CloseException)e;
- terminateConnection(close.getStatusCode(),close.getMessage());
+ LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
}
onError(e);
@@ -105,7 +99,7 @@
{
if (LOG.isDebugEnabled())
{
- LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
+ LOG.debug("incomingFrame({})",frame);
}
try
@@ -226,6 +220,7 @@
catch (Throwable t)
{
unhandled(t);
+ throw t;
}
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
index c9f98bb..cea5909 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
@@ -32,7 +32,6 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@@ -54,13 +53,13 @@
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.LogicalConnection;
+import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
- * Provides the implementation of {@link LogicalConnection} within the
- * framework of the new {@link Connection} framework of {@code jetty-io}.
+ * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
{
@@ -68,7 +67,7 @@
{
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
{
- super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
+ super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
}
@Override
@@ -106,7 +105,7 @@
// Abnormal Close
reason = CloseStatus.trimMaxReasonLength(reason);
session.notifyError(x);
- session.notifyClose(StatusCode.NO_CLOSE,reason);
+ session.notifyClose(StatusCode.ABNORMAL,reason);
disconnect(); // disconnect endpoint & connection
}
@@ -116,10 +115,11 @@
{
private final boolean outputOnly;
- public OnDisconnectCallback(boolean outputOnly) {
+ public OnDisconnectCallback(boolean outputOnly)
+ {
this.outputOnly = outputOnly;
}
-
+
@Override
public void writeFailed(Throwable x)
{
@@ -218,10 +218,10 @@
@Override
public void close(int statusCode, String reason)
{
+ LOG.debug("close({},{})",statusCode,reason);
CloseInfo close = new CloseInfo(statusCode,reason);
if (statusCode == StatusCode.ABNORMAL)
{
- flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way
ioState.onAbnormalClose(close);
}
else
@@ -230,7 +230,6 @@
}
}
-
@Override
public void disconnect()
{
@@ -366,7 +365,9 @@
@Override
public void onClose()
{
+ LOG.debug("{} onClose()",policy.getBehavior());
super.onClose();
+ // ioState.onDisconnected();
flusher.close();
}
@@ -385,18 +386,15 @@
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
- outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false), BatchMode.OFF);
+ outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
}
- else
- {
- // Just disconnect
- this.disconnect(false);
- }
+ // Just disconnect
+ this.disconnect(false);
break;
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// reply to close handshake from remote
- outgoingFrame(close.asFrame(),new OnDisconnectCallback(true), BatchMode.OFF);
+ outgoingFrame(close.asFrame(),new OnDisconnectCallback(true),BatchMode.OFF);
default:
break;
}
@@ -447,20 +445,26 @@
@Override
protected boolean onReadTimeout()
{
- LOG.debug("{} Read Timeout",policy.getBehavior());
-
IOState state = getIOState();
- if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
+ ConnectionState cstate = state.getConnectionState();
+ LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
+
+ if (cstate == ConnectionState.CLOSED)
{
- // close already initiated, extra timeouts not relevant
+ // close already completed, extra timeouts not relevant
// allow underlying connection and endpoint to disconnect on its own
return true;
}
- // Initiate close - politely send close frame.
- session.notifyError(new SocketTimeoutException("Timeout on Read"));
- // This is an Abnormal Close condition
- close(StatusCode.ABNORMAL,"Idle Timeout");
+ try
+ {
+ session.notifyError(new SocketTimeoutException("Timeout on Read"));
+ }
+ finally
+ {
+ // This is an Abnormal Close condition
+ close(StatusCode.ABNORMAL,"Idle Timeout");
+ }
return false;
}
@@ -476,7 +480,21 @@
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
- flusher.enqueue(frame,callback, batchMode);
+ CloseInfo close = null;
+ // grab a copy of the frame details before masking and whatnot
+ if (frame.getOpCode() == OpCode.CLOSE)
+ {
+ close = new CloseInfo(frame);
+ }
+
+ flusher.enqueue(frame,callback,batchMode);
+
+ // now trigger local close
+ if (close != null)
+ {
+ LOG.debug("outgoing CLOSE frame - {}: {}",frame,close);
+ ioState.onCloseLocal(close);
+ }
}
private int read(ByteBuffer buffer)
@@ -504,7 +522,6 @@
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
- // TODO: has the end user application already consumed what it was given?
}
}
}
@@ -520,6 +537,12 @@
close(e.getStatusCode(),e.getMessage());
return -1;
}
+ catch (Throwable t)
+ {
+ LOG.warn(t);
+ close(StatusCode.ABNORMAL,t.getMessage());
+ return -1;
+ }
}
@Override
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
index a7bb3cb..6e995e5 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
@@ -29,7 +29,6 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -45,246 +44,22 @@
*/
public class FrameFlusher
{
- public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
- private static final Logger LOG = Log.getLogger(FrameFlusher.class);
-
- private final ByteBufferPool bufferPool;
- private final EndPoint endpoint;
- private final int bufferSize;
- private final Generator generator;
- private final int maxGather;
- private final Object lock = new Object();
- private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
- private final Flusher flusher = new Flusher();
- private final AtomicBoolean closed = new AtomicBoolean();
- private volatile Throwable failure;
-
- public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
- {
- this.bufferPool = bufferPool;
- this.endpoint = endpoint;
- this.bufferSize = bufferSize;
- this.generator = Objects.requireNonNull(generator);
- this.maxGather = maxGather;
- }
-
- public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
- {
- if (closed.get())
- {
- notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
- return;
- }
- if (flusher.isFailed())
- {
- notifyCallbackFailure(callback, failure);
- return;
- }
-
- FrameEntry entry = new FrameEntry(frame, callback, batchMode);
-
- synchronized (lock)
- {
- switch (frame.getOpCode())
- {
- case OpCode.PING:
- {
- // Prepend PINGs so they are processed first.
- queue.add(0, entry);
- break;
- }
- case OpCode.CLOSE:
- {
- // There may be a chance that other frames are
- // added after this close frame, but we will
- // fail them later to keep it simple here.
- closed.set(true);
- queue.add(entry);
- break;
- }
- default:
- {
- queue.add(entry);
- break;
- }
- }
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("{} queued {}", this, entry);
-
- flusher.iterate();
- }
-
- public void close()
- {
- if (closed.compareAndSet(false, true))
- {
- LOG.debug("{} closing {}", this);
- EOFException eof = new EOFException("Connection has been closed locally");
- flusher.failed(eof);
-
- // Fail also queued entries.
- List<FrameEntry> entries = new ArrayList<>();
- synchronized (lock)
- {
- entries.addAll(queue);
- queue.clear();
- }
- // Notify outside sync block.
- for (FrameEntry entry : entries)
- notifyCallbackFailure(entry.callback, eof);
- }
- }
-
- protected void onFailure(Throwable x)
- {
- LOG.warn(x);
- }
-
- protected void notifyCallbackSuccess(WriteCallback callback)
- {
- try
- {
- if (callback != null)
- callback.writeSuccess();
- }
- catch (Throwable x)
- {
- LOG.debug("Exception while notifying success of callback " + callback, x);
- }
- }
-
- protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
- {
- try
- {
- if (callback != null)
- callback.writeFailed(failure);
- }
- catch (Throwable x)
- {
- LOG.debug("Exception while notifying failure of callback " + callback, x);
- }
- }
-
- @Override
- public String toString()
- {
- ByteBuffer aggregate = flusher.aggregate;
- return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
- getClass().getSimpleName(),
- queue.size(),
- aggregate == null ? 0 : aggregate.position(),
- failure);
- }
-
private class Flusher extends IteratingCallback
{
private final List<FrameEntry> entries = new ArrayList<>(maxGather);
- private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
+ private final List<ByteBuffer> buffers = new ArrayList<>((maxGather * 2) + 1);
private ByteBuffer aggregate;
private BatchMode batchMode;
- @Override
- protected Action process() throws Exception
- {
- int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
- BatchMode currentBatchMode = BatchMode.AUTO;
- synchronized (lock)
- {
- while (entries.size() <= maxGather && !queue.isEmpty())
- {
- FrameEntry entry = queue.remove(0);
- currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
-
- // Force flush if we need to.
- if (entry.frame == FLUSH_FRAME)
- currentBatchMode = BatchMode.OFF;
-
- int payloadLength = BufferUtil.length(entry.frame.getPayload());
- int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
-
- // If it is a "big" frame, avoid copying into the aggregate buffer.
- if (approxFrameLength > (bufferSize >> 2))
- currentBatchMode = BatchMode.OFF;
-
- // If the aggregate buffer overflows, do not batch.
- space -= approxFrameLength;
- if (space <= 0)
- currentBatchMode = BatchMode.OFF;
-
- entries.add(entry);
- }
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
-
- if (entries.isEmpty())
- {
- if (batchMode != BatchMode.AUTO)
- {
- // Nothing more to do, release the aggregate buffer if we need to.
- // Releasing it here rather than in succeeded() allows for its reuse.
- releaseAggregate();
- return Action.IDLE;
- }
-
- LOG.debug("{} auto flushing", FrameFlusher.this);
- return flush();
- }
-
- batchMode = currentBatchMode;
-
- return currentBatchMode == BatchMode.OFF ? flush() : batch();
- }
-
- private Action flush()
- {
- if (!BufferUtil.isEmpty(aggregate))
- {
- buffers.add(aggregate);
- if (LOG.isDebugEnabled())
- LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
- }
-
- // Do not allocate the iterator here.
- for (int i = 0; i < entries.size(); ++i)
- {
- FrameEntry entry = entries.get(i);
- // Skip the "synthetic" frame used for flushing.
- if (entry.frame == FLUSH_FRAME)
- continue;
- buffers.add(entry.generateHeaderBytes());
- ByteBuffer payload = entry.frame.getPayload();
- if (BufferUtil.hasContent(payload))
- buffers.add(payload);
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
-
- if (buffers.isEmpty())
- {
- releaseAggregate();
- // We may have the FLUSH_FRAME to notify.
- succeedEntries();
- return Action.IDLE;
- }
-
- endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
- buffers.clear();
- return Action.SCHEDULED;
- }
-
private Action batch()
{
if (aggregate == null)
{
- aggregate = bufferPool.acquire(bufferSize, true);
+ aggregate = bufferPool.acquire(bufferSize,true);
if (LOG.isDebugEnabled())
- LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
+ {
+ LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
+ }
}
// Do not allocate the iterator here.
@@ -296,17 +71,149 @@
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
- BufferUtil.append(aggregate, payload);
+ {
+ BufferUtil.append(aggregate,payload);
+ }
}
if (LOG.isDebugEnabled())
- LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
+ {
+ LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
+ }
succeeded();
return Action.SCHEDULED;
}
+ @Override
+ protected void completed()
+ {
+ // This IteratingCallback never completes.
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ for (FrameEntry entry : entries)
+ {
+ notifyCallbackFailure(entry.callback,x);
+ entry.release();
+ }
+ entries.clear();
+ super.failed(x);
+ failure = x;
+ onFailure(x);
+ }
+
+ private Action flush()
+ {
+ if (!BufferUtil.isEmpty(aggregate))
+ {
+ buffers.add(aggregate);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
+ }
+ }
+
+ // Do not allocate the iterator here.
+ for (int i = 0; i < entries.size(); ++i)
+ {
+ FrameEntry entry = entries.get(i);
+ // Skip the "synthetic" frame used for flushing.
+ if (entry.frame == FLUSH_FRAME)
+ {
+ continue;
+ }
+ buffers.add(entry.generateHeaderBytes());
+ ByteBuffer payload = entry.frame.getPayload();
+ if (BufferUtil.hasContent(payload))
+ {
+ buffers.add(payload);
+ }
+ }
+
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
+ }
+
+ if (buffers.isEmpty())
+ {
+ releaseAggregate();
+ // We may have the FLUSH_FRAME to notify.
+ succeedEntries();
+ return Action.IDLE;
+ }
+
+ endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
+ buffers.clear();
+ return Action.SCHEDULED;
+ }
+
+ @Override
+ protected Action process() throws Exception
+ {
+ int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
+ BatchMode currentBatchMode = BatchMode.AUTO;
+ synchronized (lock)
+ {
+ while ((entries.size() <= maxGather) && !queue.isEmpty())
+ {
+ FrameEntry entry = queue.remove(0);
+ currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
+
+ // Force flush if we need to.
+ if (entry.frame == FLUSH_FRAME)
+ {
+ currentBatchMode = BatchMode.OFF;
+ }
+
+ int payloadLength = BufferUtil.length(entry.frame.getPayload());
+ int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
+
+ // If it is a "big" frame, avoid copying into the aggregate buffer.
+ if (approxFrameLength > (bufferSize >> 2))
+ {
+ currentBatchMode = BatchMode.OFF;
+ }
+
+ // If the aggregate buffer overflows, do not batch.
+ space -= approxFrameLength;
+ if (space <= 0)
+ {
+ currentBatchMode = BatchMode.OFF;
+ }
+
+ entries.add(entry);
+ }
+ }
+
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
+ }
+
+ if (entries.isEmpty())
+ {
+ if (batchMode != BatchMode.AUTO)
+ {
+ // Nothing more to do, release the aggregate buffer if we need to.
+ // Releasing it here rather than in succeeded() allows for its reuse.
+ releaseAggregate();
+ return Action.IDLE;
+ }
+
+ LOG.debug("{} auto flushing",FrameFlusher.this);
+ return flush();
+ }
+
+ batchMode = currentBatchMode;
+
+ return currentBatchMode == BatchMode.OFF?flush():batch();
+ }
+
private void releaseAggregate()
{
- if (aggregate != null && BufferUtil.isEmpty(aggregate))
+ if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
@@ -331,26 +238,6 @@
}
entries.clear();
}
-
- @Override
- protected void completed()
- {
- // This IteratingCallback never completes.
- }
-
- @Override
- public void failed(Throwable x)
- {
- for (FrameEntry entry : entries)
- {
- notifyCallbackFailure(entry.callback, x);
- entry.release();
- }
- entries.clear();
- super.failed(x);
- failure = x;
- onFailure(x);
- }
}
private class FrameEntry
@@ -374,7 +261,7 @@
private void generateHeaderBytes(ByteBuffer buffer)
{
- generator.generateHeaderBytes(frame, buffer);
+ generator.generateHeaderBytes(frame,buffer);
}
private void release()
@@ -389,7 +276,145 @@
@Override
public String toString()
{
- return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
+ return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
}
}
+
+ public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
+ private static final Logger LOG = Log.getLogger(FrameFlusher.class);
+ private final ByteBufferPool bufferPool;
+ private final EndPoint endpoint;
+ private final int bufferSize;
+ private final Generator generator;
+ private final int maxGather;
+ private final Object lock = new Object();
+ private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
+ private final Flusher flusher = new Flusher();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private volatile Throwable failure;
+
+ public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
+ {
+ this.bufferPool = bufferPool;
+ this.endpoint = endpoint;
+ this.bufferSize = bufferSize;
+ this.generator = Objects.requireNonNull(generator);
+ this.maxGather = maxGather;
+ }
+
+ public void close()
+ {
+ if (closed.compareAndSet(false,true))
+ {
+ LOG.debug("{} closing {}",this);
+ EOFException eof = new EOFException("Connection has been closed locally");
+ flusher.failed(eof);
+
+ // Fail also queued entries.
+ List<FrameEntry> entries = new ArrayList<>();
+ synchronized (lock)
+ {
+ entries.addAll(queue);
+ queue.clear();
+ }
+ // Notify outside sync block.
+ for (FrameEntry entry : entries)
+ {
+ notifyCallbackFailure(entry.callback,eof);
+ }
+ }
+ }
+
+ public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
+ {
+ if (closed.get())
+ {
+ notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
+ return;
+ }
+ if (flusher.isFailed())
+ {
+ notifyCallbackFailure(callback,failure);
+ return;
+ }
+
+ FrameEntry entry = new FrameEntry(frame,callback,batchMode);
+
+ synchronized (lock)
+ {
+ switch (frame.getOpCode())
+ {
+ case OpCode.PING:
+ {
+ // Prepend PINGs so they are processed first.
+ queue.add(0,entry);
+ break;
+ }
+ case OpCode.CLOSE:
+ {
+ // There may be a chance that other frames are
+ // added after this close frame, but we will
+ // fail them later to keep it simple here.
+ closed.set(true);
+ queue.add(entry);
+ break;
+ }
+ default:
+ {
+ queue.add(entry);
+ break;
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("{} queued {}",this,entry);
+ }
+
+ flusher.iterate();
+ }
+
+ protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
+ {
+ try
+ {
+ if (callback != null)
+ {
+ callback.writeFailed(failure);
+ }
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Exception while notifying failure of callback " + callback,x);
+ }
+ }
+
+ protected void notifyCallbackSuccess(WriteCallback callback)
+ {
+ try
+ {
+ if (callback != null)
+ {
+ callback.writeSuccess();
+ }
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Exception while notifying success of callback " + callback,x);
+ }
+ }
+
+ protected void onFailure(Throwable x)
+ {
+ LOG.warn(x);
+ }
+
+ @Override
+ public String toString()
+ {
+ ByteBuffer aggregate = flusher.aggregate;
+ return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
+ failure);
+ }
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
index 6d4dbc4..815d2d3 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
@@ -139,6 +139,10 @@
{
for (ConnectionStateListener listener : listeners)
{
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
+ }
listener.onConnectionStateChange(state);
}
}
@@ -166,8 +170,7 @@
}
this.state = ConnectionState.CLOSED;
- if (closeInfo == null)
- this.closeInfo = close;
+ this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@@ -193,16 +196,16 @@
if (initialState == ConnectionState.CONNECTED)
{
- // fast close. a local close request from end-user onConnected() method
+ // fast close. a local close request from end-user onConnect/onOpen method
LOG.debug("FastClose in CONNECTED detected");
// Force the state open (to allow read/write to endpoint)
onOpened();
+ LOG.debug("FastClose continuing with Closure");
}
synchronized (this)
{
- if (closeInfo == null)
- closeInfo = close;
+ closeInfo = close;
boolean in = inputAvailable;
boolean out = outputAvailable;
@@ -236,7 +239,6 @@
LOG.debug("notifying state listeners: {}",event);
notifyStateListeners(event);
- /*
// if abnormal, we don't expect an answer.
if (close.isAbnormal())
{
@@ -253,7 +255,6 @@
notifyStateListeners(event);
return;
}
- */
}
}
@@ -272,8 +273,7 @@
return;
}
- if (closeInfo == null)
- closeInfo = close;
+ closeInfo = close;
boolean in = inputAvailable;
boolean out = outputAvailable;
@@ -360,7 +360,7 @@
// already opened
return;
}
-
+
if (this.state != ConnectionState.CONNECTED)
{
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
@@ -394,12 +394,11 @@
return;
}
- CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
+ CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF");
this.cleanClose = false;
this.state = ConnectionState.CLOSED;
- if (closeInfo == null)
- this.closeInfo = close;
+ this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@@ -408,6 +407,58 @@
notifyStateListeners(event);
}
+ public void onDisconnected()
+ {
+ ConnectionState event = null;
+ synchronized (this)
+ {
+ if (this.state == ConnectionState.CLOSED)
+ {
+ // already closed
+ return;
+ }
+
+ CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
+
+ this.cleanClose = false;
+ this.state = ConnectionState.CLOSED;
+ this.closeInfo = close;
+ this.inputAvailable = false;
+ this.outputAvailable = false;
+ this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
+ event = this.state;
+ }
+ notifyStateListeners(event);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder str = new StringBuilder();
+ str.append(this.getClass().getSimpleName());
+ str.append("@").append(Integer.toHexString(hashCode()));
+ str.append("[").append(state);
+ str.append(',');
+ if (!inputAvailable)
+ {
+ str.append('!');
+ }
+ str.append("in,");
+ if (!outputAvailable)
+ {
+ str.append('!');
+ }
+ str.append("out");
+ if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
+ {
+ str.append(",close=").append(closeInfo);
+ str.append(",clean=").append(cleanClose);
+ str.append(",closeSource=").append(closeHandshakeSource);
+ }
+ str.append(']');
+ return str.toString();
+ }
+
public boolean wasAbnormalClose()
{
return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
@@ -427,4 +478,5 @@
{
return closeHandshakeSource == CloseHandshakeSource.REMOTE;
}
+
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
index 32b9c1b..2194ec5 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
@@ -392,4 +392,4 @@
}
return name;
}
-}
+}
\ No newline at end of file
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java
new file mode 100644
index 0000000..a40e306
--- /dev/null
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java
@@ -0,0 +1,166 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.common;
+
+import static org.eclipse.jetty.websocket.api.StatusCode.*;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.websocket.api.ProtocolException;
+import org.eclipse.jetty.websocket.common.frames.CloseFrame;
+import org.junit.Test;
+
+public class CloseInfoTest
+{
+ /**
+ * A test where no close is provided
+ */
+ @Test
+ public void testAnonymousClose()
+ {
+ CloseInfo close = new CloseInfo();
+ assertThat("close.code",close.getStatusCode(),is(NO_CODE));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ CloseFrame frame = close.asFrame();
+ assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+ // should result in no payload
+ assertThat("close frame has payload",frame.hasPayload(),is(false));
+ assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+ }
+
+ /**
+ * A test where NO_CODE (1005) is provided
+ */
+ @Test
+ public void testNoCode()
+ {
+ CloseInfo close = new CloseInfo(NO_CODE);
+ assertThat("close.code",close.getStatusCode(),is(NO_CODE));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ CloseFrame frame = close.asFrame();
+ assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+ // should result in no payload
+ assertThat("close frame has payload",frame.hasPayload(),is(false));
+ assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+ }
+
+ /**
+ * A test where NO_CLOSE (1006) is provided
+ */
+ @Test
+ public void testNoClose()
+ {
+ CloseInfo close = new CloseInfo(NO_CLOSE);
+ assertThat("close.code",close.getStatusCode(),is(NO_CLOSE));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ CloseFrame frame = close.asFrame();
+ assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+ // should result in no payload
+ assertThat("close frame has payload",frame.hasPayload(),is(false));
+ assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+ }
+
+ /**
+ * A test of FAILED_TLS_HANDSHAKE (1007)
+ */
+ @Test
+ public void testFailedTlsHandshake()
+ {
+ CloseInfo close = new CloseInfo(FAILED_TLS_HANDSHAKE);
+ assertThat("close.code",close.getStatusCode(),is(FAILED_TLS_HANDSHAKE));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ try
+ {
+ @SuppressWarnings("unused")
+ CloseFrame frame = close.asFrame();
+ fail("Expected " + ProtocolException.class.getName());
+ }
+ catch (ProtocolException e)
+ {
+ // expected path
+ assertThat("ProtocolException message",e.getMessage(),containsString("not allowed (per RFC6455)"));
+ }
+ }
+
+ /**
+ * A test of NORMAL (1000)
+ */
+ @Test
+ public void testNormal()
+ {
+ CloseInfo close = new CloseInfo(NORMAL);
+ assertThat("close.code",close.getStatusCode(),is(NORMAL));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ CloseFrame frame = close.asFrame();
+ assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+ assertThat("close frame payload length",frame.getPayloadLength(),is(2));
+ }
+
+ private ByteBuffer asByteBuffer(int statusCode, String reason)
+ {
+ int len = 2; // status code length
+ byte utf[] = null;
+ if (StringUtil.isNotBlank(reason))
+ {
+ utf = StringUtil.getUtf8Bytes(reason);
+ len += utf.length;
+ }
+
+ ByteBuffer buf = BufferUtil.allocate(len);
+ BufferUtil.flipToFill(buf);
+ buf.put((byte)((statusCode >>> 8) & 0xFF));
+ buf.put((byte)((statusCode >>> 0) & 0xFF));
+
+ if (utf != null)
+ {
+ buf.put(utf,0,utf.length);
+ }
+ BufferUtil.flipToFlush(buf,0);
+
+ return buf;
+ }
+
+ @Test
+ public void testFromFrame()
+ {
+ ByteBuffer payload = asByteBuffer(NORMAL,null);
+ assertThat("payload length", payload.remaining(), is(2));
+ CloseFrame frame = new CloseFrame();
+ frame.setPayload(payload);
+
+ // create from frame
+ CloseInfo close = new CloseInfo(frame);
+ assertThat("close.code",close.getStatusCode(),is(NORMAL));
+ assertThat("close.reason",close.getReason(),nullValue());
+
+ // and back again
+ frame = close.asFrame();
+ assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+ assertThat("close frame payload length",frame.getPayloadLength(),is(2));
+ }
+}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
index fe6011d..6b0f018 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.StacklessLogging;
+import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@@ -39,8 +40,7 @@
{
ByteBuffer expected = ByteBuffer.allocate(32);
- expected.put(new byte[]
- { (byte)0x8b, 0x00 });
+ expected.put(new byte[] { (byte)0x8b, 0x00 });
expected.flip();
@@ -50,10 +50,17 @@
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
- parser.parse(expected);
+ try
+ {
+ parser.parse(expected);
+ }
+ catch (ProtocolException ignore)
+ {
+ // ignore
+ }
}
- Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+ Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
@@ -65,8 +72,7 @@
{
ByteBuffer expected = ByteBuffer.allocate(32);
- expected.put(new byte[]
- { (byte)0x8c, 0x01, 0x00 });
+ expected.put(new byte[] { (byte)0x8c, 0x01, 0x00 });
expected.flip();
@@ -76,24 +82,29 @@
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
- parser.parse(expected);
+ try
+ {
+ parser.parse(expected);
+ }
+ catch (ProtocolException ignore)
+ {
+ // ignore
+ }
}
- Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+ Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12"));
}
-
@Test
public void testParserNonControlOpCode3Case4_1_1() throws Exception
{
ByteBuffer expected = ByteBuffer.allocate(32);
- expected.put(new byte[]
- { (byte)0x83, 0x00 });
+ expected.put(new byte[] { (byte)0x83, 0x00 });
expected.flip();
@@ -103,10 +114,17 @@
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
- parser.parse(expected);
+ try
+ {
+ parser.parse(expected);
+ }
+ catch (ProtocolException ignore)
+ {
+ // ignore
+ }
}
- Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+ Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
@@ -118,8 +136,7 @@
{
ByteBuffer expected = ByteBuffer.allocate(32);
- expected.put(new byte[]
- { (byte)0x84, 0x01, 0x00 });
+ expected.put(new byte[] { (byte)0x84, 0x01, 0x00 });
expected.flip();
@@ -129,10 +146,17 @@
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
- parser.parse(expected);
+ try
+ {
+ parser.parse(expected);
+ }
+ catch (ProtocolException ignore)
+ {
+ // ignore
+ }
}
- Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+ Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
index bd02460..dedba3e 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
@@ -18,7 +18,8 @@
package org.eclipse.jetty.websocket.common.test;
-import java.io.Closeable;
+import static org.hamcrest.Matchers.*;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -68,10 +69,6 @@
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.junit.Assert;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-
/**
* A simple websocket client for performing unit tests with.
* <p>
@@ -84,7 +81,7 @@
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
* scope.
*/
-public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, Closeable
+public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, AutoCloseable
{
private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ==";
private static final int BUFFER_SIZE = 8192;
@@ -182,22 +179,14 @@
public void close(int statusCode, String message)
{
+ LOG.debug("close({},{})",statusCode,message);
CloseInfo close = new CloseInfo(statusCode,message);
- ioState.onCloseLocal(close);
-
if (!ioState.isClosed())
{
- WebSocketFrame frame = close.asFrame();
- LOG.debug("Issuing: {}",frame);
- try
- {
- write(frame);
- }
- catch (IOException e)
- {
- LOG.debug(e);
- }
+ ioState.onCloseLocal(close);
+ } else {
+ LOG.debug("Not issuing close. ioState = {}",ioState);
}
}
@@ -429,13 +418,8 @@
{
LOG.info("Client parsed {} frames",count);
}
-
- if (frame.getOpCode() == OpCode.CLOSE)
- {
- CloseInfo close = new CloseInfo(frame);
- ioState.onCloseRemote(close);
- }
-
+
+ // Capture Frame Copy
WebSocketFrame copy = WebSocketFrame.copy(frame);
incomingFrames.incomingFrame(copy);
}
@@ -448,6 +432,7 @@
@Override
public void onConnectionStateChange(ConnectionState state)
{
+ LOG.debug("CLIENT onConnectionStateChange() - {}", state);
switch (state)
{
case CLOSED:
@@ -455,10 +440,17 @@
// this.disconnect();
break;
case CLOSING:
- if (ioState.wasRemoteCloseInitiated())
+ CloseInfo close = ioState.getCloseInfo();
+
+ WebSocketFrame frame = close.asFrame();
+ LOG.debug("Issuing: {}",frame);
+ try
{
- CloseInfo close = ioState.getCloseInfo();
- close(close.getStatusCode(),close.getReason());
+ write(frame);
+ }
+ catch (IOException e)
+ {
+ LOG.debug(e);
}
break;
default:
@@ -701,6 +693,7 @@
{
if (!ioState.isOpen())
{
+ LOG.debug("IO Not Open / Not Writing: {}",frame);
return;
}
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
index c3cd1ad..1b665bd 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
@@ -54,6 +54,7 @@
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
+import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
@@ -124,7 +125,6 @@
{
write(new CloseFrame());
flush();
- disconnect();
}
public void close(int statusCode) throws IOException
@@ -132,7 +132,6 @@
CloseInfo close = new CloseInfo(statusCode);
write(close.asFrame());
flush();
- disconnect();
}
public void disconnect()
@@ -229,6 +228,19 @@
CloseInfo close = new CloseInfo(frame);
LOG.debug("Close frame: {}",close);
}
+
+ Type type = frame.getType();
+ if (echoing.get() && (type.isData() || type.isContinuation()))
+ {
+ try
+ {
+ write(WebSocketFrame.copy(frame));
+ }
+ catch (IOException e)
+ {
+ LOG.warn(e);
+ }
+ }
}
@Override
@@ -317,9 +329,18 @@
return len;
}
+ /**
+ * @deprecated use {@link #readFrames(int, int, TimeUnit)} for correct parameter order
+ */
+ @Deprecated
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
- LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
+ return readFrames(expectedCount,timeoutDuration,timeoutUnit);
+ }
+
+ public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
+ {
+ LOG.debug("Read: waiting for {} frame(s) from client",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
@@ -562,13 +583,22 @@
public void write(Frame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
- outgoing.outgoingFrame(frame,null, BatchMode.OFF);
+ outgoing.outgoingFrame(frame,null,BatchMode.OFF);
}
public void write(int b) throws IOException
{
getOutputStream().write(b);
}
+
+ public void write(ByteBuffer buf) throws IOException
+ {
+ byte arr[] = BufferUtil.toArray(buf);
+ if ((arr != null) && (arr.length > 0))
+ {
+ getOutputStream().write(arr);
+ }
+ }
}
private static final Logger LOG = Log.getLogger(BlockheadServer.class);
diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
index bebe7c0..f6a2f2c 100644
--- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
+++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
@@ -44,6 +44,7 @@
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
+import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@@ -199,11 +200,23 @@
}
}
- protected void closeAllConnections()
+ protected void shutdownAllConnections()
{
for (WebSocketSession session : openSessions)
{
- session.close();
+ if (session.getConnection() != null)
+ {
+ try
+ {
+ session.getConnection().close(
+ StatusCode.SHUTDOWN,
+ "Shutdown");
+ }
+ catch (Throwable t)
+ {
+ LOG.debug("During Shutdown All Connections",t);
+ }
+ }
}
openSessions.clear();
}
@@ -269,7 +282,7 @@
@Override
protected void doStop() throws Exception
{
- closeAllConnections();
+ shutdownAllConnections();
super.doStop();
}
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
index 0c2ddb3..5a4707c 100644
--- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
@@ -114,7 +114,7 @@
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})",sess);
- sess.close();
+ sess.close(StatusCode.NORMAL,"FastCloseServer");
}
}
@@ -129,14 +129,10 @@
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})",sess);
+ // Test failure due to unhandled exception
+ // this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
-
- @Override
- public void onWebSocketError(Throwable cause)
- {
- errors.add(cause);
- }
}
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.class);
@@ -163,30 +159,28 @@
@Test
public void testFastClose() throws Exception
{
- BlockheadClient client = new BlockheadClient(server.getServerUri());
- client.setProtocols("fastclose");
- client.setTimeout(TimeUnit.SECONDS,1);
- try
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
{
+ client.setProtocols("fastclose");
+ client.setTimeout(TimeUnit.SECONDS,1);
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
+ // Verify that client got close frame
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
-
+
+ // Notify server of close handshake
client.write(close.asFrame()); // respond with close
-
+
+ // ensure server socket got close event
Assert.assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
Assert.assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
}
- finally
- {
- client.close();
- }
}
/**
@@ -195,11 +189,10 @@
@Test
public void testFastFail() throws Exception
{
- BlockheadClient client = new BlockheadClient(server.getServerUri());
- client.setProtocols("fastfail");
- client.setTimeout(TimeUnit.SECONDS,1);
- try
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
{
+ client.setProtocols("fastfail");
+ client.setTimeout(TimeUnit.SECONDS,1);
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
{
client.connect();
@@ -214,14 +207,11 @@
client.write(close.asFrame()); // respond with close
+ // ensure server socket got close event
Assert.assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
Assert.assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR));
Assert.assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1));
}
}
- finally
- {
- client.close();
- }
}
}