Avoid creating a thread per PacketReader
diff --git a/platform/sensinact-generic/src/main/java/org/eclipse/sensinact/gateway/generic/packet/AbstractPacketReader.java b/platform/sensinact-generic/src/main/java/org/eclipse/sensinact/gateway/generic/packet/AbstractPacketReader.java
index b731fb8..16d87da 100644
--- a/platform/sensinact-generic/src/main/java/org/eclipse/sensinact/gateway/generic/packet/AbstractPacketReader.java
+++ b/platform/sensinact-generic/src/main/java/org/eclipse/sensinact/gateway/generic/packet/AbstractPacketReader.java
@@ -11,9 +11,7 @@
package org.eclipse.sensinact.gateway.generic.packet;
import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,64 +24,41 @@
public abstract class AbstractPacketReader<P extends Packet> implements PacketReader<P> {
private static final Logger LOG = LoggerFactory.getLogger(PacketReader.class);
-
- protected CountDownLatch countDown;
- protected ExecutorService worker;
+
protected PayloadFragment subPacket;
/**
* Constructor
*/
protected AbstractPacketReader() {
- this.countDown = new CountDownLatch(1);
- this.worker = Executors.newSingleThreadExecutor();
}
@Override
public Iterator<PayloadFragment> iterator() {
return new Iterator<PayloadFragment>() {
- private void parse (){
- AbstractPacketReader.this.worker.submit(new Runnable() {
- @Override
- public void run() {
- try {
- AbstractPacketReader.this.parse();
- } catch (InvalidPacketException e) {
- LOG.error(e.getMessage(),e);
- }
- }
-
- });
- }
-
- private void await() {
- try {
- AbstractPacketReader.this.countDown.await();
- AbstractPacketReader.this.countDown = new CountDownLatch(1);
- } catch (InterruptedException e) {
- Thread.interrupted();
- AbstractPacketReader.this.setSubPacket(PayloadFragment.EOF_FRAGMENT);
- }
- }
-
@Override
public boolean hasNext() {
if(AbstractPacketReader.this.subPacket == null) {
- parse();
- await();
+ try {
+ parse();
+ if(subPacket == null) {
+ LOG.error("Parsing a packet did not return a fragment");
+ setSubPacket(PayloadFragment.EOF_FRAGMENT);
+ }
+ } catch (InvalidPacketException e) {
+ LOG.error("An exception occurred parsing a packet",e);
+ setSubPacket(PayloadFragment.EOF_FRAGMENT);
+ }
}
return PayloadFragment.EOF_FRAGMENT != AbstractPacketReader.this.subPacket;
}
@Override
public PayloadFragment next() {
- if(AbstractPacketReader.this.subPacket == null) {
- parse();
- await();
+ if(!hasNext()) {
+ throw new NoSuchElementException("There are no more sub-packets");
}
- if(PayloadFragment.EOF_FRAGMENT == AbstractPacketReader.this.subPacket)
- return null;
PayloadFragment fragment = AbstractPacketReader.this.subPacket;
AbstractPacketReader.this.subPacket = null;
return fragment;
@@ -101,6 +76,5 @@
if (subPacket == null)
return;
this.subPacket = subPacket;
- this.countDown.countDown();
}
}