433262 - WebSocket / Advanced close use cases

+ ClientCloseTest implementation of various outlined use cases.
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
index 3b91b38..d7b9ced 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java
@@ -31,11 +31,10 @@
 import org.eclipse.jetty.websocket.common.CloseInfo;
 import org.eclipse.jetty.websocket.common.WebSocketSession;
 import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
 import org.eclipse.jetty.websocket.jsr356.JsrSession;
 import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
 
-public abstract class AbstractJsrEventDriver extends AbstractEventDriver implements EventDriver
+public abstract class AbstractJsrEventDriver extends AbstractEventDriver
 {
     protected final EndpointMetadata metadata;
     protected final EndpointConfig config;
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
index a550c20..0cf9df7 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java
@@ -23,6 +23,7 @@
 import java.io.Reader;
 import java.nio.ByteBuffer;
 import java.util.Map;
+
 import javax.websocket.CloseReason;
 import javax.websocket.DecodeException;
 
@@ -31,7 +32,6 @@
 import org.eclipse.jetty.util.log.Logger;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
 import org.eclipse.jetty.websocket.api.extensions.Frame;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
 import org.eclipse.jetty.websocket.common.message.MessageReader;
 import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@@ -44,7 +44,7 @@
 /**
  * Base implementation for JSR-356 Annotated event drivers.
  */
-public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
+public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
 {
     private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
     private final JsrEvents<?, ?> events;
diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
index b977147..9ac4906 100644
--- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
+++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java
@@ -23,6 +23,7 @@
 import java.io.Reader;
 import java.nio.ByteBuffer;
 import java.util.Map;
+
 import javax.websocket.CloseReason;
 import javax.websocket.Endpoint;
 import javax.websocket.MessageHandler;
@@ -34,7 +35,6 @@
 import org.eclipse.jetty.util.log.Logger;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
 import org.eclipse.jetty.websocket.api.extensions.Frame;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
 import org.eclipse.jetty.websocket.common.message.MessageInputStream;
 import org.eclipse.jetty.websocket.common.message.MessageReader;
 import org.eclipse.jetty.websocket.jsr356.JsrPongMessage;
@@ -49,7 +49,7 @@
 /**
  * EventDriver for websocket that extend from {@link javax.websocket.Endpoint}
  */
-public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements EventDriver
+public class JsrEndpointEventDriver extends AbstractJsrEventDriver
 {
     private static final Logger LOG = Log.getLogger(JsrEndpointEventDriver.class);
 
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
index da112f9..ab20d77 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java
@@ -32,6 +32,7 @@
 import org.eclipse.jetty.util.component.ContainerLifeCycle;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.StatusCode;
 import org.eclipse.jetty.websocket.api.WebSocketException;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
@@ -158,7 +159,7 @@
         sessions.add(session);
     }
 
-    private void closeAllConnections()
+    private void shutdownAllConnections()
     {
         for (WebSocketSession session : sessions)
         {
@@ -166,11 +167,13 @@
             {
                 try
                 {
-                    session.getConnection().close();
+                    session.getConnection().close(
+                            StatusCode.SHUTDOWN,
+                            "Shutdown");
                 }
                 catch (Throwable t)
                 {
-                    LOG.debug("During Close All Connections",t);
+                    LOG.debug("During Shutdown All Connections",t);
                 }
             }
         }
@@ -203,7 +206,7 @@
     @Override
     protected void doStop() throws Exception
     {
-        closeAllConnections();
+        shutdownAllConnections();
         sessions.clear();
         super.doStop();
         removeBean(selector);
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
index 928e911..6616f54 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java
@@ -98,7 +98,7 @@
             else
             {
                 // Standard "ws://"
-                endPoint.setIdleTimeout(connectPromise.getClient().getMaxIdleTimeout());
+                endPoint.setIdleTimeout(connectPromise.getDriver().getPolicy().getIdleTimeout());
                 return newUpgradeConnection(channel,endPoint,connectPromise);
             }
         }
@@ -139,4 +139,9 @@
     {
         this.sslContextFactory = sslContextFactory;
     }
+    
+    public WebSocketPolicy getPolicy()
+    {
+        return policy;
+    }
 }
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java
new file mode 100644
index 0000000..3a7f338
--- /dev/null
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java
@@ -0,0 +1,626 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.websocket.client;
+
+import static org.hamcrest.Matchers.*;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.EofException;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
+import org.eclipse.jetty.toolchain.test.EventQueue;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
+import org.eclipse.jetty.websocket.api.ProtocolException;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.client.io.ConnectionManager;
+import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager;
+import org.eclipse.jetty.websocket.common.CloseInfo;
+import org.eclipse.jetty.websocket.common.OpCode;
+import org.eclipse.jetty.websocket.common.WebSocketFrame;
+import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.frames.TextFrame;
+import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
+import org.eclipse.jetty.websocket.common.test.BlockheadServer;
+import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
+import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
+import org.eclipse.jetty.websocket.common.test.RawFrameBuilder;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ClientCloseTest
+{
+    private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
+
+    private static class CloseTrackingSocket extends WebSocketAdapter
+    {
+        private static final Logger LOG = Log.getLogger(ClientCloseTest.CloseTrackingSocket.class);
+
+        public int closeCode = -1;
+        public String closeReason = null;
+        public CountDownLatch closeLatch = new CountDownLatch(1);
+        public CountDownLatch openLatch = new CountDownLatch(1);
+
+        public EventQueue<String> messageQueue = new EventQueue<>();
+        public EventQueue<Throwable> errorQueue = new EventQueue<>();
+
+        public void assertNoCloseEvent()
+        {
+            Assert.assertThat("Client Close Event",closeLatch.getCount(),is(1L));
+            Assert.assertThat("Client Close Event Status Code ",closeCode,is(-1));
+        }
+
+        public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher, Matcher<String> reasonMatcher)
+                throws InterruptedException
+        {
+            long maxTimeout = clientTimeoutMs * 2;
+
+            Assert.assertThat("Client Close Event Occurred",closeLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true));
+            Assert.assertThat("Client Close Event Status Code",closeCode,statusCodeMatcher);
+            if (reasonMatcher == null)
+            {
+                Assert.assertThat("Client Close Event Reason",closeReason,nullValue());
+            }
+            else
+            {
+                Assert.assertThat("Client Close Event Reason",closeReason,reasonMatcher);
+            }
+        }
+
+        public void assertReceivedError(Class<? extends Throwable> expectedThrownClass, Matcher<String> messageMatcher) throws TimeoutException,
+                InterruptedException
+        {
+            errorQueue.awaitEventCount(1,500,TimeUnit.MILLISECONDS);
+            Throwable actual = errorQueue.poll();
+            Assert.assertThat("Client Error Event",actual,instanceOf(expectedThrownClass));
+            if (messageMatcher == null)
+            {
+                Assert.assertThat("Client Error Event Message",actual.getMessage(),nullValue());
+            }
+            else
+            {
+                Assert.assertThat("Client Error Event Message",actual.getMessage(),messageMatcher);
+            }
+        }
+
+        public void clearQueues()
+        {
+            messageQueue.clear();
+            errorQueue.clear();
+        }
+
+        @Override
+        public void onWebSocketClose(int statusCode, String reason)
+        {
+            LOG.debug("onWebSocketClose({},{})",statusCode,reason);
+            super.onWebSocketClose(statusCode,reason);
+            closeCode = statusCode;
+            closeReason = reason;
+            closeLatch.countDown();
+        }
+
+        @Override
+        public void onWebSocketConnect(Session session)
+        {
+            super.onWebSocketConnect(session);
+            openLatch.countDown();
+        }
+
+        @Override
+        public void onWebSocketError(Throwable cause)
+        {
+            LOG.debug("onWebSocketError",cause);
+            Assert.assertThat("Error capture",errorQueue.offer(cause),is(true));
+        }
+
+        @Override
+        public void onWebSocketText(String message)
+        {
+            LOG.debug("onWebSocketText({})",message);
+            messageQueue.offer(message);
+        }
+
+        public EndPoint getEndPoint() throws Exception
+        {
+            Session session = getSession();
+            Assert.assertThat("Session type",session,instanceOf(WebSocketSession.class));
+
+            WebSocketSession wssession = (WebSocketSession)session;
+            Field fld = wssession.getClass().getDeclaredField("connection");
+            fld.setAccessible(true);
+            Assert.assertThat("Field: connection",fld,notNullValue());
+
+            Object val = fld.get(wssession);
+            Assert.assertThat("Connection type",val,instanceOf(AbstractWebSocketConnection.class));
+            @SuppressWarnings("resource")
+            AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection)val;
+            return wsconn.getEndPoint();
+        }
+    }
+
+    @Rule
+    public TestTracker tt = new TestTracker();
+
+    private BlockheadServer server;
+    private WebSocketClient client;
+
+    private void confirmConnection(CloseTrackingSocket clientSocket, Future<Session> clientFuture, ServerConnection serverConn) throws Exception
+    {
+        // Wait for client connect on via future
+        clientFuture.get(500,TimeUnit.MILLISECONDS);
+
+        // Wait for client connect via client websocket
+        Assert.assertThat("Client WebSocket is Open",clientSocket.openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
+
+        try
+        {
+            // Send message from client to server
+            final String echoMsg = "echo-test";
+            Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
+
+            // Wait for send future
+            testFut.get(500,TimeUnit.MILLISECONDS);
+
+            // Read Frame on server side
+            IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
+            serverCapture.assertNoErrors();
+            serverCapture.assertFrameCount(1);
+            WebSocketFrame frame = serverCapture.getFrames().poll();
+            Assert.assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT));
+            Assert.assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg));
+
+            // Server send echo reply
+            serverConn.write(new TextFrame().setPayload(echoMsg));
+
+            // Wait for received echo
+            clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS);
+
+            // Verify received message
+            String recvMsg = clientSocket.messageQueue.poll();
+            Assert.assertThat("Received message",recvMsg,is(echoMsg));
+
+            // Verify that there are no errors
+            Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+        }
+        finally
+        {
+            clientSocket.clearQueues();
+        }
+    }
+
+    private void confirmServerReceivedCloseFrame(ServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws IOException,
+            TimeoutException
+    {
+        IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
+        serverCapture.assertNoErrors();
+        serverCapture.assertFrameCount(1);
+        serverCapture.assertHasFrame(OpCode.CLOSE,1);
+        WebSocketFrame frame = serverCapture.getFrames().poll();
+        Assert.assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE));
+        CloseInfo closeInfo = new CloseInfo(frame);
+        Assert.assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode));
+        if (closeReasonMatcher == null)
+        {
+            Assert.assertThat("Server received close reason",closeInfo.getReason(),nullValue());
+        }
+        else
+        {
+            Assert.assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher);
+        }
+    }
+
+    public static class TestWebSocketClient extends WebSocketClient
+    {
+        @Override
+        protected ConnectionManager newConnectionManager()
+        {
+            return new TestConnectionManager(this);
+        }
+    }
+
+    public static class TestConnectionManager extends ConnectionManager
+    {
+        public TestConnectionManager(WebSocketClient client)
+        {
+            super(client);
+        }
+
+        @Override
+        protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client)
+        {
+            return new TestSelectorManager(client);
+        }
+    }
+
+    public static class TestSelectorManager extends WebSocketClientSelectorManager
+    {
+        public TestSelectorManager(WebSocketClient client)
+        {
+            super(client);
+        }
+
+        @Override
+        protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
+        {
+            return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout());
+        }
+    }
+
+    public static class TestEndPoint extends SelectChannelEndPoint
+    {
+        public AtomicBoolean congestedFlush = new AtomicBoolean(false);
+
+        public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
+        {
+            super(channel,selector,key,scheduler,idleTimeout);
+        }
+
+        @Override
+        public boolean flush(ByteBuffer... buffers) throws IOException
+        {
+            boolean flushed = super.flush(buffers);
+            congestedFlush.set(!flushed);
+            return flushed;
+        }
+    }
+
+    @Before
+    public void startClient() throws Exception
+    {
+        client = new TestWebSocketClient();
+        client.start();
+    }
+
+    @Before
+    public void startServer() throws Exception
+    {
+        server = new BlockheadServer();
+        server.start();
+    }
+
+    @After
+    public void stopClient() throws Exception
+    {
+        if (client.isRunning())
+        {
+            client.stop();
+        }
+    }
+
+    @After
+    public void stopServer() throws Exception
+    {
+        server.stop();
+    }
+
+    @Test
+    public void testHalfClose() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // client sends close frame (code 1000, normal)
+        final String origCloseReason = "Normal Close";
+        clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+        // server receives close frame
+        confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+        // server sends 2 messages
+        serverConn.write(new TextFrame().setPayload("Hello"));
+        serverConn.write(new TextFrame().setPayload("World"));
+
+        // server sends close frame (code 1000, no reason)
+        CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server");
+        serverConn.write(sclose.asFrame());
+
+        // client receives 2 messages
+        clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS);
+
+        // Verify received messages
+        String recvMsg = clientSocket.messageQueue.poll();
+        Assert.assertThat("Received message 1",recvMsg,is("Hello"));
+        recvMsg = clientSocket.messageQueue.poll();
+        Assert.assertThat("Received message 2",recvMsg,is("World"));
+
+        // Verify that there are no errors
+        Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+
+        // client close event on ws-endpoint
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server"));
+    }
+
+    @Test
+    public void testNetworkCongestion() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // client sends BIG frames (until it cannot write anymore)
+        // server must not read (for test purpose, in order to congest connection)
+        // when write is congested, client enqueue close frame
+        // client initiate write, but write never completes
+        EndPoint endp = clientSocket.getEndPoint();
+        Assert.assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class));
+        TestEndPoint testendp = (TestEndPoint)endp;
+
+        char msg[] = new char[10240];
+        int writeCount = 0;
+        long writeSize = 0;
+        int i = 0;
+        while (!testendp.congestedFlush.get())
+        {
+            int z = i - ((i / 26) * 26);
+            char c = (char)('a' + z);
+            Arrays.fill(msg,c);
+            clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
+            writeCount++;
+            writeSize += msg.length;
+        }
+        LOG.debug("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize);
+
+        // Verify that there are no errors
+        Assert.assertThat("Error events",clientSocket.errorQueue,empty());
+
+        // client idle timeout triggers close event on client ws-endpoint
+        // client close event on ws-endpoint
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
+    }
+
+    @Test
+    public void testProtocolException() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // client should not have received close message (yet)
+        clientSocket.assertNoCloseEvent();
+
+        // server sends bad close frame (too big of a reason message)
+        byte msg[] = new byte[400];
+        Arrays.fill(msg,(byte)'x');
+        ByteBuffer bad = ByteBuffer.allocate(500);
+        RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true);
+        RawFrameBuilder.putLength(bad,msg.length + 2,false);
+        bad.putShort((short)StatusCode.NORMAL);
+        bad.put(msg);
+        BufferUtil.flipToFlush(bad,0);
+        serverConn.write(bad);
+
+        // client should have noticed the error
+        clientSocket.assertReceivedError(ProtocolException.class,containsString("Invalid control frame"));
+
+        // client parse invalid frame, notifies server of close (protocol error)
+        confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length")));
+
+        // server disconnects
+        serverConn.disconnect();
+
+        // client triggers close event on client ws-endpoint
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
+    }
+
+    @Test
+    public void testReadEOF() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // client sends close frame
+        final String origCloseReason = "Normal Close";
+        clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+        // server receives close frame
+        confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+        // client should not have received close message (yet)
+        clientSocket.assertNoCloseEvent();
+
+        // server shuts down connection (no frame reply)
+        serverConn.disconnect();
+
+        // client reads -1 (EOF)
+        // client triggers close event on client ws-endpoint
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
+    }
+
+    @Test
+    public void testServerNoCloseHandshake() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // client sends close frame
+        final String origCloseReason = "Normal Close";
+        clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+        // server receives close frame
+        confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
+
+        // client should not have received close message (yet)
+        clientSocket.assertNoCloseEvent();
+
+        // server never sends close frame handshake
+        // server sits idle
+
+        // client idle timeout triggers close event on client ws-endpoint
+        // assert - close code==1006 (abnormal)
+        // assert - close reason message contains (timeout)
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
+    }
+
+    @Test
+    public void testStopLifecycle() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        int clientCount = 3;
+        CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount];
+        ServerConnection serverConns[] = new ServerConnection[clientCount];
+
+        // Connect Multiple Clients
+        for (int i = 0; i < clientCount; i++)
+        {
+            // Client Request Upgrade
+            clientSockets[i] = new CloseTrackingSocket();
+            Future<Session> clientConnectFuture = client.connect(clientSockets[i],server.getWsUri());
+
+            // Server accepts connection
+            serverConns[i] = server.accept();
+            serverConns[i].upgrade();
+
+            // client confirms connection via echo
+            confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]);
+        }
+
+        // client lifecycle stop
+        client.stop();
+
+        // clients send close frames (code 1001, shutdown)
+        for (int i = 0; i < clientCount; i++)
+        {
+            // server receives close frame
+            confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown"));
+        }
+
+        // clients disconnect
+        for (int i = 0; i < clientCount; i++)
+        {
+            clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown"));
+        }
+    }
+
+    @Test
+    public void testWriteException() throws Exception
+    {
+        // Set client timeout
+        final int timeout = 1000;
+        client.setMaxIdleTimeout(timeout);
+
+        // Client connects
+        CloseTrackingSocket clientSocket = new CloseTrackingSocket();
+        Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
+
+        // Server accepts connect
+        ServerConnection serverConn = server.accept();
+        serverConn.upgrade();
+
+        // client confirms connection via echo
+        confirmConnection(clientSocket,clientConnectFuture,serverConn);
+
+        // setup client endpoint for write failure (test only)
+        EndPoint endp = clientSocket.getEndPoint();
+        endp.shutdownOutput();
+
+        // client enqueue close frame
+        // client write failure
+        final String origCloseReason = "Normal Close";
+        clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
+
+        clientSocket.assertReceivedError(EofException.class,null);
+
+        // client triggers close event on client ws-endpoint
+        // assert - close code==1006 (abnormal)
+        // assert - close reason message contains (write failure)
+        clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
+    }
+}
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
index e06e883..2cea2fd 100644
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java
@@ -19,7 +19,6 @@
 package org.eclipse.jetty.websocket.client;
 
 import java.io.IOException;
