// blob: 3edbc6b5bf9efec3eb89db0183f0212b82d76254 [file] [log] [blame]
using System;
using System.Text;
using BaSys40.Utils.ResultHandling;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using System.Threading;
using NLog;
namespace oneM2MClient.Client.Bindings
{
/// <summary>
/// <see cref="IClient"/> implementation that sends requests over MQTT using the
/// M2Mqtt <see cref="MqttClient"/> and waits synchronously for the matching response.
/// </summary>
/// <remarks>
/// <see cref="Send"/> correlates request and response through a single shared
/// request-identifier field, so an instance supports only one in-flight request at
/// a time; concurrent calls to <see cref="Send"/> on the same instance are not supported.
/// </remarks>
public class MqttBinding : IClient
{
    public const string requestTopic = "/oneM2M/req";
    public const string responseTopic = "/oneM2M/resp";

    /// <summary>Maximum time in milliseconds <see cref="Send"/> waits for a response.</summary>
    private const int ResponseTimeoutMs = 2000;

    /// <summary>Maximum time in milliseconds <see cref="Stop"/> waits for the connection-closed notification.</summary>
    private const int DisconnectTimeoutMs = 1000;

    private static Logger logger = LogManager.GetCurrentClassLogger();

    // One event per instance, re-armed with Reset() before every use. Allocating a
    // new event per call leaked a wait handle each time and left a window in which
    // the MQTT callback thread could signal a null or stale event.
    private readonly ManualResetEvent manualResetEvent = new ManualResetEvent(false);
    private readonly ManualResetEvent connectionClosed = new ManualResetEvent(false);

    // Response topics already subscribed on the current client. The topic is built
    // from the request's originator, CSE name and content type, so different
    // requests can require different subscriptions.
    private readonly System.Collections.Generic.HashSet<string> subscribedTopics =
        new System.Collections.Generic.HashSet<string>();

    // Written by the MQTT callback thread, consumed by Send().
    private MqttResponse response;

    // Identifier of the request currently awaited; read by the MQTT callback thread.
    private volatile string tempRqi;

    private MqttClient mqttClient;
    private string clientId;
    private MqttBindingConfig mqttConfig;

    public ClientFactory.Protocol Protocol => ClientFactory.Protocol.Mqtt;

    public string PayLoad { get; set; }

    /// <summary>
    /// Creates the binding and the underlying MQTT client (no connection is opened yet).
    /// </summary>
    /// <param name="clientId">MQTT client id; when null a GUID is generated in <see cref="Start"/>.</param>
    /// <param name="mqttConfig">
    /// An <see cref="MqttBindingConfig"/>; when null a default configuration named
    /// "MqttProviderDefault" is used.
    /// </param>
    /// <exception cref="ArgumentException">
    /// <paramref name="mqttConfig"/> is non-null but is not an <see cref="MqttBindingConfig"/>.
    /// </exception>
    public MqttBinding(string clientId, BindingConfig mqttConfig)
    {
        this.clientId = clientId;

        if (mqttConfig == null)
        {
            this.mqttConfig = new MqttBindingConfig("MqttProviderDefault");
        }
        else
        {
            // Fail with a descriptive error instead of a NullReferenceException
            // when a configuration of the wrong type is supplied.
            this.mqttConfig = mqttConfig as MqttBindingConfig;
            if (this.mqttConfig == null)
                throw new ArgumentException("Binding configuration must be of type MqttBindingConfig", nameof(mqttConfig));
        }

        mqttClient = new MqttClient(this.mqttConfig.BrokerIPAddress, this.mqttConfig.BrokerPort, this.mqttConfig.SecureConnection, null, null, MqttSslProtocols.None, null, null);
    }

    /// <summary>True while the underlying client exists and reports itself as connected.</summary>
    public bool IsAlive
    {
        get
        {
            MqttClient client = mqttClient;
            return client != null && client.IsConnected;
        }
    }

