blob: 764dc5c9c3a309ec731a0c7f4a8c55e286b9098a [file] [log] [blame]
/*
* Copyright (c) 2020 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.listener.MqttConnectionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
public class MqttBroker {
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 1883;
private static final Protocol DEFAULT_PROTOCOL = Protocol.TCP;
private static final Logger LOG = LoggerFactory.getLogger(MqttBroker.class);
public enum Protocol {
TCP, SSL;
@Override
public String toString() {
return name().toLowerCase();
}
}
private final String clientId;
private String host;
private int port;
private Protocol protocol;
private MqttSession session;
private MqttAuthentication authentication;
private List<MqttTopic> topics;
private MqttConnectionHandler handler;
private MqttClient client;
private MqttBroker() {
this.clientId = UUID.randomUUID().toString();
}
public String getClientId() {
return clientId;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public Protocol getProtocol() {
return protocol;
}
public MqttSession getSession() {
return session;
}
public MqttAuthentication getAuthentication() {
return authentication;
}
public void publish(String topic, String message){
try {
LOG.info("Publishing message {} on the topic {}", message,topic);
MqttMessage mqMessage = new MqttMessage(message.getBytes());
mqMessage.setQos(0);
client.publish(topic, mqMessage);
} catch (Exception e) {
LOG.error("Unable to publishing message {} on the topic {}", message,topic);
}
}
public void subscribeToTopic(MqttTopic topic) {
synchronized(this.topics) {
if(this.topics.indexOf(topic) < 0) {
topics.add(topic);
LOG.info("Subscription to the topic {} added to the list", topic.getTopic());
try {
if(client == null || !client.isConnected()) {
this.connect();
}
client.subscribe(topic.getTopic(), topic.getListener());
LOG.info("Subscription to the topic {} done", topic.getTopic());
} catch (MqttException e) {
LOG.error("Error when subscribing to the topic {}", topic.getTopic(),e);
}
}
}
}
public void unsubscribeFromTopic(MqttTopic topic) {
try {
client.unsubscribe(topic.getTopic());
LOG.info("Unsubscription to the topic {} done", topic.getTopic());
synchronized(this.topics) {
topics.remove(topic);
if(topics.isEmpty()) {
this.disconnect();
}
}
} catch (MqttException e) {
LOG.error("Error when unsubscribing from the topic {}", topic.getTopic(), e);
}
}
public List<MqttTopic> getTopics() {
return topics;
}
public MqttConnectionHandler getConnectionHandler() {
return handler;
}
public void connect() throws MqttException {
if (client!=null && client.isConnected()) {
client.disconnect();
}
client = null;
final String brokerUrl = String.format("%s://%s:%d", protocol, host, port);
client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
if (handler == null) {
LOG.info("Custom Connection Handler not defined, using default reconnection for {}", brokerUrl);
this.handler=new MqttConnectionHandlerImpl(this);
}
client.setCallback(handler);
doConnect();
}
public void doConnect() throws MqttException {
if(client == null) {
throw new MqttException(new NullPointerException("Valid client is required"));
}
final String brokerUrl = String.format("%s://%s:%d", protocol, host, port);
MqttConnectOptions connectOptions = new MqttConnectOptions();
if (session != null) {
if (session.getCleanSession() != null) {
connectOptions.setCleanSession(session.getCleanSession());
}
if (session.getAutoReconnect() != null) {
connectOptions.setAutomaticReconnect(session.getAutoReconnect());
}
if (session.getMaxInFlight() != null) {
connectOptions.setMaxInflight(session.getMaxInFlight());
}
}
if (authentication != null) {
if (authentication.getUsername() != null) {
connectOptions.setUserName(authentication.getUsername());
}
if (authentication.getPassword() != null) {
connectOptions.setPassword(authentication.getPassword().toCharArray());
}
if (authentication.getSslProperties() != null) {
connectOptions.setSSLProperties(authentication.getSslProperties());
}
}
LOG.info("Connecting to broker: {}", brokerUrl);
try {
if (!client.isConnected()) {
client.connect(connectOptions);
handler.connectionEstablished(this);
} else {
LOG.error("Already connected to the MQTT broker: {}", brokerUrl);
}
} catch (MqttException e) {
handler.connectionFailed(this);
LOG.error("Failed to connect to MQTT broker", e);
}
LOG.info("Connected to broker: {}:{}", this.host, this.port);
}
public void disconnect() throws MqttException {
synchronized(this.topics) {
if(!topics.isEmpty()) {
for (Iterator<MqttTopic> it = topics.iterator(); it.hasNext();) {
MqttTopic topic=null;
try {
topic=(MqttTopic) it.next();
LOG.info("Unsubscribing from topic {} done", topic.getTopic());
client.unsubscribe(topic.getTopic());
it.remove();
LOG.info("Unsubscription to the topic {} done", topic.getTopic());
} catch (MqttException e) {
LOG.error("Unable to unsubscribe from the topic {}", topic.getTopic());
}
}
}
}
if (client!=null && client.isConnected()) {
client.disconnect();
LOG.info("Disconnected from MQTT broker: {}", this.host);
}
client = null;
}
public MqttClient getClient() {
return client;
}
public void setSession(MqttSession session) {
this.session = session;
}
public void setHost(String host) {
this.host = host;
}
public void setPort(int port) {
this.port = port;
}
public void setProtocol(Protocol protocol) {
this.protocol = protocol;
}
public void setAuthentication(MqttAuthentication authentication) {
this.authentication = authentication;
}
public void setHandler(MqttConnectionHandler handler) {
this.handler = handler;
}
/**
* The builder abstraction.
*/
public static class Builder {
private String host = DEFAULT_HOST;
private int port = DEFAULT_PORT;
private Protocol protocol = DEFAULT_PROTOCOL;
private MqttSession session;
private MqttAuthentication authentication;
private List<MqttTopic> topics;
private MqttConnectionHandler handler;
public Builder() {
this.session = (new MqttSession.Builder()).build();
this.authentication = (new MqttAuthentication.Builder()).build();
this.topics = Collections.synchronizedList(new ArrayList<MqttTopic>());
}
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder protocol(Protocol protocol) {
this.protocol = protocol;
return this;
}
public Builder session(MqttSession session) {
this.session = session;
return this;
}
public Builder authentication(MqttAuthentication authentication) {
this.authentication = authentication;
return this;
}
public Builder topics(List<MqttTopic> topics) {
this.topics = topics;
return this;
}
public Builder handler(MqttConnectionHandler handler) {
this.handler = handler;
return this;
}
public MqttBroker build() {
MqttBroker broker=new MqttBroker();
broker.handler=handler;
broker.topics=topics;
broker.authentication=authentication;
broker.host=this.host;
broker.port=this.port;
broker.protocol=this.protocol;
broker.session=this.session;
broker.authentication=this.authentication;
broker.topics=this.topics;
broker.handler=this.handler;
return broker;
}
}
private class MqttConnectionHandlerImpl extends MqttConnectionHandler {
private Timer timer = new Timer();
public MqttConnectionHandlerImpl(MqttBroker broker) {
super(broker);
}
@Override
public void connectionFailed(final MqttBroker broker) {
this.timer.schedule(new TimerTask() {
@Override
public void run() {
try {
broker.doConnect();
} catch (MqttException e) {
LOG.error("Connection Failed with {}://{}:{}",broker.getProtocol().name(),broker.getHost(),broker.getPort());
}
}
}, 5000);
}
@Override
public void connectionEstablished(MqttBroker broker) {
LOG.debug("Connection with {}://{}:{} established",broker.getProtocol().name(),broker.getHost(),broker.getPort());
for(MqttTopic topic:topics){
try {
LOG.info("Subscription to the topic {} done", topic.getTopic());
broker.getClient().subscribe(topic.getTopic(),topic.getListener());
} catch (Exception e) {
LOG.error("Unable to subscribe to the topic {}", topic.getTopic());
}
}
}
@Override
public void connectionLost(MqttBroker broker) {
try {
broker.connect();
} catch (MqttException e) {
LOG.debug("Connection Lost with {}://{}:{}",broker.getProtocol().name(),broker.getHost(),broker.getPort());
}
}
}
}