-import java.util.concurrent.Exchanger;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -32,7 +31,6 @@
 {
     private static final Logger LOG = Log.getLogger(ServerWriteThread.class);
     private final ServerConnection conn;
-    private Exchanger<String> exchanger;
     private int slowness = -1;
     private int messageCount = 100;
     private String message = "Hello";
@@ -42,11 +40,6 @@
         this.conn = conn;
     }
 
-    public Exchanger<String> getExchanger()
-    {
-        return exchanger;
-    }
-
     public String getMessage()
     {
         return message;
@@ -73,12 +66,6 @@
             {
                 conn.write(new TextFrame().setPayload(message));
 
-                if (exchanger != null)
-                {
-                    // synchronized on exchange
-                    exchanger.exchange(message);
-                }
-
                 m.incrementAndGet();
 
                 if (slowness > 0)
@@ -93,11 +80,6 @@
         }
     }
 
-    public void setExchanger(Exchanger<String> exchanger)
-    {
-        this.exchanger = exchanger;
-    }
-
     public void setMessage(String message)
     {
         this.message = message;
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
index 80720f3..4f56ec6 100644
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
+++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java
@@ -49,7 +49,7 @@
     public void startClient() throws Exception
     {
         client = new WebSocketClient();
-        client.getPolicy().setIdleTimeout(60000);
+        client.setMaxIdleTimeout(60000);
         client.start();
     }
 
@@ -78,7 +78,7 @@
     {
         JettyTrackingSocket tsocket = new JettyTrackingSocket();
         client.setMasker(new ZeroMasker());
-        client.getPolicy().setIdleTimeout(60000);
+        client.setMaxIdleTimeout(60000);
 
         URI wsUri = server.getWsUri();
         Future<Session> future = client.connect(tsocket,wsUri);
@@ -123,41 +123,38 @@
     @Slow
     public void testServerSlowToSend() throws Exception
     {
-        // final Exchanger<String> exchanger = new Exchanger<String>();
-        JettyTrackingSocket tsocket = new JettyTrackingSocket();
-        // tsocket.messageExchanger = exchanger;
+        JettyTrackingSocket clientSocket = new JettyTrackingSocket();
         client.setMasker(new ZeroMasker());
-        client.getPolicy().setIdleTimeout(60000);
+        client.setMaxIdleTimeout(60000);
 
         URI wsUri = server.getWsUri();
-        Future<Session> future = client.connect(tsocket,wsUri);
+        Future<Session> clientConnectFuture = client.connect(clientSocket,wsUri);
 
-        ServerConnection sconnection = server.accept();
-        sconnection.setSoTimeout(60000);
-        sconnection.upgrade();
+        ServerConnection serverConn = server.accept();
+        serverConn.setSoTimeout(60000);
+        serverConn.upgrade();
 
         // Confirm connected
-        future.get(500,TimeUnit.MILLISECONDS);
-        tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
+        clientConnectFuture.get(500,TimeUnit.MILLISECONDS);
+        clientSocket.waitForConnected(500,TimeUnit.MILLISECONDS);
 
         // Have server write slowly.
         int messageCount = 1000;
 
-        ServerWriteThread writer = new ServerWriteThread(sconnection);
+        ServerWriteThread writer = new ServerWriteThread(serverConn);
         writer.setMessageCount(messageCount);
         writer.setMessage("Hello");
-        // writer.setExchanger(exchanger);
         writer.setSlowness(10);
         writer.start();
         writer.join();
 
         // Verify receive
-        Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount));
+        Assert.assertThat("Message Receive Count",clientSocket.messageQueue.size(),is(messageCount));
 
         // Close
-        sconnection.close(StatusCode.NORMAL);
+        serverConn.close(StatusCode.NORMAL);
 
-        Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
-        tsocket.assertCloseCode(StatusCode.NORMAL);
+        Assert.assertTrue("Client Socket Closed",clientSocket.closeLatch.await(10,TimeUnit.SECONDS));
+        clientSocket.assertCloseCode(StatusCode.NORMAL);
     }
 }
diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java
deleted file mode 100644
index 72c1fc1..0000000
--- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-//
-//  ========================================================================
-//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
-//  ------------------------------------------------------------------------
-//  All rights reserved. This program and the accompanying materials
-//  are made available under the terms of the Eclipse Public License v1.0
-//  and Apache License v2.0 which accompanies this distribution.
-//
-//      The Eclipse Public License is available at
-//      http://www.eclipse.org/legal/epl-v10.html
-//
-//      The Apache License v2.0 is available at
-//      http://www.opensource.org/licenses/apache2.0.php
-//
-//  You may elect to redistribute this code under either of these licenses.
-//  ========================================================================
-//
-
-package org.eclipse.jetty.websocket.client;
-
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-import java.net.URI;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.toolchain.test.TestTracker;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.StatusCode;
-import org.eclipse.jetty.websocket.common.test.BlockheadServer;
-import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Various tests for Timeout handling
- */
-public class TimeoutTest
-{
-    @Rule
-    public TestTracker tt = new TestTracker();
-
-    private BlockheadServer server;
-    private WebSocketClient client;
-
-    @Before
-    public void startClient() throws Exception
-    {
-        client = new WebSocketClient();
-        client.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
-        client.start();
-    }
-
-    @Before
-    public void startServer() throws Exception
-    {
-        server = new BlockheadServer();
-        server.start();
-    }
-
-    @After
-    public void stopClient() throws Exception
-    {
-        client.stop();
-    }
-
-    @After
-    public void stopServer() throws Exception
-    {
-        server.stop();
-    }
-
-    /**
-     * In a situation where the upgrade/connection is successful, and there is no activity for a while, the idle timeout triggers on the client side and
-     * automatically initiates a close handshake.
-     */
-    @Test
-    public void testIdleDetectedByClient() throws Exception
-    {
-        JettyTrackingSocket wsocket = new JettyTrackingSocket();
-
-        URI wsUri = server.getWsUri();
-        client.setMaxIdleTimeout(1000);
-        Future<Session> future = client.connect(wsocket,wsUri);
-
-        ServerConnection ssocket = server.accept();
-        ssocket.upgrade();
-
-        try
-        {
-            ssocket.startEcho();
-            // Validate that connect occurred
-            future.get(500,TimeUnit.MILLISECONDS);
-            wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
-
-            // Wait for inactivity idle timeout.
-            long start = System.currentTimeMillis();
-            wsocket.waitForClose(2,TimeUnit.SECONDS);
-            long end = System.currentTimeMillis();
-            long dur = (end - start);
-            // Make sure idle timeout takes less than 5 total seconds
-            Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(3000L));
-
-            // Client should see a close event, with abnormal status NO_CLOSE
-            wsocket.assertCloseCode(StatusCode.ABNORMAL);
-        }
-        finally
-        {
-            ssocket.stopEcho();
-        }
-    }
-}
diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
index 7c9bd36..9668b13 100644
--- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
+++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
@@ -1,12 +1,17 @@
 org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
 org.eclipse.jetty.LEVEL=WARN
 # org.eclipse.jetty.LEVEL=DEBUG
-# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
+# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG
+# org.eclipse.jetty.io.SelectChannelEndPoint.LEVEL=DEBUG
+# org.eclipse.jetty.io.IdleTimeout.LEVEL=DEBUG
+# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
+# org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
 # org.eclipse.jetty.websocket.LEVEL=WARN
 # org.eclipse.jetty.websocket.LEVEL=DEBUG
 # org.eclipse.jetty.websocket.client.LEVEL=DEBUG
 # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.common.io.WriteBytesProvider.LEVEL=DEBUG
+# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
+
 # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
 # org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
 # org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
index 03a737c..f14c252 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.eclipse.jetty.util.BufferUtil;
 import org.eclipse.jetty.util.StringUtil;
 import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
 import org.eclipse.jetty.util.Utf8StringBuilder;
@@ -65,7 +66,8 @@
             statusCode |= (data.get() & 0xFF) << 8;
             statusCode |= (data.get() & 0xFF);
 
-            if(validate) {
+            if (validate)
+            {
                 if ((statusCode < StatusCode.NORMAL) || (statusCode == StatusCode.UNDEFINED) || (statusCode == StatusCode.NO_CLOSE)
                         || (statusCode == StatusCode.NO_CODE) || ((statusCode > 1011) && (statusCode <= 2999)) || (statusCode >= 5000))
                 {
@@ -120,7 +122,7 @@
 
     public CloseInfo(int statusCode)
     {
-        this(statusCode, null);
+        this(statusCode,null);
     }
 
     public CloseInfo(int statusCode, String reason)
@@ -144,8 +146,9 @@
             utf = StringUtil.getUtf8Bytes(reason);
             len += utf.length;
         }
-        
-        ByteBuffer buf = ByteBuffer.allocate(len);
+
+        ByteBuffer buf = BufferUtil.allocate(len);
+        BufferUtil.flipToFill(buf);
         buf.put((byte)((statusCode >>> 8) & 0xFF));
         buf.put((byte)((statusCode >>> 0) & 0xFF));
 
@@ -153,7 +156,7 @@
         {
             buf.put(utf,0,utf.length);
         }
-        buf.flip();
+        BufferUtil.flipToFlush(buf,0);
 
         return buf;
     }
@@ -162,7 +165,14 @@
     {
         CloseFrame frame = new CloseFrame();
         frame.setFin(true);
-        frame.setPayload(asByteBuffer());
+        if ((statusCode >= 1000) && (statusCode != StatusCode.NO_CLOSE) && (statusCode != StatusCode.NO_CODE))
+        {
+            if (statusCode == StatusCode.FAILED_TLS_HANDSHAKE)
+            {
+                throw new ProtocolException("Close Frame with status code " + statusCode + " not allowed (per RFC6455)");
+            }
+            frame.setPayload(asByteBuffer());
+        }
         return frame;
     }
 
@@ -180,10 +190,10 @@
     {
         return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE));
     }