    /// <summary>
    /// Publishes <paramref name="request"/> and blocks for up to
    /// <see cref="ResponseTimeoutMs"/> milliseconds waiting for a response carrying
    /// the same request identifier.
    /// </summary>
    /// <param name="request">The request to send; its request identifier is overwritten.</param>
    /// <returns>
    /// A successful result wrapping the response; otherwise a failed result whose
    /// message describes the problem (invalid request, stopped client, error
    /// reported by the host, mismatching response id, or no response in time).
    /// </returns>
    public Result<Response> Send(Request request)
    {
        // The original guard used '&&' and therefore dereferenced a null request.
        if (request == null || request.RequestPrimitive == null)
            return new Result<Response>(false, new Message(MessageType.Error, "Request or request primitive is null"));

        // Work on a local copy: Stop() sets the field to null.
        MqttClient client = mqttClient;
        if (client == null)
            return new Result<Response>(false, new Message(MessageType.Error, "MQTT client has been stopped"));

        string requestId = request.GetHashCode().ToString();
        request.RequestIdentifier(requestId);

        MqttRequest requestHelper = new MqttRequest(request);
        byte[] payload = Encoding.UTF8.GetBytes(requestHelper.MessageBody.ToString());

        // Arm the wait BEFORE anything is published so that a fast response can
        // neither be missed nor signal a stale event, and drop any response left
        // over from an earlier call.
        Interlocked.Exchange(ref response, null);
        tempRqi = requestId;
        manualResetEvent.Reset();

        // Remove-then-add keeps exactly one registration per handler.
        client.MqttMsgPublished -= MqttClient_MqttMsgPublished;
        client.MqttMsgPublished += MqttClient_MqttMsgPublished;
        client.MqttMsgSubscribed -= MqttClient_MqttMsgSubscribed;
        client.MqttMsgSubscribed += MqttClient_MqttMsgSubscribed;
        client.MqttMsgPublishReceived -= MqttClient_MqttMsgPublishReceived;
        client.MqttMsgPublishReceived += MqttClient_MqttMsgPublishReceived;

        // Subscribe to the response topic before publishing the request.
        // NOTE(review): Subscribe only returns a message id, so the broker may not
        // have acknowledged the subscription yet when Publish is called - confirm
        // whether waiting for MqttMsgSubscribed is required.
        string subTopic = string.Join("/", responseTopic, requestHelper.RequestPrimitive.Fr, requestHelper.CSEName, requestHelper.ContentMIME);
        if (!subscribedTopics.Contains(subTopic))
        {
            client.Subscribe(new string[] { subTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            subscribedTopics.Add(subTopic);
        }

        client.Publish(requestHelper.RequestTopic, payload, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);

        manualResetEvent.WaitOne(ResponseTimeoutMs);

        // Take ownership of whatever the callback stored in one atomic step so the
        // callback thread cannot swap the field between the checks below.
        MqttResponse received = Interlocked.Exchange(ref response, null);
        if (received == null)
            return new Result<Response>(false, new Message(MessageType.Error, "Error getting response from host, response is null"));

        var clonedResponse = received.Clone();

        if (received.RequestIdentifier != requestId)
            return new Result<Response>(false, clonedResponse, new Message(MessageType.Error, "Response-Id: '" + clonedResponse.RequestIdentifier + "' does not match Request-Id: '" + requestId + "'"));

        if (!string.IsNullOrEmpty(clonedResponse.ErrorMessage))
            return new Result<Response>(false, clonedResponse, new Message(MessageType.Error, clonedResponse.ErrorMessage));

        return new Result<Response>(true, clonedResponse);
    }

    /// <summary>
    /// Callback for incoming messages: stores the parsed response and wakes
    /// <see cref="Send"/> when its request identifier matches the awaited one.
    /// </summary>
    private void MqttClient_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        Console.Out.WriteLine("Received = " + Encoding.UTF8.GetString(e.Message) + " on topic " + e.Topic);

        // Use a local: Send() may null the shared field concurrently.
        MqttResponse received = new MqttResponse(e);
        if (received.RequestIdentifier == tempRqi)
        {
            response = received;
            manualResetEvent.Set();
        }
        else
        {
            // Keep a mismatching response for error reporting, but never let it
            // overwrite a response that is already stored.
            Interlocked.CompareExchange(ref response, received, null);
        }
    }

    /// <summary>Callback that writes the id of an acknowledged subscription to the console.</summary>
    private void MqttClient_MqttMsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
    {
        Console.Out.WriteLine("Subscribed for id = " + e.MessageId);
    }

    /// <summary>Callback that writes the publish status of an outgoing message to the console.</summary>
    private void MqttClient_MqttMsgPublished(object sender, MqttMsgPublishedEventArgs e)
    {
        Console.Out.WriteLine("MessageId = " + e.MessageId + " Published = " + e.IsPublished);
    }

    /// <summary>
    /// Connects to the broker, generating a client id when none was supplied.
    /// Failures are logged and not propagated to the caller.
    /// </summary>
    public void Start()
    {
        clientId = clientId ?? Guid.NewGuid().ToString();

        MqttClient client = mqttClient;
        if (client == null)
        {
            logger.Error("Could not connect MQTT-Broker: client has been stopped");
            return;
        }

        try
        {
            byte returnCode = client.Connect(clientId);
            // Log the refusal directly instead of throwing a bare Exception only to
            // catch it a few lines below.
            if (returnCode != MqttMsgConnack.CONN_ACCEPTED)
                logger.Error("Could not connect to MQTT-Broker, return code: " + returnCode);
        }
        catch (Exception e)
        {
            logger.Error(e, "Could not connect MQTT-Broker");
        }
    }

    /// <summary>
    /// Disconnects from the broker (waiting up to <see cref="DisconnectTimeoutMs"/>
    /// milliseconds for the connection-closed notification) and releases the client.
    /// </summary>
    public void Stop()
    {
        MqttClient client = mqttClient;
        if (client == null)
            return;

        if (client.IsConnected)
        {
            connectionClosed.Reset();
            client.ConnectionClosed -= MqttClient_ConnectionClosed;
            client.ConnectionClosed += MqttClient_ConnectionClosed;
            try
            {
                client.Disconnect();
                if (!connectionClosed.WaitOne(DisconnectTimeoutMs))
                    logger.Error("Could not close MQTT-Client");
            }
            finally
            {
                // Do not leave the handler registered on the discarded client.
                client.ConnectionClosed -= MqttClient_ConnectionClosed;
            }
        }

        subscribedTopics.Clear();
        mqttClient = null;
    }

    /// <summary>Callback that signals <see cref="Stop"/> that the connection has been closed.</summary>
    private void MqttClient_ConnectionClosed(object sender, EventArgs e)
    {
        connectionClosed.Set();
    }
}
}