Merge "Make StackEngine share threads"
diff --git a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
index 0337026..f72d721 100644
--- a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
+++ b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
@@ -10,6 +10,10 @@
  */
 package org.eclipse.sensinact.gateway.util.stack;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Abstract implementation of a {@link StackEnginetHandler}
  *
@@ -22,18 +26,13 @@
      */
     protected final StackEngine<E, StackEngineHandler<E>> eventEngine;
 
-    protected final Thread stackEngineThread;
-
     /**
      * Constructor
      */
     //TODO : allow restart by defining a separated start method
     public AbstractStackEngineHandler() {
         //instantiate the engine
-        this.eventEngine = new StackEngine<E, StackEngineHandler<E>>(this);
-        //start the engine
-        this.stackEngineThread = new Thread(eventEngine);
-        this.stackEngineThread.start();
+        this.eventEngine = new StackEngine<E, StackEngineHandler<E>>(this, getWorker());
     }
 
     /**
@@ -42,6 +41,14 @@
     public void stop() {
         //stop the engine
         this.eventEngine.stop();
+        try {
+            this.eventEngine.awaitTermination();
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } finally {
+            releaseWorker();
+        }
     }
 
     /**
@@ -53,10 +60,39 @@
         //wait for the stack emptiness for stopping
         this.eventEngine.closeWhenEmpty();
         try {
-            this.stackEngineThread.join();
-
+            this.eventEngine.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            this.stackEngineThread.interrupt();
+            Thread.currentThread().interrupt();
+        } finally {
+            releaseWorker();
         }
     }
+    
+    private static ScheduledExecutorService sharedExecutor;
+    private static long referenceCount = 0;
+    
+    private static ScheduledExecutorService getWorker() {
+        synchronized (AbstractStackEngineHandler.class) {
+            referenceCount++;
+            
+            if(sharedExecutor == null) {
+                ScheduledThreadPoolExecutor worker = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "Stack Engine Worker Thread"));
+                worker.setMaximumPoolSize(8);
+                sharedExecutor = worker;
+            }
+            return sharedExecutor;
+        }
+    }
+
+    private static void releaseWorker() {
+        synchronized (AbstractStackEngineHandler.class) {
+            referenceCount--;
+            
+            if(referenceCount == 0) {
+                sharedExecutor.shutdownNow();
+                sharedExecutor = null;
+            }
+        }
+    }
+    
 }
diff --git a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
index c676ffb..b389977 100644
--- a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
+++ b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
@@ -12,8 +12,11 @@
 
 import java.util.Deque;
 import java.util.LinkedList;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -23,59 +26,62 @@
  *
  * @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
  */
