Synchronize addingService/removedService on a shared static lock
diff --git a/platform/southbound/mqtt/smart-topic-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/smarttopic/MqttPojoConfigTracker.java b/platform/southbound/mqtt/smart-topic-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/smarttopic/MqttPojoConfigTracker.java
index 337a5a1..8854a63 100644
--- a/platform/southbound/mqtt/smart-topic-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/smarttopic/MqttPojoConfigTracker.java
+++ b/platform/southbound/mqtt/smart-topic-device/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/smarttopic/MqttPojoConfigTracker.java
@@ -52,6 +52,7 @@
private static final Logger LOG = LoggerFactory.getLogger(MqttActivator.class);
private final BundleContext bundleContext;
private MqttProtocolStackEndpoint endpoint;
+ private static final Object locker=new Object();
public MqttPojoConfigTracker(MqttProtocolStackEndpoint endpoint, BundleContext bundleContext) {
this.bundleContext = bundleContext;
@@ -74,56 +75,59 @@
@Override
public Object addingService(ServiceReference serviceReference) {
- LOG.info("Loading POJO device configuration");
- final Provider provider = (Provider) bundleContext.getService(serviceReference);
- LOG.debug("Loading POJO device configuration {}", provider.getName());
- try {
- final MqttBroker broker = provider.getBroker();
- broker.connect();
- for (final Service service : provider.getServices()) {
- for (final Resource resource : service.getResources()) {
- MqttTopicMessage listener = new MqttTopicMessage() {
- @Override
- public void messageReceived(String s, String s1) {
- try {
- String value = processorExecutor.execute(s1, ProcessorUtil.transformProcessorListInSelector(resource.getProcessor() == null ? "" : resource.getProcessor()));
- MqttPacket packet = new MqttPacket(provider.getName(), service.getName(), resource.getName(), value);
- endpoint.process(packet);
- } catch (Exception e) {
- LOG.error("Failed to process MQTT package", e);
+ synchronized (locker){
+ LOG.info("Loading POJO device configuration");
+ final Provider provider = (Provider) bundleContext.getService(serviceReference);
+ LOG.debug("Loading POJO device configuration {}", provider.getName());
+ try {
+ final MqttBroker broker = provider.getBroker();
+ broker.connect();
+ for (final Service service : provider.getServices()) {
+ for (final Resource resource : service.getResources()) {
+ MqttTopicMessage listener = new MqttTopicMessage() {
+ @Override
+ public void messageReceived(String s, String s1) {
+ try {
+ String value = processorExecutor.execute(s1, ProcessorUtil.transformProcessorListInSelector(resource.getProcessor() == null ? "" : resource.getProcessor()));
+ MqttPacket packet = new MqttPacket(provider.getName(), service.getName(), resource.getName(), value);
+ endpoint.process(packet);
+ } catch (Exception e) {
+ LOG.error("Failed to process MQTT package", e);
+ }
}
+ };
+ LOG.info("Subscribing to topic: {}", resource.getTopic());
+ if (resource.getTopic() != null) {
+ MqttTopic topic = new MqttTopic(resource.getTopic(), listener);
+ broker.subscribeToTopic(topic);
+ } else {
+ LOG.warn("Failed to register device {}, topic assigned cannot be null", provider.getName());
}
- };
- LOG.info("Subscribing to topic: {}", resource.getTopic());
- if (resource.getTopic() != null) {
- MqttTopic topic = new MqttTopic(resource.getTopic(), listener);
- broker.subscribeToTopic(topic);
- } else {
- LOG.warn("Failed to register device {}, topic assigned cannot be null", provider.getName());
+ if (!provider.isDiscoveryOnFirstMessage()) {
+ LOG.info("Initiating {}/{}/{} with empty value", provider.getName(), service.getName(), resource.getName());
+ MqttPacket packet = new MqttPacket(provider.getName(), service.getName(), resource.getName(), resource.getValue() == null ? "" : resource.getValue());
+ packet.setHelloMessage(true);
+ endpoint.process(packet);
+ //runtime.updateValue(provider.getName(), service.getName(), resource.getName(), "");
+ } else {
+ LOG.warn("Device {}/{}/{} is hidden until the first message is received", provider.getName(), service.getName(), resource.getName());
+ }
+ LOG.info("Subscribed {}/{}/{} to the topic {}", provider.getName(), service.getName(), resource.getName(), resource.getTopic());
}
- if (!provider.isDiscoveryOnFirstMessage()) {
- LOG.info("Initiating {}/{}/{} with empty value", provider.getName(), service.getName(), resource.getName());
- MqttPacket packet = new MqttPacket(provider.getName(), service.getName(), resource.getName(), resource.getValue() == null ? "" : resource.getValue());
- packet.setHelloMessage(true);
- endpoint.process(packet);
- //runtime.updateValue(provider.getName(), service.getName(), resource.getName(), "");
- } else {
- LOG.warn("Device {}/{}/{} is hidden until the first message is received", provider.getName(), service.getName(), resource.getName());
- }
- LOG.info("Subscribed {}/{}/{} to the topic {}", provider.getName(), service.getName(), resource.getName(), resource.getTopic());
}
+ /*
+ if(!provider.isDiscoveryOnFirstMessage()){
+ runtime.updateValue(provider.getName(), null,null,null);
+ }else {
+ LOG.info("Device {} will appear as soon as one of the topic associated received the first message",provider.getName());
+ }*/
+ LOG.info("sensiNact Device created with the id {}", provider.getName());
+ } catch (Exception e) {
+ LOG.warn("Failed to create device {}, ignoring device", provider.getName(), e);
}
- /*
- if(!provider.isDiscoveryOnFirstMessage()){
- runtime.updateValue(provider.getName(), null,null,null);
- }else {
- LOG.info("Device {} will appear as soon as one of the topic associated received the first message",provider.getName());
- }*/
- LOG.info("sensiNact Device created with the id {}", provider.getName());
- } catch (Exception e) {
- LOG.warn("Failed to create device {}, ignoring device", provider.getName(), e);
+ return bundleContext.getService(serviceReference);
+
}
- return bundleContext.getService(serviceReference);
}
@Override
@@ -133,19 +137,21 @@
@Override
public void removedService(ServiceReference serviceReference, Object o) {
- LOG.info("Detaching devices MQTT Bus service");
- try {
- Provider provider = (Provider) o;
- MqttPacket packet = new MqttPacket(provider.getName());
- packet.setGoodbyeMessage(true);
- endpoint.process(packet);
- LOG.info("sensiNact device {} removed", provider.getName());
- LOG.info("Detaching devices {} MQTT Bus service", provider.getName());
- provider.getBroker().disconnect();
- } catch (InvalidPacketException e) {
- LOG.error("Failed to read internal package", e);
- } catch (MqttException e) {
- LOG.error("Failed to disconnect", e);
+ synchronized (locker){
+ LOG.info("Detaching devices MQTT Bus service");
+ try {
+ Provider provider = (Provider) o;
+ MqttPacket packet = new MqttPacket(provider.getName());
+ packet.setGoodbyeMessage(true);
+ endpoint.process(packet);
+ LOG.info("sensiNact device {} removed", provider.getName());
+ LOG.info("Detaching devices {} MQTT Bus service", provider.getName());
+ provider.getBroker().disconnect();
+ } catch (InvalidPacketException e) {
+ LOG.error("Failed to read internal package", e);
+ } catch (MqttException e) {
+ LOG.error("Failed to disconnect", e);
+ }
}
}
}