-    
+
     public boolean isAbnormal()
     {
-        return (statusCode == StatusCode.ABNORMAL);
+        return (statusCode != StatusCode.NORMAL);
     }
 
     @Override
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
index 0cf7e5f..86195a0 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java
@@ -96,8 +96,9 @@
 
     private void assertSanePayloadLength(long len)
     {
-        if (LOG.isDebugEnabled())
-            LOG.debug("Payload Length: {} - {}",len,this);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this);
+        }
 
         // Since we use ByteBuffer so often, having lengths over Integer.MAX_VALUE is really impossible.
         if (len > Integer.MAX_VALUE)
@@ -239,7 +240,7 @@
         incomingFramesHandler.incomingError(e);
     }
 
-    public void parse(ByteBuffer buffer)
+    public void parse(ByteBuffer buffer) throws WebSocketException
     {
         if (buffer.remaining() <= 0)
         {
@@ -266,13 +267,20 @@
         {
             buffer.position(buffer.limit()); // consume remaining
             reset();
+            // let session know
             notifyWebSocketException(e);
+            // need to throw for proper close behavior in connection
+            throw e;
         }
         catch (Throwable t)
         {
             buffer.position(buffer.limit()); // consume remaining
             reset();
-            notifyWebSocketException(new WebSocketException(t));
+            // let session know
+            WebSocketException e = new WebSocketException(t);
+            notifyWebSocketException(e);
+            // need to throw for proper close behavior in connection
+            throw e;
         }
     }
 
@@ -299,7 +307,9 @@
     private boolean parseFrame(ByteBuffer buffer)
     {
         if (LOG.isDebugEnabled())
+        {
             LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
+        }
         while (buffer.hasRemaining())
         {
             switch (state)
@@ -318,7 +328,8 @@
                     }
                     
                     if (LOG.isDebugEnabled())
-                        LOG.debug("OpCode {}, fin={} rsv={}{}{}",
+                        LOG.debug("{} OpCode {}, fin={} rsv={}{}{}",
+                                policy.getBehavior(),
                                 OpCode.name(opcode),
                                 fin,
                                 (isRsv1InUse()?'1':'.'),
@@ -412,11 +423,6 @@
                                 throw new ProtocolException("RSV3 not allowed to be set");   
                         }
                     }
-                    else
-                    {
-                        if (LOG.isDebugEnabled())
-                            LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin);
-                    }
                     
                     state = State.PAYLOAD_LEN;
                     break;
@@ -591,8 +597,9 @@
             buffer.limit(limit);
             buffer.position(buffer.position() + window.remaining());
 
-            if (LOG.isDebugEnabled())
-                LOG.debug("Window: {}",BufferUtil.toDetailString(window));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window));
+            }
 
             maskProcessor.process(window);
 
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
index 460d1d7..04637e3 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
@@ -41,6 +41,7 @@
 import org.eclipse.jetty.websocket.api.SuspendToken;
 import org.eclipse.jetty.websocket.api.UpgradeRequest;
 import org.eclipse.jetty.websocket.api.UpgradeResponse;
+import org.eclipse.jetty.websocket.api.WebSocketBehavior;
 import org.eclipse.jetty.websocket.api.WebSocketException;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@@ -90,20 +91,19 @@
     @Override
     public void close()
     {
-        this.close(StatusCode.NORMAL, null);
+        this.close(StatusCode.NORMAL,null);
     }
 
     @Override
     public void close(CloseStatus closeStatus)
     {
-        this.close(closeStatus.getCode(), closeStatus.getPhrase());
+        this.close(closeStatus.getCode(),closeStatus.getPhrase());
     }
 
     @Override
     public void close(int statusCode, String reason)
     {
-        connection.close(statusCode, reason);
-        notifyClose(statusCode, reason);
+        connection.close(statusCode,reason);
     }
 
     /**
@@ -115,7 +115,7 @@
         connection.disconnect();
 
         // notify of harsh disconnect
-        notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect");
+        notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
     }
 
     public void dispatch(Runnable runnable)
@@ -130,7 +130,7 @@
         out.append(indent).append(" +- incomingHandler : ");
         if (incomingHandler instanceof Dumpable)
         {
-            ((Dumpable)incomingHandler).dump(out, indent + "    ");
+            ((Dumpable)incomingHandler).dump(out,indent + "    ");
         }
         else
         {
@@ -140,7 +140,7 @@
         out.append(indent).append(" +- outgoingHandler : ");
         if (outgoingHandler instanceof Dumpable)
         {
-            ((Dumpable)outgoingHandler).dump(out, indent + "    ");
+            ((Dumpable)outgoingHandler).dump(out,indent + "    ");
         }
         else
         {
@@ -273,7 +273,7 @@
     {
         final int prime = 31;
         int result = 1;
-        result = (prime * result) + ((connection == null) ? 0 : connection.hashCode());
+        result = (prime * result) + ((connection == null)?0:connection.hashCode());
         return result;
     }
 
@@ -328,7 +328,11 @@
 
     public void notifyClose(int statusCode, String reason)
     {
-        websocket.onClose(new CloseInfo(statusCode, reason));
+        if (LOG.isDebugEnabled())
+        {
+            LOG.debug("notifyClose({},{})",statusCode,reason);
+        }
+        websocket.onClose(new CloseInfo(statusCode,reason));
     }
 
     public void notifyError(Throwable cause)
@@ -342,12 +346,13 @@
     {
         switch (state)
         {
-            case CLOSING:
+            case CLOSED:
                 // notify session listeners
                 for (SessionListener listener : sessionListeners)
                 {
                     try
                     {
+                        LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName());
                         listener.onSessionClosed(this);
                     }
                     catch (Throwable t)
@@ -355,12 +360,10 @@
                         LOG.ignore(t);
                     }
                 }
-                break;
-            case CLOSED:
                 IOState ioState = this.connection.getIOState();
                 CloseInfo close = ioState.getCloseInfo();
                 // confirmed close of local endpoint
-                notifyClose(close.getStatusCode(), close.getReason());
+                notifyClose(close.getStatusCode(),close.getReason());
                 break;
             case OPEN:
                 // notify session listeners
@@ -394,17 +397,32 @@
         connection.getIOState().onConnected();
 
         // Connect remote
-        remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode());
+        remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
 
-        // Open WebSocket
-        websocket.openSession(this);
-
-        // Open connection
-        connection.getIOState().onOpened();
-
-        if (LOG.isDebugEnabled())
+        try
         {
-            LOG.debug("open -> {}", dump());
+            // Open WebSocket
+            websocket.openSession(this);
+
+            // Open connection
+            connection.getIOState().onOpened();
+
+            if (LOG.isDebugEnabled())
+            {
+                LOG.debug("open -> {}",dump());
+            }
+        }
+        catch (Throwable t)
+        {
+            // Exception on end-user WS-Endpoint.
+            // Fast-fail & close connection with reason.
+            int statusCode = StatusCode.SERVER_ERROR;
+            if(policy.getBehavior() == WebSocketBehavior.CLIENT)
+            {
+                statusCode = StatusCode.POLICY_VIOLATION;
+            }
+            
+            close(statusCode,t.getMessage());
         }
     }
 
@@ -444,11 +462,11 @@
                 List<String> values = entry.getValue();
                 if (values != null)
                 {
-                    this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()]));
+                    this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
                 }
                 else
                 {
-                    this.parameterMap.put(entry.getKey(), new String[0]);
+                    this.parameterMap.put(entry.getKey(),new String[0]);
                 }
             }
         }
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
index 8cc60f0..7a55b2d 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java
@@ -88,13 +88,7 @@
     {
         if (LOG.isDebugEnabled())
         {
-            LOG.debug("incoming(WebSocketException)",e);
-        }
-
-        if (e instanceof CloseException)
-        {
-            CloseException close = (CloseException)e;
-            terminateConnection(close.getStatusCode(),close.getMessage());
+            LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
         }
 
         onError(e);
@@ -105,7 +99,7 @@
     {
         if (LOG.isDebugEnabled())
         {
-            LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
+            LOG.debug("incomingFrame({})",frame);
         }
 
         try
@@ -226,6 +220,7 @@
         catch (Throwable t)
         {
             unhandled(t);
+            throw t;
         }
     }
 
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
index c9f98bb..cea5909 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
@@ -32,7 +32,6 @@
 
 import org.eclipse.jetty.io.AbstractConnection;
 import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.io.Connection;
 import org.eclipse.jetty.io.EndPoint;
 import org.eclipse.jetty.util.BufferUtil;
 import org.eclipse.jetty.util.StringUtil;
@@ -54,13 +53,13 @@
 import org.eclipse.jetty.websocket.common.ConnectionState;
 import org.eclipse.jetty.websocket.common.Generator;
 import org.eclipse.jetty.websocket.common.LogicalConnection;
+import org.eclipse.jetty.websocket.common.OpCode;
 import org.eclipse.jetty.websocket.common.Parser;
 import org.eclipse.jetty.websocket.common.WebSocketSession;
 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
 
 /**
- * Provides the implementation of {@link LogicalConnection} within the
- * framework of the new {@link Connection} framework of {@code jetty-io}.
+ * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of {@code jetty-io}.
  */
 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
 {
@@ -68,7 +67,7 @@
     {
         private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
         {
-            super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
+            super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
         }
 
         @Override
@@ -106,7 +105,7 @@
             // Abnormal Close
             reason = CloseStatus.trimMaxReasonLength(reason);
             session.notifyError(x);
-            session.notifyClose(StatusCode.NO_CLOSE,reason);
+            session.notifyClose(StatusCode.ABNORMAL,reason);
 
             disconnect(); // disconnect endpoint & connection
         }
@@ -116,10 +115,11 @@
     {
         private final boolean outputOnly;
 
-        public OnDisconnectCallback(boolean outputOnly) {
+        public OnDisconnectCallback(boolean outputOnly)
+        {
             this.outputOnly = outputOnly;
         }
-        
+
         @Override
         public void writeFailed(Throwable x)
         {
@@ -218,10 +218,10 @@
     @Override
     public void close(int statusCode, String reason)
     {
+        LOG.debug("close({},{})",statusCode,reason);
         CloseInfo close = new CloseInfo(statusCode,reason);
         if (statusCode == StatusCode.ABNORMAL)
         {
-            flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way
             ioState.onAbnormalClose(close);
         }
         else
@@ -230,7 +230,6 @@
         }
     }
 
-
     @Override
     public void disconnect()
     {
@@ -366,7 +365,9 @@
     @Override
     public void onClose()
     {
+        LOG.debug("{} onClose()",policy.getBehavior());
         super.onClose();
+        // ioState.onDisconnected();
         flusher.close();
     }
 
@@ -385,18 +386,15 @@
                 {
                     // Fire out a close frame, indicating abnormal shutdown, then disconnect
                     CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
-                    outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false), BatchMode.OFF);
+                    outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
                 }
-                else
-                {
-                    // Just disconnect
-                    this.disconnect(false);
-                }
+                // Just disconnect
+                this.disconnect(false);
                 break;
             case CLOSING:
                 CloseInfo close = ioState.getCloseInfo();
                 // reply to close handshake from remote
-                outgoingFrame(close.asFrame(),new OnDisconnectCallback(true), BatchMode.OFF);
+                outgoingFrame(close.asFrame(),new OnDisconnectCallback(true),BatchMode.OFF);
             default:
                 break;
         }
@@ -447,20 +445,26 @@
     @Override
     protected boolean onReadTimeout()
     {
-        LOG.debug("{} Read Timeout",policy.getBehavior());
-
         IOState state = getIOState();
-        if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
+        ConnectionState cstate = state.getConnectionState();
+        LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
+
+        if (cstate == ConnectionState.CLOSED)
         {
-            // close already initiated, extra timeouts not relevant
+            // close already completed, extra timeouts not relevant
             // allow underlying connection and endpoint to disconnect on its own
             return true;
         }
 
-        // Initiate close - politely send close frame.
-        session.notifyError(new SocketTimeoutException("Timeout on Read"));
-        // This is an Abnormal Close condition
-        close(StatusCode.ABNORMAL,"Idle Timeout");
+        try
+        {
+            session.notifyError(new SocketTimeoutException("Timeout on Read"));
+        }
+        finally
+        {
+            // This is an Abnormal Close condition
+            close(StatusCode.ABNORMAL,"Idle Timeout");
+        }
 
         return false;
     }
@@ -476,7 +480,21 @@
             LOG.debug("outgoingFrame({}, {})",frame,callback);
         }
 
