Merge "Make StackEngine share threads"
diff --git a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
index 0337026..f72d721 100644
--- a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
+++ b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/AbstractStackEngineHandler.java
@@ -10,6 +10,10 @@
*/
package org.eclipse.sensinact.gateway.util.stack;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* Abstract implementation of a {@link StackEnginetHandler}
*
@@ -22,18 +26,13 @@
*/
protected final StackEngine<E, StackEngineHandler<E>> eventEngine;
- protected final Thread stackEngineThread;
-
/**
* Constructor
*/
//TODO : allow restart by defining a separated start method
public AbstractStackEngineHandler() {
//instantiate the engine
- this.eventEngine = new StackEngine<E, StackEngineHandler<E>>(this);
- //start the engine
- this.stackEngineThread = new Thread(eventEngine);
- this.stackEngineThread.start();
+ this.eventEngine = new StackEngine<E, StackEngineHandler<E>>(this, getWorker());
}
/**
@@ -42,6 +41,14 @@
public void stop() {
//stop the engine
this.eventEngine.stop();
+ try {
+ this.eventEngine.awaitTermination();
+ } catch (InterruptedException e) {
+ // interrupted while waiting: restore the flag for the caller instead of swallowing it
+ Thread.currentThread().interrupt();
+ } finally {
+ releaseWorker();
+ }
}
/**
@@ -53,10 +60,39 @@
//wait for the stack emptiness for stopping
this.eventEngine.closeWhenEmpty();
try {
- this.stackEngineThread.join();
-
+ this.eventEngine.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- this.stackEngineThread.interrupt();
+ Thread.currentThread().interrupt();
+ } finally {
+ releaseWorker();
}
}
+
+ private static ScheduledExecutorService sharedExecutor;
+ private static long referenceCount = 0;
+
+ private static ScheduledExecutorService getWorker() {
+ synchronized (AbstractStackEngineHandler.class) {
+ referenceCount++;
+ // count the caller first, then lazily create the executor shared by all handlers
+ if(sharedExecutor == null) {
+ ScheduledThreadPoolExecutor worker = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "Stack Engine Worker Thread"));
+ worker.setMaximumPoolSize(8); // NOTE(review): JDK docs say maximumPoolSize has no useful effect on ScheduledThreadPoolExecutor - confirm one thread is intended
+ sharedExecutor = worker;
+ }
+ return sharedExecutor;
+ }
+ }
+
+ private static void releaseWorker() {
+ synchronized (AbstractStackEngineHandler.class) {
+ // ignore unbalanced releases: a negative count would keep later executors from ever being shut down
+ if(referenceCount <= 0) { return; }
+ if(--referenceCount == 0 && sharedExecutor != null) {
+ sharedExecutor.shutdownNow();
+ sharedExecutor = null;
+ }
+ }
+ }
+
}
diff --git a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
index c676ffb..b389977 100644
--- a/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
+++ b/platform/sensinact-utils/src/main/java/org/eclipse/sensinact/gateway/util/stack/StackEngine.java
@@ -12,8 +12,11 @@
import java.util.Deque;
import java.util.LinkedList;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -23,59 +26,62 @@
*
* @author <a href="mailto:christophe.munilla@cea.fr">Christophe Munilla</a>
*/
-public class StackEngine<E, H extends StackEngineHandler<E>> implements Runnable {
+public class StackEngine<E, H extends StackEngineHandler<E>> {
private final int UNLIMITED_SIZE = -1;
private final Object lock = new Object();
private int maxStackSize;
private final Deque<E> elements;
private final H handler;
+ private final AtomicBoolean locked;
+
+ private final Semaphore semaphore;
+ private final CountDownLatch completionLatch;
private final AtomicBoolean running;
private final AtomicBoolean closing;
- private final AtomicBoolean locked;
- private TimerTask timer;
-
-
+
+ private final ScheduledExecutorService worker;
+
+ private ScheduledFuture<?> unlockTask;
+
/**
* Constructor
*/
- public StackEngine(H handler) {
+ StackEngine(H handler, ScheduledExecutorService worker) {
this.handler = handler;
this.elements = new LinkedList<E>();
- this.running = new AtomicBoolean(false);
+ this.semaphore = new Semaphore(1);
+ this.completionLatch = new CountDownLatch(1);
+ this.running = new AtomicBoolean(true);
this.closing = new AtomicBoolean(false);
this.locked = new AtomicBoolean(false);
+ this.worker = worker;
this.maxStackSize = UNLIMITED_SIZE;
+ requestProcessingIfNeeded();
+ }
+
+ void requestProcessingIfNeeded() {
+ if(running.get() && semaphore.tryAcquire()) { // single permit: at most one dequeue task is submitted at a time
+ worker.execute(this::dequeue); // NOTE(review): if execute() throws, the permit stays taken - confirm this cannot happen after shutdown
+ }
}
/**
- * @inheritDoc
- * @see java.lang.Runnable#run()
+ * Dequeue and handle up to 5 items before releasing the
+ * thread so other StackEngine instances may do their work
+ * too
*/
- @Override
- public void run() {
- synchronized (lock) {
- this.running.set(true);
- }
- while (running()) {
+ void dequeue() {
+ for(int i = 0; i < 5; i++) {
E element = pop();
try {
if (element != null) {
handler.doHandle(element);
-
} else {
if (closing.get()) {
stop();
- break;
}
- try {
- Thread.sleep(150);
-
- } catch (InterruptedException e) {
- Thread.interrupted();
- stop();
- break;
- }
+ break;
}
} catch (Exception e) {
e.printStackTrace();
@@ -83,8 +89,20 @@
break;
}
}
+
+ boolean runAgain;
synchronized (lock) {
- this.elements.clear();
+ runAgain = !elements.isEmpty();
+ }
+
+ if(runAgain) {
+ worker.execute(this::dequeue);
+ } else {
+ semaphore.release();
+ }
+
+ if(!running()) {
+ completionLatch.countDown();
}
}
@@ -109,6 +127,7 @@
synchronized (lock) {
this.elements.addLast(element);
}
+ requestProcessingIfNeeded();
}
/**
@@ -132,6 +151,7 @@
synchronized (lock) {
this.elements.addFirst(element);
}
+ requestProcessingIfNeeded();
}
/**
@@ -191,9 +211,7 @@
return element;
}
synchronized (lock) {
- if (!this.elements.isEmpty()) {
- element = this.elements.removeFirst();
- }
+ element = this.elements.pollFirst();
}
return element;
}
@@ -201,9 +219,13 @@
/**
* Stops this StackEngine
*/
- public void stop() {
+ void stop() {
synchronized (lock) {
this.running.set(false);
+ elements.clear();
+ }
+ if(semaphore.tryAcquire()) {
+ completionLatch.countDown();
}
}
@@ -214,7 +236,7 @@
* @return the running state of this
* StackEngine
*/
- public boolean running() {
+ boolean running() {
boolean running = false;
synchronized (lock) {
running = this.running.get();
@@ -244,13 +266,17 @@
* @param locked the lock state of this
* StackEngine
*/
- protected void locked(boolean locked) {
+ void unlock() {
synchronized (lock) {
- if (timer != null) {
- timer.cancel();
- timer = null;
+ if(Thread.interrupted()) {
+ return;
}
- this.locked.set(locked);
+
+ if (unlockTask != null) {
+ unlockTask.cancel(false);
+ unlockTask = null;
+ }
+ locked.set(false);
}
}
@@ -258,25 +284,15 @@
* Defines this StackEngine's lock state as
* true for the delay specified as parameter
*
- * @param delay
- * the lock delay of this StackEngine
- */
- /**
- * @param delay
+ * @param delay the lock delay of this StackEngine
*/
public void locked(long delay) {
synchronized (lock) {
- if (timer != null) {
- timer.cancel();
- timer = null;
+ if (unlockTask != null) {
+ unlockTask.cancel(true);
+ unlockTask = null;
}
- timer = new TimerTask() {
- @Override
- public void run() {
- StackEngine.this.locked(false);
- }
- };
- new Timer().schedule(timer, delay);
+ unlockTask = worker.schedule(this::unlock, delay, TimeUnit.MILLISECONDS);
this.locked.set(true);
}
}
@@ -285,7 +301,24 @@
* Waits until the stack is empty for closing
* it
*/
- public void closeWhenEmpty() {
+ void closeWhenEmpty() {
this.closing.set(true);
+ synchronized (lock) {
+ if(elements.isEmpty()) {
+ this.running.set(false);
+ }
+ }
+
+ if(semaphore.tryAcquire()) {
+ completionLatch.countDown();
+ }
+ }
+
+ void awaitTermination() throws InterruptedException {
+ completionLatch.await(100, TimeUnit.MILLISECONDS); // bounded wait (100 ms); the boolean result is discarded, so a timeout is not reported
+ }
+
+ void awaitTermination(long time, TimeUnit unit) throws InterruptedException {
+ completionLatch.await(time, unit); // returns after the latch opens or the timeout elapses; callers cannot tell which
+ }
}
\ No newline at end of file