486930 - Selector does not correctly handle rejected execution exception

This fix work in two ways:

1) Both the PEC and EPC strategies when confronted with a
RejectedExecutionException will continue to Produce rather than consume.

2) If a produced Runnable cannot be consumed and it supports the new Rejectable interface,
then it's reject() method is called by the producer thread.    Typically this is implemented
to close the connection - with the risk being that the close might block, but that is
probably better than leaking the connection?
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 546095c..0af6827 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -44,6 +44,7 @@
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
 import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
 import org.eclipse.jetty.util.thread.Locker;
 import org.eclipse.jetty.util.thread.Scheduler;
 
@@ -541,7 +542,7 @@
         }
     }
 
-    class Accept implements Runnable
+    class Accept implements Runnable, Rejectable
     {
         private final SocketChannel channel;
         private final Object attachment;
@@ -553,6 +554,13 @@
         }
 
         @Override
+        public void reject()
+        {
+            LOG.debug("rejected accept {}",channel);
+            closeNoExceptions(channel);
+        }
+        
+        @Override
         public void run()
         {
             try
@@ -568,7 +576,7 @@
         }
     }
 
-    private class CreateEndPoint implements Product
+    private class CreateEndPoint implements Product, Rejectable
     {
         private final SocketChannel channel;
         private final SelectionKey key;
@@ -593,6 +601,13 @@
             }
         }
 
+        @Override
+        public void reject()
+        {
+            LOG.debug("rejected create {}",channel);
+            closeNoExceptions(channel);
+        }
+
         protected void failed(Throwable failure)
         {
             closeNoExceptions(channel);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index e7587bd..435c816 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -25,6 +25,7 @@
 
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
 import org.eclipse.jetty.util.thread.Locker;
 import org.eclipse.jetty.util.thread.Scheduler;
 
@@ -67,7 +68,24 @@
             return SelectChannelEndPoint.this.toString()+":runUpdateKey";
         }
     };
-    private final Runnable _runFillable = new Runnable()
+    
+    private abstract class RejectableRunnable implements Runnable,Rejectable
+    {
+        @Override 
+        public void reject()
+        {
+            try
+            {
+                close();
+            }
+            catch (Throwable x)
+            {
+                LOG.warn(x);
+            }
+        }
+    }
+    
+    private final Runnable _runFillable = new RejectableRunnable()
     {
         @Override
         public void run()
@@ -81,7 +99,7 @@
             return SelectChannelEndPoint.this.toString()+":runFillable";
         }
     };
-    private final Runnable _runCompleteWrite = new Runnable()
+    private final Runnable _runCompleteWrite = new RejectableRunnable()
     {
         @Override
         public void run()
@@ -95,7 +113,7 @@
             return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
         }
     };