-        flusher.enqueue(frame,callback, batchMode);
+        CloseInfo close = null;
+        // grab a copy of the frame details before masking and whatnot
+        if (frame.getOpCode() == OpCode.CLOSE)
+        {
+            close = new CloseInfo(frame);
+        }
+
+        flusher.enqueue(frame,callback,batchMode);
+
+        // now trigger local close
+        if (close != null)
+        {
+            LOG.debug("outgoing CLOSE frame - {}: {}",frame,close);
+            ioState.onCloseLocal(close);
+        }
     }
 
     private int read(ByteBuffer buffer)
@@ -504,7 +522,6 @@
                         LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
                     }
                     parser.parse(buffer);
-                    // TODO: has the end user application already consumed what it was given?
                 }
             }
         }
@@ -520,6 +537,12 @@
             close(e.getStatusCode(),e.getMessage());
             return -1;
         }
+        catch (Throwable t)
+        {
+            LOG.warn(t);
+            close(StatusCode.ABNORMAL,t.getMessage());
+            return -1;
+        }
     }
 
     @Override
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
index a7bb3cb..6e995e5 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
@@ -29,7 +29,6 @@
 import org.eclipse.jetty.io.EndPoint;
 import org.eclipse.jetty.util.ArrayQueue;
 import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
 import org.eclipse.jetty.util.IteratingCallback;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
@@ -45,246 +44,22 @@
  */
 public class FrameFlusher
 {
-    public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
-    private static final Logger LOG = Log.getLogger(FrameFlusher.class);
-
-    private final ByteBufferPool bufferPool;
-    private final EndPoint endpoint;
-    private final int bufferSize;
-    private final Generator generator;
-    private final int maxGather;
-    private final Object lock = new Object();
-    private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
-    private final Flusher flusher = new Flusher();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private volatile Throwable failure;
-
-    public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
-    {
-        this.bufferPool = bufferPool;
-        this.endpoint = endpoint;
-        this.bufferSize = bufferSize;
-        this.generator = Objects.requireNonNull(generator);
-        this.maxGather = maxGather;
-    }
-
-    public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
-    {
-        if (closed.get())
-        {
-            notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
-            return;
-        }
-        if (flusher.isFailed())
-        {
-            notifyCallbackFailure(callback, failure);
-            return;
-        }
-
-        FrameEntry entry = new FrameEntry(frame, callback, batchMode);
-
-        synchronized (lock)
-        {
-            switch (frame.getOpCode())
-            {
-                case OpCode.PING:
-                {
-                    // Prepend PINGs so they are processed first.
-                    queue.add(0, entry);
-                    break;
-                }
-                case OpCode.CLOSE:
-                {
-                    // There may be a chance that other frames are
-                    // added after this close frame, but we will
-                    // fail them later to keep it simple here.
-                    closed.set(true);
-                    queue.add(entry);
-                    break;
-                }
-                default:
-                {
-                    queue.add(entry);
-                    break;
-                }
-            }
-        }
-
-        if (LOG.isDebugEnabled())
-            LOG.debug("{} queued {}", this, entry);
-
-        flusher.iterate();
-    }
-
-    public void close()
-    {
-        if (closed.compareAndSet(false, true))
-        {
-            LOG.debug("{} closing {}", this);
-            EOFException eof = new EOFException("Connection has been closed locally");
-            flusher.failed(eof);
-
-            // Fail also queued entries.
-            List<FrameEntry> entries = new ArrayList<>();
-            synchronized (lock)
-            {
-                entries.addAll(queue);
-                queue.clear();
-            }
-            // Notify outside sync block.
-            for (FrameEntry entry : entries)
-                notifyCallbackFailure(entry.callback, eof);
-        }
-    }
-
-    protected void onFailure(Throwable x)
-    {
-        LOG.warn(x);
-    }
-
-    protected void notifyCallbackSuccess(WriteCallback callback)
-    {
-        try
-        {
-            if (callback != null)
-                callback.writeSuccess();
-        }
-        catch (Throwable x)
-        {
-            LOG.debug("Exception while notifying success of callback " + callback, x);
-        }
-    }
-
-    protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
-    {
-        try
-        {
-            if (callback != null)
-                callback.writeFailed(failure);
-        }
-        catch (Throwable x)
-        {
-            LOG.debug("Exception while notifying failure of callback " + callback, x);
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        ByteBuffer aggregate = flusher.aggregate;
-        return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
-                getClass().getSimpleName(),
-                queue.size(),
-                aggregate == null ? 0 : aggregate.position(),
-                failure);
-    }
-
     private class Flusher extends IteratingCallback
     {
         private final List<FrameEntry> entries = new ArrayList<>(maxGather);
-        private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
+        private final List<ByteBuffer> buffers = new ArrayList<>((maxGather * 2) + 1);
         private ByteBuffer aggregate;
         private BatchMode batchMode;
 
-        @Override
-        protected Action process() throws Exception
-        {
-            int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
-            BatchMode currentBatchMode = BatchMode.AUTO;
-            synchronized (lock)
-            {
-                while (entries.size() <= maxGather && !queue.isEmpty())
-                {
-                    FrameEntry entry = queue.remove(0);
-                    currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
-
-                    // Force flush if we need to.
-                    if (entry.frame == FLUSH_FRAME)
-                        currentBatchMode = BatchMode.OFF;
-
-                    int payloadLength = BufferUtil.length(entry.frame.getPayload());
-                    int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
-
-                    // If it is a "big" frame, avoid copying into the aggregate buffer.
-                    if (approxFrameLength > (bufferSize >> 2))
-                        currentBatchMode = BatchMode.OFF;
-
-                    // If the aggregate buffer overflows, do not batch.
-                    space -= approxFrameLength;
-                    if (space <= 0)
-                        currentBatchMode = BatchMode.OFF;
-
-                    entries.add(entry);
-                }
-            }
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
-
-            if (entries.isEmpty())
-            {
-                if (batchMode != BatchMode.AUTO)
-                {
-                    // Nothing more to do, release the aggregate buffer if we need to.
-                    // Releasing it here rather than in succeeded() allows for its reuse.
-                    releaseAggregate();
-                    return Action.IDLE;
-                }
-
-                LOG.debug("{} auto flushing", FrameFlusher.this);
-                return flush();
-            }
-
-            batchMode = currentBatchMode;
-
-            return currentBatchMode == BatchMode.OFF ? flush() : batch();
-        }
-
-        private Action flush()
-        {
-            if (!BufferUtil.isEmpty(aggregate))
-            {
-                buffers.add(aggregate);
-                if (LOG.isDebugEnabled())
-                    LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
-            }
-
-            // Do not allocate the iterator here.
-            for (int i = 0; i < entries.size(); ++i)
-            {
-                FrameEntry entry = entries.get(i);
-                // Skip the "synthetic" frame used for flushing.
-                if (entry.frame == FLUSH_FRAME)
-                    continue;
-                buffers.add(entry.generateHeaderBytes());
-                ByteBuffer payload = entry.frame.getPayload();
-                if (BufferUtil.hasContent(payload))
-                    buffers.add(payload);
-            }
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
-
-            if (buffers.isEmpty())
-            {
-                releaseAggregate();
-                // We may have the FLUSH_FRAME to notify.
-                succeedEntries();
-                return Action.IDLE;
-            }
-
-            endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
-            buffers.clear();
-            return Action.SCHEDULED;
-        }
-
         private Action batch()
         {
             if (aggregate == null)
             {
-                aggregate = bufferPool.acquire(bufferSize, true);
+                aggregate = bufferPool.acquire(bufferSize,true);
                 if (LOG.isDebugEnabled())
-                    LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
+                {
+                    LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
+                }
             }
 
             // Do not allocate the iterator here.
@@ -296,17 +71,149 @@
 
                 ByteBuffer payload = entry.frame.getPayload();
                 if (BufferUtil.hasContent(payload))
-                    BufferUtil.append(aggregate, payload);
+                {
+                    BufferUtil.append(aggregate,payload);
+                }
             }
             if (LOG.isDebugEnabled())
-                LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
+            {
+                LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
+            }
             succeeded();
             return Action.SCHEDULED;
         }
 
+        @Override
+        protected void completed()
+        {
+            // This IteratingCallback never completes.
+        }
+
+        @Override
+        public void failed(Throwable x)
+        {
+            for (FrameEntry entry : entries)
+            {
+                notifyCallbackFailure(entry.callback,x);
+                entry.release();
+            }
+            entries.clear();
+            super.failed(x);
+            failure = x;
+            onFailure(x);
+        }
+
+        private Action flush()
+        {
+            if (!BufferUtil.isEmpty(aggregate))
+            {
+                buffers.add(aggregate);
+                if (LOG.isDebugEnabled())
+                {
+                    LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
+                }
+            }
+
+            // Do not allocate the iterator here.
+            for (int i = 0; i < entries.size(); ++i)
+            {
+                FrameEntry entry = entries.get(i);
+                // Skip the "synthetic" frame used for flushing.
+                if (entry.frame == FLUSH_FRAME)
+                {
+                    continue;
+                }
+                buffers.add(entry.generateHeaderBytes());
+                ByteBuffer payload = entry.frame.getPayload();
+                if (BufferUtil.hasContent(payload))
+                {
+                    buffers.add(payload);
+                }
+            }
+
+            if (LOG.isDebugEnabled())
+            {
+                LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
+            }
+
+            if (buffers.isEmpty())
+            {
+                releaseAggregate();
+                // We may have the FLUSH_FRAME to notify.
+                succeedEntries();
+                return Action.IDLE;
+            }
+
+            endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
+            buffers.clear();
+            return Action.SCHEDULED;
+        }
+
+        @Override
+        protected Action process() throws Exception
+        {
+            int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
+            BatchMode currentBatchMode = BatchMode.AUTO;
+            synchronized (lock)
+            {
+                while ((entries.size() <= maxGather) && !queue.isEmpty())
+                {
+                    FrameEntry entry = queue.remove(0);
+                    currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
+
+                    // Force flush if we need to.
+                    if (entry.frame == FLUSH_FRAME)
+                    {
+                        currentBatchMode = BatchMode.OFF;
+                    }
+
+                    int payloadLength = BufferUtil.length(entry.frame.getPayload());
+                    int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
+
+                    // If it is a "big" frame, avoid copying into the aggregate buffer.
+                    if (approxFrameLength > (bufferSize >> 2))
+                    {
+                        currentBatchMode = BatchMode.OFF;
+                    }
+
+                    // If the aggregate buffer overflows, do not batch.
+                    space -= approxFrameLength;
+                    if (space <= 0)
+                    {
+                        currentBatchMode = BatchMode.OFF;
+                    }
+
+                    entries.add(entry);
+                }
+            }
+
+            if (LOG.isDebugEnabled())
+            {
+                LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
+            }
+
+            if (entries.isEmpty())
+            {
+                if (batchMode != BatchMode.AUTO)
+                {
+                    // Nothing more to do, release the aggregate buffer if we need to.
+                    // Releasing it here rather than in succeeded() allows for its reuse.
+                    releaseAggregate();
+                    return Action.IDLE;
+                }
+
+                LOG.debug("{} auto flushing",FrameFlusher.this);
+                return flush();
+            }
+
+            batchMode = currentBatchMode;
+
+            return currentBatchMode == BatchMode.OFF?flush():batch();
+        }
+
         private void releaseAggregate()
         {
-            if (aggregate != null && BufferUtil.isEmpty(aggregate))
+            if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
             {
                 bufferPool.release(aggregate);
                 aggregate = null;
@@ -331,26 +238,6 @@
             }
             entries.clear();
         }
-
-        @Override
-        protected void completed()
-        {
-            // This IteratingCallback never completes.
-        }
-
-        @Override
-        public void failed(Throwable x)
-        {
-            for (FrameEntry entry : entries)
-            {
-                notifyCallbackFailure(entry.callback, x);
-                entry.release();
-            }
-            entries.clear();
-            super.failed(x);
-            failure = x;
-            onFailure(x);
-        }
     }
 
     private class FrameEntry
