blob: 95ba01ceab754eb579d37ece76c6653672903a82 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2013-2016 LAAS-CNRS (www.laas.fr)
* 7 Colonel Roche 31077 Toulouse - France
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Initial Contributors:
* Thierry Monteil : Project manager, technical co-manager
* Mahdi Ben Alaya : Technical co-manager
* Samir Medjiah : Technical co-manager
* Khalil Drira : Strategy expert
* Guillaume Garzone : Developer
* François Aïssaoui : Developer
*
* New contributors :
*******************************************************************************/
package org.eclipse.om2m.binding.mqtt;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
import org.eclipse.om2m.binding.mqtt.util.QueueSender;
import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry;
import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore;
import org.eclipse.om2m.binding.service.RestClientService;
import org.eclipse.om2m.commons.constants.Constants;
import org.eclipse.om2m.commons.constants.MimeMediaType;
import org.eclipse.om2m.commons.constants.ResponseStatusCode;
import org.eclipse.om2m.commons.resource.PrimitiveContent;
import org.eclipse.om2m.commons.resource.RequestPrimitive;
import org.eclipse.om2m.commons.resource.ResponsePrimitive;
import org.eclipse.om2m.datamapping.service.DataMapperService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttRestClient implements RestClientService {
private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class);
@Override
public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) {
if(requestPrimitive.getContent() != null){
PrimitiveContent pc = new PrimitiveContent();
switch(requestPrimitive.getRequestContentType()){
case MimeMediaType.XML:
pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent()));
break;
case MimeMediaType.JSON:
pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent()));
break;
case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN:
pc.getAny().add(requestPrimitive.getContent());
break;
default:
break;
}
if(!pc.getAny().isEmpty()){
requestPrimitive.setPrimitiveContent(pc);
}
}
ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive);
if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){
responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
return responsePrimitive;
}
if(requestPrimitive.getRequestIdentifier() == null){
requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString());
}
String uri = requestPrimitive.getMqttUri();
if(uri.startsWith("mqtt://")){
uri = uri.replaceFirst("mqtt://", "tcp://");
}
if(requestPrimitive.getTo().startsWith("mqtt://")){
Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo());
if(matcher.matches()){
requestPrimitive.setTo(matcher.group(3));
}
}
String topic = requestPrimitive.getMqttTopic();
String payload = null;
String format = null;
if (topic.endsWith("/json")){
payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive);
format = "json";
} else {
// Case of XML and default
payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive);
format = "xml";
}
try {
MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence());
mqttClient.connect();
LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload);
ResponseSemaphore responseSemaphore = null;
if(requestPrimitive.isMqttResponseExpected()){
Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic);
if(matcher.matches()){
String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format;
responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic);
} else {
responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
}
} else {
mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK);
}
if(responseSemaphore != null){
QueueSender.queue(mqttClient, topic, payload.getBytes());
LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)");
boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS);
if(released){
responsePrimitive = responseSemaphore.getResponsePrimitive();
fillAndConvertContent(requestPrimitive, responsePrimitive);
LOGGER.debug("Response received: " + responsePrimitive);
} else {
responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
}
}
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri());
responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
return responsePrimitive;
} catch (InterruptedException e) {
LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage());
responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
return responsePrimitive;
}
return responsePrimitive;
}
private void fillAndConvertContent(RequestPrimitive requestPrimitive,
ResponsePrimitive responsePrimitive) {
if(responsePrimitive.getPrimitiveContent() != null &&
!responsePrimitive.getPrimitiveContent().getAny().isEmpty() &&
responsePrimitive.getContent() == null){
if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){
responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0));
} else {
DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType());
String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0));
responsePrimitive.setContent(content);
responsePrimitive.setContentType(requestPrimitive.getReturnContentType());
}
}
}
@Override
public String getProtocol() {
return MqttConstants.PROTOCOL;
}
}