blob: d31019b054f29bbd8e2213f1c971b6baf912d3ce [file] [log] [blame]
package org.eclipse.basyx.extensions.shared.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of common parts of MQTT event propagation services.
* Extend this class to make a service MQTT extendable
*
* @author haque
*
*/
public class MqttEventService {
private static Logger logger = LoggerFactory.getLogger(MqttEventService.class);
// The MQTTClient
protected MqttClient mqttClient;
// QoS for MQTT messages (1, 2 or 3).
protected int qos = 1;
/**
* Constructor for creating an MqttClient (no authentication)
* @param serverEndpoint
* @param clientId
* @throws MqttException
*/
public MqttEventService(String serverEndpoint, String clientId) throws MqttException {
this.mqttClient = new MqttClient(serverEndpoint, clientId, new MqttDefaultFilePersistence());
mqttClient.connect();
}
/**
* Constructor for creating an MqttClient with authentication
* @param serverEndpoint
* @param clientId
* @param user
* @param pw
* @throws MqttException
*/
public MqttEventService(String serverEndpoint, String clientId, String user, char[] pw)
throws MqttException {
this.mqttClient = new MqttClient(serverEndpoint, clientId, new MqttDefaultFilePersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(user);
options.setPassword(pw);
mqttClient.connect(options);
}
/**
* Constructor for creating an MqttClient with existing client
* @param client
* @throws MqttException
*/
public MqttEventService(MqttClient client) throws MqttException {
this.mqttClient = client;
mqttClient.connect();
}
/**
* Sets the QoS for MQTT messages
*
* @param qos
*/
public void setQoS(int qos) {
if (qos >= 0 && qos <= 3) {
this.qos = qos;
} else {
throw new IllegalArgumentException("Invalid QoS: " + qos);
}
}
/**
* Gets the QoS for MQTT messages
*
* @param qos
*/
public int getQoS() {
return this.qos;
}
/**
* Sends MQTT message to connected broker
* @param topic in which the message will be published
* @param payload the actual message
*/
protected void sendMqttMessage(String topic, String payload) {
MqttMessage msg = new MqttMessage(payload.getBytes());
if (this.qos != 1) {
msg.setQos(this.qos);
}
try {
logger.debug("Send MQTT message to " + topic + ": " + payload);
mqttClient.publish(topic, msg);
} catch (MqttPersistenceException e) {
logger.error("Could not persist mqtt message", e);
} catch (MqttException e) {
logger.error("Could not send mqtt message", e);
}
}
}