@@ -374,7 +261,7 @@
 
         private void generateHeaderBytes(ByteBuffer buffer)
         {
-            generator.generateHeaderBytes(frame, buffer);
+            generator.generateHeaderBytes(frame,buffer);
         }
 
         private void release()
@@ -389,7 +276,145 @@
         @Override
         public String toString()
         {
-            return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
+            return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
         }
     }
+
+    public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
+    private static final Logger LOG = Log.getLogger(FrameFlusher.class);
+    private final ByteBufferPool bufferPool;
+    private final EndPoint endpoint;
+    private final int bufferSize;
+    private final Generator generator;
+    private final int maxGather;
+    private final Object lock = new Object();
+    private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
+    private final Flusher flusher = new Flusher();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private volatile Throwable failure;
+
+    public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
+    {
+        this.bufferPool = bufferPool;
+        this.endpoint = endpoint;
+        this.bufferSize = bufferSize;
+        this.generator = Objects.requireNonNull(generator);
+        this.maxGather = maxGather;
+    }
+
+    public void close()
+    {
+        if (closed.compareAndSet(false,true))
+        {
+            LOG.debug("{} closing {}",this);
+            EOFException eof = new EOFException("Connection has been closed locally");
+            flusher.failed(eof);
+
+            // Fail also queued entries.
+            List<FrameEntry> entries = new ArrayList<>();
+            synchronized (lock)
+            {
+                entries.addAll(queue);
+                queue.clear();
+            }
+            // Notify outside sync block.
+            for (FrameEntry entry : entries)
+            {
+                notifyCallbackFailure(entry.callback,eof);
+            }
+        }
+    }
+
+    public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
+    {
+        if (closed.get())
+        {
+            notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
+            return;
+        }
+        if (flusher.isFailed())
+        {
+            notifyCallbackFailure(callback,failure);
+            return;
+        }
+
+        FrameEntry entry = new FrameEntry(frame,callback,batchMode);
+
+        synchronized (lock)
+        {
+            switch (frame.getOpCode())
+            {
+                case OpCode.PING:
+                {
+                    // Prepend PINGs so they are processed first.
+                    queue.add(0,entry);
+                    break;
+                }
+                case OpCode.CLOSE:
+                {
+                    // There may be a chance that other frames are
+                    // added after this close frame, but we will
+                    // fail them later to keep it simple here.
+                    closed.set(true);
+                    queue.add(entry);
+                    break;
+                }
+                default:
+                {
+                    queue.add(entry);
+                    break;
+                }
+            }
+        }
+
+        if (LOG.isDebugEnabled())
+        {
+            LOG.debug("{} queued {}",this,entry);
+        }
+
+        flusher.iterate();
+    }
+
+    protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
+    {
+        try
+        {
+            if (callback != null)
+            {
+                callback.writeFailed(failure);
+            }
+        }
+        catch (Throwable x)
+        {
+            LOG.debug("Exception while notifying failure of callback " + callback,x);
+        }
+    }
+
+    protected void notifyCallbackSuccess(WriteCallback callback)
+    {
+        try
+        {
+            if (callback != null)
+            {
+                callback.writeSuccess();
+            }
+        }
+        catch (Throwable x)
+        {
+            LOG.debug("Exception while notifying success of callback " + callback,x);
+        }
+    }
+
+    protected void onFailure(Throwable x)
+    {
+        LOG.warn(x);
+    }
+
+    @Override
+    public String toString()
+    {
+        ByteBuffer aggregate = flusher.aggregate;
+        return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
+                failure);
+    }
 }
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
index 6d4dbc4..815d2d3 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java
@@ -139,6 +139,10 @@
     {
         for (ConnectionStateListener listener : listeners)
         {
+            if (LOG.isDebugEnabled())
+            {
+                LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
+            }
             listener.onConnectionStateChange(state);
         }
     }
@@ -166,8 +170,7 @@
             }
 
             this.state = ConnectionState.CLOSED;
