435322 Added a idleTimeout to the SharedBlockerCallback
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
index 602e484..168a19d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
@@ -48,7 +48,7 @@
  */
 public class SharedBlockingCallback
 {
-    private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
+    static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
 
     final ReentrantLock _lock = new ReentrantLock();
     final Condition _idle = _lock.newCondition();
@@ -128,6 +128,12 @@
         return _blocker;
     }
 
+    protected void notComplete(Blocker blocker)
+    {
+        LOG.warn("Blocker not complete {}",blocker);
+        if (LOG.isDebugEnabled())
+            LOG.debug(new Throwable());
+    }
     
     /* ------------------------------------------------------------ */
     /** A Closeable Callback.
@@ -152,8 +158,8 @@
                     _state = SUCCEEDED;
                     _complete.signalAll();
                 }
-                else if (_state == IDLE)
-                    throw new IllegalStateException("IDLE");
+                else
+                    throw new IllegalStateException(_state);
             }
             finally
             {
@@ -169,11 +175,17 @@
             {
                 if (_state == null)
                 {
-                    _state = cause==null?FAILED:cause;
+                    if (cause==null)
+                        _state=FAILED;
+                    else if (cause instanceof BlockerTimeoutException)
+                        // Not this blockers timeout
+                        _state=new IOException(cause);
+                    else 
+                        _state=cause;
                     _complete.signalAll();
                 }
-                else if (_state == IDLE)
-                    throw new IllegalStateException("IDLE",cause);
+                else 
+                    throw new IllegalStateException(_state);
             }
             finally
             {
@@ -202,15 +214,9 @@
                     if (idle>0)
                     {
                         if (!_complete.await(idle,TimeUnit.MILLISECONDS))
-                        {
                             // The callback has not arrived in sufficient time.
-                            // We will synthesize a TimeoutException and then
-                            // create a new Blocker, so that any late arriving callback
-                            // does not cause a problem with the next cycle.
-                            _state=new TimeoutException("No Blocker CB");
-                            LOG.warn(_state);
-                            _blocker=new Blocker();
-                        }
+                            // We will synthesize a TimeoutException 
+                            _state=new BlockerTimeoutException();
                     }
                     else
                         _complete.await();
@@ -260,17 +266,19 @@
                 if (_state == IDLE)
                     throw new IllegalStateException("IDLE");
                 if (_state == null)
-                {
-                    LOG.warn("Blocker not complete {}",this);
-                    if (LOG.isDebugEnabled())
-                        LOG.debug(new Throwable());
-                }
+                    notComplete(this);
             }
             finally
             {
                 try 
                 {
-                    _state = IDLE;
+                    // If the blocker timed itself out, remember the state
+                    if (_state instanceof BlockerTimeoutException)
+                        // and create a new Blocker
+                        _blocker=new Blocker();
+                    else
+                        // else reuse Blocker
+                        _state = IDLE;
                     _idle.signalAll();
                     _complete.signalAll();
                 } 
@@ -295,4 +303,8 @@
             }
         }
     }
+    
+    private class BlockerTimeoutException extends TimeoutException
+    { 
+    }
 }
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
index 4ee93a6..c06e507 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
@@ -18,9 +18,17 @@
 
 package org.eclipse.jetty.util;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
 import org.hamcrest.Matchers;
@@ -29,7 +37,23 @@
 
 public class SharedBlockingCallbackTest
 {
-    final SharedBlockingCallback sbcb= new SharedBlockingCallback();
+    final AtomicInteger notComplete = new AtomicInteger();
+    final SharedBlockingCallback sbcb= new SharedBlockingCallback()
+    {
+        @Override
+        protected long getIdleTimeout()
+        {
+            return 150;
+        }
+
+        @Override
+        protected void notComplete(Blocker blocker)
+        {
+            super.notComplete(blocker);
+            notComplete.incrementAndGet();
+        }
+
+    };
     
     public SharedBlockingCallbackTest()
     {
@@ -46,7 +70,8 @@
             start=System.currentTimeMillis();
             blocker.block();
         }
-        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));     
+        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));  
+        Assert.assertEquals(0,notComplete.get());   
     }
     
     @Test
@@ -74,6 +99,7 @@
         }
         Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); 
         Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); 
+        Assert.assertEquals(0,notComplete.get());   
     }
     
     @Test
@@ -95,7 +121,8 @@
             start=System.currentTimeMillis();
             Assert.assertEquals(ex,ee.getCause());
         }
-        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));     
+        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));    
+        Assert.assertEquals(0,notComplete.get());    
     }
     
     @Test
@@ -133,6 +160,7 @@
         }
         Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); 
         Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
+        Assert.assertEquals(0,notComplete.get());   
     }
 
 
@@ -174,6 +202,58 @@
             blocker.succeeded();
             blocker.block();
         };
-        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));   
+        Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); 
+        Assert.assertEquals(0,notComplete.get());     
+    }
+
+    @Test
+    public void testBlockerClose() throws Exception
+    {
+        try (Blocker blocker=sbcb.acquire())
+        {
+            SharedBlockingCallback.LOG.info("Blocker not complete "+blocker+" warning is expected...");
+        }
+        
+        Assert.assertEquals(1,notComplete.get());
+    }
+    
+    @Test
+    public void testBlockerTimeout() throws Exception
+    {
+        Blocker b0=null;
+        try
+        {
+            try (Blocker blocker=sbcb.acquire())
+            {
+                b0=blocker;
+                Thread.sleep(400);
+                blocker.block();
+            }
+            fail();
+        }
+        catch(IOException e)
+        {
+            Throwable cause = e.getCause();
+            assertThat(cause,instanceOf(TimeoutException.class));
+        }
+        
+        Assert.assertEquals(0,notComplete.get());
+        
+
+        try (Blocker blocker=sbcb.acquire())
+        {
+            assertThat(blocker,not(equalTo(b0)));
+            try
+            {
+                b0.succeeded();
+                fail();
+            }
+            catch(Exception e)
+            {
+                assertThat(e,instanceOf(IllegalStateException.class));
+                assertThat(e.getCause(),instanceOf(TimeoutException.class));
+            }
+            blocker.succeeded();
+        }
     }
 }