-public class StackEngine<E, H extends StackEngineHandler<E>> implements Runnable {
+public class StackEngine<E, H extends StackEngineHandler<E>> {
     private final int UNLIMITED_SIZE = -1;
     private final Object lock = new Object();
     private int maxStackSize;
 
     private final Deque<E> elements;
     private final H handler;
+    private final AtomicBoolean locked;
+
+    private final Semaphore semaphore;
+    private final CountDownLatch completionLatch;
     private final AtomicBoolean running;
     private final AtomicBoolean closing;
-    private final AtomicBoolean locked;
-    private TimerTask timer;
-
-
+    
+    private final ScheduledExecutorService worker;
+    
+    private ScheduledFuture<?> unlockTask;
+    
     /**
      * Constructor
      */
-    public StackEngine(H handler) {
+    StackEngine(H handler, ScheduledExecutorService worker) {
         this.handler = handler;
         this.elements = new LinkedList<E>();
-        this.running = new AtomicBoolean(false);
+        this.semaphore = new Semaphore(1);
+        this.completionLatch = new CountDownLatch(1);
+        this.running = new AtomicBoolean(true);
         this.closing = new AtomicBoolean(false);
         this.locked = new AtomicBoolean(false);
+        this.worker = worker;
         this.maxStackSize = UNLIMITED_SIZE;
+        requestProcessingIfNeeded();
+    }
+    
+    void requestProcessingIfNeeded() {
+        if(running.get() && semaphore.tryAcquire()) {
+            worker.execute(this::dequeue);
+        }
     }
 
     /**
-     * @inheritDoc
-     * @see java.lang.Runnable#run()
+     * Dequeue and handle up to 5 items before releasing the
+     * thread so other StackEngine instances may do their work
+     * too
      */
-    @Override
-    public void run() {
-        synchronized (lock) {
-            this.running.set(true);
-        }
-        while (running()) {
+    void dequeue() {
+        for(int i = 0; i < 5; i++) {
             E element = pop();
             try {
                 if (element != null) {
                     handler.doHandle(element);
-
                 } else {
                     if (closing.get()) {
                         stop();
-                        break;
                     }
-                    try {
-                        Thread.sleep(150);
-
-                    } catch (InterruptedException e) {
-                        Thread.interrupted();
-                        stop();
-                        break;
-                    }
+                    break;
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -83,8 +89,20 @@
                 break;
             }
         }
+        
+        boolean runAgain;
         synchronized (lock) {
-            this.elements.clear();
+            runAgain = !elements.isEmpty();
+        }
+        
+        if(runAgain) {
+            worker.execute(this::dequeue);
+        } else {
+            semaphore.release();
+        }
+        
+        if(!running()) {
+            completionLatch.countDown();
         }
     }
 
@@ -109,6 +127,7 @@
         synchronized (lock) {
             this.elements.addLast(element);
         }
+        requestProcessingIfNeeded();
     }
 
     /**
@@ -132,6 +151,7 @@
         synchronized (lock) {
             this.elements.addFirst(element);
         }
+        requestProcessingIfNeeded();
     }
 
     /**
@@ -191,9 +211,7 @@
             return element;
         }
         synchronized (lock) {
-            if (!this.elements.isEmpty()) {
-                element = this.elements.removeFirst();
-            }
+            element = this.elements.pollFirst();
         }
         return element;
     }
@@ -201,9 +219,13 @@
     /**
      * Stops this StackEngine
      */
-    public void stop() {
+    void stop() {
         synchronized (lock) {
             this.running.set(false);
+            elements.clear();
+        }
+        if(semaphore.tryAcquire()) {
+            completionLatch.countDown();
         }
     }
 
@@ -214,7 +236,7 @@
      * @return the running state of this
      * StackEngine
      */
-    public boolean running() {
+    boolean running() {
         boolean running = false;
         synchronized (lock) {
             running = this.running.get();
@@ -244,13 +266,17 @@
      * @param locked the lock state of this
      *               StackEngine
      */
-    protected void locked(boolean locked) {
+    void unlock() {
         synchronized (lock) {
-            if (timer != null) {
-                timer.cancel();
-                timer = null;
+            if(Thread.interrupted()) {
+                return;
             }
-            this.locked.set(locked);
+            
+            if (unlockTask != null) {
+                unlockTask.cancel(false);
+                unlockTask = null;
+            }
+            locked.set(false);
         }
     }
 
@@ -258,25 +284,15 @@
      * Defines this StackEngine's lock state as
      * true for the delay specified as parameter
      *
-     * @param delay
-     * 		the lock delay of this StackEngine
-     */
-    /**
-     * @param delay
+     * @param delay the lock delay of this StackEngine
      */
     public void locked(long delay) {
         synchronized (lock) {
-            if (timer != null) {
-                timer.cancel();
-                timer = null;
+            if (unlockTask != null) {
+                unlockTask.cancel(true);
+                unlockTask = null;
             }
-            timer = new TimerTask() {
-                @Override
-                public void run() {
-                    StackEngine.this.locked(false);
-                }
-            };
-            new Timer().schedule(timer, delay);
+            unlockTask = worker.schedule(this::unlock, delay, TimeUnit.MILLISECONDS);
             this.locked.set(true);
         }
     }
@@ -285,7 +301,24 @@
      * Waits until the stack is empty for closing
      * it
      */
-    public void closeWhenEmpty() {
+    void closeWhenEmpty() {
         this.closing.set(true);
+        synchronized (lock) {
+            if(elements.isEmpty()) {
+                this.running.set(false);
+            }
+        }
+        
+        if(semaphore.tryAcquire()) {
+            completionLatch.countDown();
+        }
+    }
+    
+    void awaitTermination() throws InterruptedException {
+        completionLatch.await(100, TimeUnit.MILLISECONDS);
+    }
+
+    void awaitTermination(long time, TimeUnit unit) throws InterruptedException {
+        completionLatch.await(time, unit);
     }
 }
\ No newline at end of file