-            if (closeInfo == null)
-                this.closeInfo = close;
+            this.closeInfo = close;
             this.inputAvailable = false;
             this.outputAvailable = false;
             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@@ -193,16 +196,16 @@
 
         if (initialState == ConnectionState.CONNECTED)
         {
-            // fast close. a local close request from end-user onConnected() method
+            // fast close. a local close request from end-user onConnect/onOpen method
             LOG.debug("FastClose in CONNECTED detected");
             // Force the state open (to allow read/write to endpoint)
             onOpened();
+            LOG.debug("FastClose continuing with Closure");
         }
 
         synchronized (this)
         {
-            if (closeInfo == null)
-                closeInfo = close;
+            closeInfo = close;
 
             boolean in = inputAvailable;
             boolean out = outputAvailable;
@@ -236,7 +239,6 @@
             LOG.debug("notifying state listeners: {}",event);
             notifyStateListeners(event);
 
-            /*
             // if abnormal, we don't expect an answer.
             if (close.isAbnormal())
             {
@@ -253,7 +255,6 @@
                 notifyStateListeners(event);
                 return;
             }
-            */
         }
     }
 
@@ -272,8 +273,7 @@
                 return;
             }
 
-            if (closeInfo == null)
-                closeInfo = close;
+            closeInfo = close;
 
             boolean in = inputAvailable;
             boolean out = outputAvailable;
@@ -360,7 +360,7 @@
             // already opened
             return;
         }
-        
+
         if (this.state != ConnectionState.CONNECTED)
         {
             LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
@@ -394,12 +394,11 @@
                 return;
             }
 
-            CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
+            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF");
 
             this.cleanClose = false;
             this.state = ConnectionState.CLOSED;
-            if (closeInfo == null)
-                this.closeInfo = close;
+            this.closeInfo = close;
             this.inputAvailable = false;
             this.outputAvailable = false;
             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@@ -408,6 +407,58 @@
         notifyStateListeners(event);
     }
 
+    public void onDisconnected()
+    {
+        ConnectionState event = null;
+        synchronized (this)
+        {
+            if (this.state == ConnectionState.CLOSED)
+            {
+                // already closed
+                return;
+            }
+
+            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
+
+            this.cleanClose = false;
+            this.state = ConnectionState.CLOSED;
+            this.closeInfo = close;
+            this.inputAvailable = false;
+            this.outputAvailable = false;
+            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
+            event = this.state;
+        }
+        notifyStateListeners(event);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder str = new StringBuilder();
+        str.append(this.getClass().getSimpleName());
+        str.append("@").append(Integer.toHexString(hashCode()));
+        str.append("[").append(state);
+        str.append(',');
+        if (!inputAvailable)
+        {
+            str.append('!');
+        }
+        str.append("in,");
+        if (!outputAvailable)
+        {
+            str.append('!');
+        }
+        str.append("out");
+        if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
+        {
+            str.append(",close=").append(closeInfo);
+            str.append(",clean=").append(cleanClose);
+            str.append(",closeSource=").append(closeHandshakeSource);
+        }
+        str.append(']');
+        return str.toString();
+    }
+
     public boolean wasAbnormalClose()
     {
         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
@@ -427,4 +478,5 @@
     {
         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
     }
+
 }
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
index 32b9c1b..2194ec5 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java
@@ -392,4 +392,4 @@
         }
         return name;
     }
-}
+}
\ No newline at end of file
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java
new file mode 100644
index 0000000..a40e306
--- /dev/null
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java
@@ -0,0 +1,166 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.websocket.common;
+
+import static org.eclipse.jetty.websocket.api.StatusCode.*;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.websocket.api.ProtocolException;
+import org.eclipse.jetty.websocket.common.frames.CloseFrame;
+import org.junit.Test;
+
+public class CloseInfoTest
+{
+    /**
+     * A test where no close is provided
+     */
+    @Test
+    public void testAnonymousClose()
+    {
+        CloseInfo close = new CloseInfo();
+        assertThat("close.code",close.getStatusCode(),is(NO_CODE));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        CloseFrame frame = close.asFrame();
+        assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+        // should result in no payload
+        assertThat("close frame has payload",frame.hasPayload(),is(false));
+        assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+    }
+    
+    /**
+     * A test where NO_CODE (1005) is provided
+     */
+    @Test
+    public void testNoCode()
+    {
+        CloseInfo close = new CloseInfo(NO_CODE);
+        assertThat("close.code",close.getStatusCode(),is(NO_CODE));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        CloseFrame frame = close.asFrame();
+        assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+        // should result in no payload
+        assertThat("close frame has payload",frame.hasPayload(),is(false));
+        assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+    }
+    
+    /**
+     * A test where NO_CLOSE (1006) is provided
+     */
+    @Test
+    public void testNoClose()
+    {
+        CloseInfo close = new CloseInfo(NO_CLOSE);
+        assertThat("close.code",close.getStatusCode(),is(NO_CLOSE));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        CloseFrame frame = close.asFrame();
+        assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+        // should result in no payload
+        assertThat("close frame has payload",frame.hasPayload(),is(false));
+        assertThat("close frame payload length",frame.getPayloadLength(),is(0));
+    }
+
+    /**
+     * A test of FAILED_TLS_HANDSHAKE (1007)
+     */
+    @Test
+    public void testFailedTlsHandshake()
+    {
+        CloseInfo close = new CloseInfo(FAILED_TLS_HANDSHAKE);
+        assertThat("close.code",close.getStatusCode(),is(FAILED_TLS_HANDSHAKE));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        try
+        {
+            @SuppressWarnings("unused")
+            CloseFrame frame = close.asFrame();
+            fail("Expected " + ProtocolException.class.getName());
+        }
+        catch (ProtocolException e)
+        {
+            // expected path
+            assertThat("ProtocolException message",e.getMessage(),containsString("not allowed (per RFC6455)"));
+        }
+    }
+
+    /**
+     * A test of NORMAL (1000)
+     */
+    @Test
+    public void testNormal()
+    {
+        CloseInfo close = new CloseInfo(NORMAL);
+        assertThat("close.code",close.getStatusCode(),is(NORMAL));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        CloseFrame frame = close.asFrame();
+        assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+        assertThat("close frame payload length",frame.getPayloadLength(),is(2));
+    }
+    
+    private ByteBuffer asByteBuffer(int statusCode, String reason)
+    {
+        int len = 2; // status code length
+        byte utf[] = null;
+        if (StringUtil.isNotBlank(reason))
+        {
+            utf = StringUtil.getUtf8Bytes(reason);
+            len += utf.length;
+        }
+
+        ByteBuffer buf = BufferUtil.allocate(len);
+        BufferUtil.flipToFill(buf);
+        buf.put((byte)((statusCode >>> 8) & 0xFF));
+        buf.put((byte)((statusCode >>> 0) & 0xFF));
+
+        if (utf != null)
+        {
+            buf.put(utf,0,utf.length);
+        }
+        BufferUtil.flipToFlush(buf,0);
+        
+        return buf;
+    }
+    
+    @Test
+    public void testFromFrame()
+    {
+        ByteBuffer payload = asByteBuffer(NORMAL,null);
+        assertThat("payload length", payload.remaining(), is(2));
+        CloseFrame frame = new CloseFrame();
+        frame.setPayload(payload);
+        
+        // create from frame
+        CloseInfo close = new CloseInfo(frame);
+        assertThat("close.code",close.getStatusCode(),is(NORMAL));
+        assertThat("close.reason",close.getReason(),nullValue());
+
+        // and back again
+        frame = close.asFrame();
+        assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
+        assertThat("close frame payload length",frame.getPayloadLength(),is(2));
+    }
+}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
index fe6011d..6b0f018 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.eclipse.jetty.util.log.StacklessLogging;
+import org.eclipse.jetty.websocket.api.ProtocolException;
 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
 import org.eclipse.jetty.websocket.api.WebSocketException;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@@ -39,8 +40,7 @@
     {
         ByteBuffer expected = ByteBuffer.allocate(32);
 
-        expected.put(new byte[]
-                { (byte)0x8b, 0x00 });
+        expected.put(new byte[] { (byte)0x8b, 0x00 });
 
         expected.flip();
 
@@ -50,10 +50,17 @@
         {
             Parser parser = new UnitParser(policy);
             parser.setIncomingFramesHandler(capture);
-            parser.parse(expected);
+            try
+            {
+                parser.parse(expected);
+            }
+            catch (ProtocolException ignore)
+            {
+                // ignore
+            }
         }
 
-        Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+        Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
 
         Throwable known = capture.getErrors().poll();
 
@@ -65,8 +72,7 @@
     {
         ByteBuffer expected = ByteBuffer.allocate(32);
 
-        expected.put(new byte[]
-                { (byte)0x8c, 0x01, 0x00 });
+        expected.put(new byte[] { (byte)0x8c, 0x01, 0x00 });
 
         expected.flip();
 
@@ -76,24 +82,29 @@
         {
             Parser parser = new UnitParser(policy);
             parser.setIncomingFramesHandler(capture);
-            parser.parse(expected);
+            try
+            {
+                parser.parse(expected);
+            }
+            catch (ProtocolException ignore)
+            {
+                // ignore
+            }
         }
 
-        Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+        Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
 
         Throwable known = capture.getErrors().poll();
 
         Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12"));
     }
 
-
     @Test
     public void testParserNonControlOpCode3Case4_1_1() throws Exception
     {
         ByteBuffer expected = ByteBuffer.allocate(32);
 
-        expected.put(new byte[]
-                { (byte)0x83, 0x00 });
+        expected.put(new byte[] { (byte)0x83, 0x00 });
 
         expected.flip();
 
@@ -103,10 +114,17 @@
         {
             Parser parser = new UnitParser(policy);
             parser.setIncomingFramesHandler(capture);
-            parser.parse(expected);
+            try
+            {
+                parser.parse(expected);
+            }
+            catch (ProtocolException ignore)
+            {
+                // ignore
+            }
         }
 
-        Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+        Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
 
         Throwable known = capture.getErrors().poll();
 
@@ -118,8 +136,7 @@
     {
         ByteBuffer expected = ByteBuffer.allocate(32);
 
-        expected.put(new byte[]
-                { (byte)0x84, 0x01, 0x00 });
+        expected.put(new byte[] { (byte)0x84, 0x01, 0x00 });
 
         expected.flip();
 
@@ -129,10 +146,17 @@
         {
             Parser parser = new UnitParser(policy);
             parser.setIncomingFramesHandler(capture);
-            parser.parse(expected);
+            try
+            {
+                parser.parse(expected);
+            }
+            catch (ProtocolException ignore)
+            {
+                // ignore
+            }
         }
 
-        Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
+        Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
 
         Throwable known = capture.getErrors().poll();
 
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
index bd02460..dedba3e 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java
@@ -18,7 +18,8 @@
 
 package org.eclipse.jetty.websocket.common.test;
 
-import java.io.Closeable;
+import static org.hamcrest.Matchers.*;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -68,10 +69,6 @@
 import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
 import org.junit.Assert;
 
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-
 /**
  * A simple websocket client for performing unit tests with.
  * <p>
@@ -84,7 +81,7 @@
  * with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
  * scope.
  */
-public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, Closeable
+public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, AutoCloseable
 {
     private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ==";
     private static final int BUFFER_SIZE = 8192;
@@ -182,22 +179,14 @@
 
     public void close(int statusCode, String message)
     {
+        LOG.debug("close({},{})",statusCode,message);
         CloseInfo close = new CloseInfo(statusCode,message);
 
-        ioState.onCloseLocal(close);
-
         if (!ioState.isClosed())
         {
-            WebSocketFrame frame = close.asFrame();
-            LOG.debug("Issuing: {}",frame);
-            try
-            {
-                write(frame);
-            }
-            catch (IOException e)
-            {
-                LOG.debug(e);
-            }
+            ioState.onCloseLocal(close);
+        } else {
+            LOG.debug("Not issuing close. ioState = {}",ioState);
         }
     }
 
@@ -429,13 +418,8 @@
         {
             LOG.info("Client parsed {} frames",count);
         }
-
-        if (frame.getOpCode() == OpCode.CLOSE)
-        {
-            CloseInfo close = new CloseInfo(frame);
-            ioState.onCloseRemote(close);
-        }
-
+        
+        // Capture Frame Copy
         WebSocketFrame copy = WebSocketFrame.copy(frame);
         incomingFrames.incomingFrame(copy);
     }
@@ -448,6 +432,7 @@
     @Override
     public void onConnectionStateChange(ConnectionState state)
     {
+        LOG.debug("CLIENT onConnectionStateChange() - {}", state);
         switch (state)
         {
             case CLOSED:
@@ -455,10 +440,17 @@
                 // this.disconnect();
                 break;
             case CLOSING:
-                if (ioState.wasRemoteCloseInitiated())
+                CloseInfo close = ioState.getCloseInfo();
+                
+                WebSocketFrame frame = close.asFrame();
+                LOG.debug("Issuing: {}",frame);
+                try
                 {
-                    CloseInfo close = ioState.getCloseInfo();
-                    close(close.getStatusCode(),close.getReason());
+                    write(frame);
+                }
+                catch (IOException e)
+                {
+                    LOG.debug(e);
                 }
                 break;
             default:
@@ -701,6 +693,7 @@
     {
         if (!ioState.isOpen())
         {
+            LOG.debug("IO Not Open / Not Writing: {}",frame);
             return;
         }
         LOG.debug("write(Frame->{}) to {}",frame,outgoing);
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
index c3cd1ad..1b665bd 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java
@@ -54,6 +54,7 @@
 import org.eclipse.jetty.websocket.api.extensions.Frame;
 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
+import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
 import org.eclipse.jetty.websocket.common.AcceptHash;
 import org.eclipse.jetty.websocket.common.CloseInfo;
 import org.eclipse.jetty.websocket.common.Generator;
@@ -124,7 +125,6 @@
         {
             write(new CloseFrame());
             flush();
-            disconnect();
         }
 
         public void close(int statusCode) throws IOException
@@ -132,7 +132,6 @@
             CloseInfo close = new CloseInfo(statusCode);
             write(close.asFrame());
             flush();
-            disconnect();
         }
 
         public void disconnect()
@@ -229,6 +228,19 @@
                 CloseInfo close = new CloseInfo(frame);
                 LOG.debug("Close frame: {}",close);
             }
+
+            Type type = frame.getType();
+            if (echoing.get() && (type.isData() || type.isContinuation()))
+            {
+                try
+                {
+                    write(WebSocketFrame.copy(frame));
+                }
+                catch (IOException e)
+                {
+                    LOG.warn(e);
+                }
+            }
         }
 
         @Override
@@ -317,9 +329,18 @@
             return len;
         }
 
+        /**
+         * @deprecated use {@link #readFrames(int, int, TimeUnit)} for correct parameter order
+         */
+        @Deprecated
         public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
         {
-            LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
+            return readFrames(expectedCount,timeoutDuration,timeoutUnit);
+        }
+
+        public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
+        {
+            LOG.debug("Read: waiting for {} frame(s) from client",expectedCount);
             int startCount = incomingFrames.size();
 
             ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
@@ -562,13 +583,22 @@
         public void write(Frame frame) throws IOException
         {
             LOG.debug("write(Frame->{}) to {}",frame,outgoing);
-            outgoing.outgoingFrame(frame,null, BatchMode.OFF);
+            outgoing.outgoingFrame(frame,null,BatchMode.OFF);
         }
 
         public void write(int b) throws IOException
         {
             getOutputStream().write(b);
         }
+
+        public void write(ByteBuffer buf) throws IOException
+        {
+            byte arr[] = BufferUtil.toArray(buf);
+            if ((arr != null) && (arr.length > 0))
+            {
+                getOutputStream().write(arr);
+            }
+        }
     }
 
     private static final Logger LOG = Log.getLogger(BlockheadServer.class);
diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
index bebe7c0..f6a2f2c 100644
--- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
+++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
@@ -44,6 +44,7 @@
 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
 import org.eclipse.jetty.util.thread.Scheduler;
 import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
+import org.eclipse.jetty.websocket.api.StatusCode;
 import org.eclipse.jetty.websocket.api.WebSocketException;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@@ -199,11 +200,23 @@
         }
     }
 