-    private final Runnable _runFillableCompleteWrite = new Runnable()
+    private final Runnable _runFillableCompleteWrite = new RejectableRunnable()
     {
         @Override
         public void run()
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
index 08f1630..de4a5d2 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
@@ -36,9 +36,12 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.eclipse.jetty.util.BufferUtil;
 import org.eclipse.jetty.util.Callback;
@@ -48,6 +51,7 @@
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.Scheduler;
 import org.eclipse.jetty.util.thread.TimerScheduler;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -121,10 +125,18 @@
         ByteBuffer _in = BufferUtil.allocate(32 * 1024);
         ByteBuffer _out = BufferUtil.allocate(32 * 1024);
         long _last = -1;
+        final CountDownLatch _latch;
 
         public TestConnection(EndPoint endp)
         {
             super(endp, _threadPool);
+            _latch=null;
+        }
+        
+        public TestConnection(EndPoint endp,CountDownLatch latch)
+        {
+            super(endp, _threadPool);
+            _latch=latch;
         }
 
         @Override
@@ -150,6 +162,18 @@
         @Override
         public void onFillable()
         {
+            if (_latch!=null)
+            {
+                try
+                {
+                    _latch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+            }
+            
             Callback blocking = _blockingRead;
             if (blocking!=null)
             {
@@ -668,4 +692,96 @@
         }
         assertFalse(server.isOpen());
     }
+    
+
+    @Test
+    public void testRejectedExecution() throws Exception
+    {
+        _manager.stop();
+        _threadPool.stop();
+        
+        final CountDownLatch latch = new CountDownLatch(1);
+        
+        BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
+        _threadPool = new QueuedThreadPool(4,4,60000,q);
+        _manager = new SelectorManager(_threadPool, _scheduler, 1)
+        {
+            @Override
+            public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
+            {
+                return new TestConnection(endpoint,latch);
+            }
+
+            @Override
+            protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
+            {
+                SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
+                _lastEndPoint = endp;
+                _lastEndPointLatch.countDown();
+                return endp;
+            }
+        };
+        
+        _threadPool.start();
+        _manager.start();
+        
+        AtomicInteger timeout = new AtomicInteger();
+        AtomicInteger rejections = new AtomicInteger();
+        AtomicInteger echoed = new AtomicInteger();
+        
+        CountDownLatch closed = new CountDownLatch(10);
+        for (int i=0;i<10;i++)
+        {
+            new Thread()
+            {
+                public void run()
+                {
+                    try(Socket client = newClient();)
+                    {
+                        client.setSoTimeout(5000);
+
+                        SocketChannel server = _connector.accept();
+                        server.configureBlocking(false);
+
+                        _manager.accept(server);
+
+                        // Write client to server
+                        client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
+                        client.getOutputStream().flush();
+                        client.shutdownOutput();
+
+                        // Verify echo server to client
+                        for (char c : "HelloWorld".toCharArray())
+                        {
+                            int b = client.getInputStream().read();
+                            assertTrue(b > 0);
+                            assertEquals(c, (char)b);
+                        }
+                        assertEquals(-1,client.getInputStream().read());
+                        echoed.incrementAndGet();
+                    }
+                    catch(SocketTimeoutException x)
+                    {
+                        x.printStackTrace();
+                        timeout.incrementAndGet();
+                    }
+                    catch(Throwable x)
+                    {
+                        rejections.incrementAndGet();
+                    }
+                    finally
+                    {
+                        closed.countDown();
+                    }
+                }
+            }.start();
+        }
+
+        latch.countDown();
+        closed.await();
+        Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
+        Assert.assertThat(rejections.get(),Matchers.lessThan(10));
+        Assert.assertThat(timeout.get(),Matchers.equalTo(0));
+        Assert.assertThat(echoed.get(),Matchers.equalTo(10-rejections.get()));        
+    }
 }
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
index eef0910..31a17c8 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
@@ -20,6 +20,7 @@
 
 import java.lang.reflect.Constructor;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.eclipse.jetty.util.Loader;
 import org.eclipse.jetty.util.log.Log;
@@ -53,6 +54,15 @@
      */
     public void execute();
 
+    
+    /**
+     * A task that can handle {@link RejectedExecutionException}
+     */
+    public interface Rejectable
+    {
+        public void reject();
+    }
+    
     /**
      * <p>A producer of {@link Runnable} tasks to run.</p>
      * <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index d305322..c3436ad 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -19,6 +19,7 @@
 package org.eclipse.jetty.util.thread.strategy;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
@@ -140,7 +141,14 @@
             while (_threadpool!=null && _threadpool.isLowOnThreads())
             {
                 LOG.debug("EWYK low resources {}",this);
-                _lowresources.execute();
+                try
+                {
+                    _lowresources.execute();
+                }
+                catch(Throwable e)
+                {
+                    LOG.warn(e);
+                }
             }
             
             // no longer low resources so produceAndRun normally
@@ -204,13 +212,37 @@
                 // Spawn a new thread to continue production by running the produce loop.
                 if (LOG.isDebugEnabled())
                     LOG.debug("{} dispatch",this);
-                _executor.execute(this);
+                try
+                {
+                    _executor.execute(this);
+                }
+                catch(RejectedExecutionException e)
+                {
+                    // If we cannot execute, the close or discard the task and keep producing
+                    LOG.debug(e);
+                    LOG.warn("RejectedExecution {}",task);
+                    try
+                    {
+                        if (task instanceof Rejectable)
+                            ((Rejectable)task).reject();
+                    }
+                    catch (Exception x)
+                    {
+                        e.addSuppressed(x);
+                        LOG.warn(e);
+                    }
+                    finally
+                    {
+                        task=null;
+                    }
+                }
             }
 
             // Run the task.
             if (LOG.isDebugEnabled())
                 LOG.debug("{} run {}",this,task);
-            task.run();
+            if (task != null)
+                task.run();
             if (LOG.isDebugEnabled())
                 LOG.debug("{} ran {}",this,task);
 
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
index 64903a6..6e7c95e 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
@@ -19,6 +19,7 @@
 package org.eclipse.jetty.util.thread.strategy;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
@@ -55,7 +56,26 @@
                 break;
 
             // Execute the task.
-            _executor.execute(task);
+            try
+            {
+                _executor.execute(task);
+            }
+            catch(RejectedExecutionException e)
+            {
+                // Close or discard tasks that cannot be executed
+                if (task instanceof Rejectable)
+                {
+                    try
+                    {
+                        ((Rejectable)task).reject();
+                    }
+                    catch (Throwable x)
+                    {
+                        e.addSuppressed(x);
+                        LOG.warn(e);
+                    }
+                }
+            }
         }
     }