-    protected void closeAllConnections()
+    protected void shutdownAllConnections()
     {
         for (WebSocketSession session : openSessions)
         {
-            session.close();
+            if (session.getConnection() != null)
+            {
+                try
+                {
+                    session.getConnection().close(
+                            StatusCode.SHUTDOWN,
+                            "Shutdown");
+                }
+                catch (Throwable t)
+                {
+                    LOG.debug("During Shutdown All Connections",t);
+                }
+            }
         }
         openSessions.clear();
     }
@@ -269,7 +282,7 @@
     @Override
     protected void doStop() throws Exception
     {
-        closeAllConnections();
+        shutdownAllConnections();
         super.doStop();
     }
 
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
index 0c2ddb3..5a4707c 100644
--- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
@@ -114,7 +114,7 @@
         public void onWebSocketConnect(Session sess)
         {
             LOG.debug("onWebSocketConnect({})",sess);
-            sess.close();
+            sess.close(StatusCode.NORMAL,"FastCloseServer");
         }
     }
 
@@ -129,14 +129,10 @@
         public void onWebSocketConnect(Session sess)
         {
             LOG.debug("onWebSocketConnect({})",sess);
+            // Test failure due to unhandled exception
+            // this should trigger a fast-fail closure during open/connect
             throw new RuntimeException("Intentional FastFail");
         }
-
-        @Override
-        public void onWebSocketError(Throwable cause)
-        {
-            errors.add(cause);
-        }
     }
 
     private static final Logger LOG = Log.getLogger(WebSocketCloseTest.class);
@@ -163,30 +159,28 @@
     @Test
     public void testFastClose() throws Exception
     {
-        BlockheadClient client = new BlockheadClient(server.getServerUri());
-        client.setProtocols("fastclose");
-        client.setTimeout(TimeUnit.SECONDS,1);
-        try
+        try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
         {
+            client.setProtocols("fastclose");
+            client.setTimeout(TimeUnit.SECONDS,1);
             client.connect();
             client.sendStandardRequest();
             client.expectUpgradeResponse();
 
+            // Verify that client got close frame
             IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
             WebSocketFrame frame = capture.getFrames().poll();
             Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
             CloseInfo close = new CloseInfo(frame);
             Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
-
+            
+            // Notify server of close handshake
             client.write(close.asFrame()); // respond with close
-
+            
+            // ensure server socket got close event
             Assert.assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
             Assert.assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     /**
@@ -195,11 +189,10 @@
     @Test
     public void testFastFail() throws Exception
     {
-        BlockheadClient client = new BlockheadClient(server.getServerUri());
-        client.setProtocols("fastfail");
-        client.setTimeout(TimeUnit.SECONDS,1);
-        try
+        try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
         {
+            client.setProtocols("fastfail");
+            client.setTimeout(TimeUnit.SECONDS,1);
             try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
             {
                 client.connect();
@@ -214,14 +207,11 @@
 
                 client.write(close.asFrame()); // respond with close
 
+                // ensure server socket got close event
                 Assert.assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
                 Assert.assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR));
                 Assert.assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1));
             }
         }
-        finally
-        {
-            client.close();
-        }
     }
 }