Implementation of MQTT Protocol binding TS-0010

This implements features from the MQTT protocol binding TS-0010 from oneM2M.
It does NOT include the MQTT Broker.
diff --git a/org.eclipse.om2m.binding.mqtt/.classpath b/org.eclipse.om2m.binding.mqtt/.classpath
new file mode 100644
index 0000000..f7edce6
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/.classpath
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
+	<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+	<classpathentry exported="true" kind="lib" path="libs/org.eclipse.paho.client.mqttv3-1.0.2.jar"/>
+	<classpathentry kind="src" path="src/main/java/"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/org.eclipse.om2m.binding.mqtt/.project b/org.eclipse.om2m.binding.mqtt/.project
new file mode 100644
index 0000000..fed26a3
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/.project
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>org.eclipse.om2m.binding.mqtt</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.pde.ManifestBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.pde.SchemaBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.m2e.core.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.m2e.core.maven2Nature</nature>
+		<nature>org.eclipse.pde.PluginNature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF b/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..a09741c
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/META-INF/MANIFEST.MF
@@ -0,0 +1,18 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: MQTT Binding
+Bundle-SymbolicName: org.eclipse.om2m.binding.mqtt
+Bundle-Version: 1.0.0.qualifier
+Bundle-Activator: org.eclipse.om2m.binding.mqtt.Activator
+Bundle-RequiredExecutionEnvironment: JavaSE-1.7
+Import-Package: org.apache.commons.logging,
+ org.eclipse.om2m.binding.service,
+ org.eclipse.om2m.commons.constants,
+ org.eclipse.om2m.commons.resource,
+ org.eclipse.om2m.core.service,
+ org.eclipse.om2m.datamapping.service,
+ org.osgi.framework;version="1.3.0",
+ org.osgi.util.tracker;version="1.5.1"
+Bundle-ActivationPolicy: lazy
+Bundle-ClassPath: libs/org.eclipse.paho.client.mqttv3-1.0.2.jar,
+ .
diff --git a/org.eclipse.om2m.binding.mqtt/README b/org.eclipse.om2m.binding.mqtt/README
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/README
diff --git a/org.eclipse.om2m.binding.mqtt/build.properties b/org.eclipse.om2m.binding.mqtt/build.properties
new file mode 100644
index 0000000..65cee24
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/build.properties
@@ -0,0 +1,4 @@
+source.. = src/main/java/
+bin.includes = META-INF/,\
+               .,\
+               libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
diff --git a/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar b/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
new file mode 100644
index 0000000..9a88162
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/libs/org.eclipse.paho.client.mqttv3-1.0.2.jar
Binary files differ
diff --git a/org.eclipse.om2m.binding.mqtt/pom.xml b/org.eclipse.om2m.binding.mqtt/pom.xml
new file mode 100644
index 0000000..254787e
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/pom.xml
@@ -0,0 +1,11 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>org.eclipse.om2m.binding.mqtt</artifactId>
+	<packaging>eclipse-plugin</packaging>
+	<parent>
+		<groupId>org.eclipse.om2m</groupId>
+		<artifactId>org.eclipse.om2m</artifactId>
+		<version>1.0.0-SNAPSHOT</version>
+	</parent>
+</project>
\ No newline at end of file
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
new file mode 100644
index 0000000..0a74e78
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/Activator.java
@@ -0,0 +1,151 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+public class Activator implements BundleActivator {
+
+	private static BundleContext context;
+
+	static BundleContext getContext() {
+		return context;
+	}
+
+	private static final Log LOGGER = LogFactory.getLog(Activator.class);
+
+	/** {@link DataMapperServiceTracker} reference */
+	private static ServiceTracker<DataMapperService, DataMapperService> dataMapperServiceTracker;
+	/** {@link CseService} reference */
+	private static ServiceTracker<CseService, CseService> cseServiceTracker;
+
+	/** MQTT Request Handler that connects to the MQTT Broker */
+	private static MqttRequestHandler mqttRequestHandler;
+
+	public void start(BundleContext bundleContext) throws Exception {
+		Activator.context = bundleContext;
+		
+		// Listening on Cse Service
+		cseServiceTracker = new ServiceTracker<CseService, CseService>(
+				bundleContext, CseService.class,
+				new CseServiceTrackerCustomizer());
+		cseServiceTracker.open();
+
+		// Listening on DataMapper Service
+		dataMapperServiceTracker = new ServiceTracker<DataMapperService, DataMapperService>(
+				bundleContext, DataMapperService.class,
+				new DataMapperServiceTracker());
+		dataMapperServiceTracker.open();
+		
+		// Registering RestClientService of MQTT
+		getContext().registerService(RestClientService.class, new MqttRestClient(), null);
+	}
+
+	public void stop(BundleContext bundleContext) throws Exception {
+		Activator.context = null;
+		if (cseServiceTracker != null) {
+			cseServiceTracker.close();
+			cseServiceTracker = null;
+		}
+		if (dataMapperServiceTracker != null) {
+			dataMapperServiceTracker.close();
+			dataMapperServiceTracker = null;
+		}
+		if (mqttRequestHandler != null){
+			mqttRequestHandler.close();
+			mqttRequestHandler = null;
+		}
+	}
+
+	private static class CseServiceTrackerCustomizer implements
+			ServiceTrackerCustomizer<CseService, CseService> {
+
+		@Override
+		public CseService addingService(ServiceReference<CseService> reference) {
+			if (reference == null) {
+				return null;
+			}
+			Object service = Activator.getContext().getService(reference);
+			if (service != null && service instanceof CseService) {
+				LOGGER.debug("New CseService discovered");
+				CseService cse = (CseService) service;
+				MqttRequestHandler.setCseService(cse);
+				if (mqttRequestHandler == null) {
+					new Thread() {
+						public void run() {
+							LOGGER.info("Creating MQTT Request Handler");
+							mqttRequestHandler = new MqttRequestHandler();
+						};
+					}.start();
+				}
+				return cse;
+			}
+			return null;
+		}
+
+		@Override
+		public void modifiedService(ServiceReference<CseService> reference,
+				CseService service) {
+			if (service != null) {
+				LOGGER.info("CseService modified");
+				MqttRequestHandler.setCseService(service);
+			}
+		}
+
+		@Override
+		public void removedService(ServiceReference<CseService> reference,
+				CseService service) {
+			MqttRequestHandler.setCseService(null);
+		}
+
+	}
+
+	private static class DataMapperServiceTracker implements
+			ServiceTrackerCustomizer<DataMapperService, DataMapperService> {
+
+		@Override
+		public DataMapperService addingService(
+				ServiceReference<DataMapperService> reference) {
+			if (reference == null) {
+				return null;
+			}
+			Object service = Activator.getContext().getService(reference);
+			if (service != null && service instanceof DataMapperService) {
+				DataMapperService dms = (DataMapperService) service;
+				LOGGER.debug("New DataMapper Service discovered: "
+						+ dms.getServiceDataType());
+				DataMapperRegistry.register(dms);
+				return dms;
+			}
+			return null;
+		}
+
+		@Override
+		public void modifiedService(
+				ServiceReference<DataMapperService> reference,
+				DataMapperService service) {
+			if (service != null) {
+				DataMapperRegistry.register(service);
+			}
+		}
+
+		@Override
+		public void removedService(
+				ServiceReference<DataMapperService> reference,
+				DataMapperService service) {
+			if (service != null) {
+				DataMapperRegistry.remove(service);
+			}
+		}
+
+	}
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
new file mode 100644
index 0000000..1105a89
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRequestHandler.java
@@ -0,0 +1,255 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.math.BigInteger;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.core.service.CseService;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * MQTT Request Handler class that subscribe to oneM2M request topic.
+ * When a request is received in the request topic, it is de-serialized and send
+ * to the CseService implementation available. Then the response from the service
+ * is serialized and sent to the oneM2M response topic.
+ */
+public class MqttRequestHandler implements MqttCallback {
+
+	// Static attributes of the class 
+	
+	/** MQTT Client ID */
+	private static final String CLIENT_ID = Constants.CSE_ID;
+	/** Logger reference */
+	private static final Log LOGGER = LogFactory.getLog(MqttRequestHandler.class);
+	/** MQTT Request Topic */
+	private static final String REQUEST_TOPIC = "/oneM2M/req/+/" + Constants.CSE_ID + "/+";
+	
+
+	/** Reference to the current cseService implementation*/
+	private static CseService cseService;
+
+	/**
+	 * Set the current CseService used when a request is 
+	 * received on the oneM2M request topic.
+	 * @param cse the CseService implementation to use
+	 */
+	public static void setCseService(CseService cse) {
+		cseService = cse;
+	}
+
+	// Private attributes
+	
+	/** MQTT Client from the Paho library */
+	private MqttClient mainMqttClient;
+
+	/** The MQTT connection options to use */
+	private MqttConnectOptions connOpts;
+	
+	/** Tell the thread to keep retrying or not */
+	private boolean retry = true;
+	/** Connection retry thread */
+	private Thread retryThread;
+	
+	/**
+	 * Default constructor of the Request Handler
+	 */
+	public MqttRequestHandler() {
+		MemoryPersistence persistence = new MemoryPersistence();
+		String url = "tcp://" + MqttConstants.MQTT_BROKER_HOSTNAME + ":"
+				+ MqttConstants.MQTT_BROKER_PORT;
+		this.connOpts = new MqttConnectOptions();
+		connOpts.setCleanSession(true);
+		if(MqttConstants.MQTT_BROKER_USERNAME != null && MqttConstants.MQTT_BROKER_PASSWORD != null){
+			connOpts.setUserName(MqttConstants.MQTT_BROKER_USERNAME);
+			connOpts.setPassword(MqttConstants.MQTT_BROKER_PASSWORD.toCharArray());
+		}
+		try {
+			LOGGER.debug("Connecting MQTT client to: " + url);
+			this.mainMqttClient = new MqttClient(url, CLIENT_ID, persistence);
+			this.mainMqttClient.setCallback(MqttRequestHandler.this);
+			this.connect(this.connOpts);
+		} catch (MqttException e) {
+			LOGGER.error("Error in MQTT Client creation", e);
+		}
+	}
+	
+	/**
+	 * Connect and retry if the connection fails
+	 * @param connOpts
+	 */
+	private void connect(final MqttConnectOptions connOpts){
+		if(retry && retryThread == null){
+			retryThread = new Thread("mqtt-connection-retrier"){
+				public void run() {
+					while(retry && !MqttRequestHandler.this.mainMqttClient.isConnected()){
+						try {
+							MqttRequestHandler.this.mainMqttClient.connect(connOpts);
+							
+							LOGGER.info("Subscribing on MQTT topic: " + REQUEST_TOPIC);
+							MqttRequestHandler.this.mainMqttClient.subscribe(REQUEST_TOPIC, 2);
+						} catch (MqttException e) {
+							LOGGER.warn("Cannot connect to MQTT Broker, retrying in 10s. Cause: " + e.getMessage());
+						}
+						if(!MqttRequestHandler.this.mainMqttClient.isConnected()){
+							try {
+								Thread.sleep(10000);
+							} catch (InterruptedException e) {
+								// Ignore
+							}
+						}
+					}
+					MqttRequestHandler.this.retryThread = null;
+				};
+			};
+		}
+		retryThread.start();
+	}
+
+	@Override
+	public void connectionLost(Throwable throwable) {
+		LOGGER.warn("Connection lost on MQTT Broker at " + MqttConstants.MQTT_BROKER_HOSTNAME + ":" + MqttConstants.MQTT_BROKER_PORT);
+		this.connect(this.connOpts);
+	}
+
+	@Override
+	public void deliveryComplete(IMqttDeliveryToken token) {
+		// Empty
+	}
+
+	@Override
+	public void messageArrived(String topic, MqttMessage message)
+			throws Exception {
+		Matcher matcher = MqttConstants.REQUEST_PATTERN_IN.matcher(topic);
+		if (matcher.matches()) {
+			String aeId = matcher.group(1);
+			String format = matcher.group(2);
+			String responseTopic = "/oneM2M/resp/" + Constants.CSE_ID + "/" + aeId + "/" + format;
+			
+			if (message.getPayload() == null) {
+				LOGGER.info("Null message received on " + topic);
+				sendErrorResponse("The message is null", responseTopic, aeId, format);
+				return;
+			}
+			String payload = new String(message.getPayload());
+			LOGGER.debug("(" + topic + ") Message received (qos: " + message.getQos() + "):\n" + payload);
+			DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+
+			if (dms == null) {
+				LOGGER.warn("MQTT Request received with unhandled content type: " + format);
+				sendErrorResponse("The format type is not handled", 
+						responseTopic.replace("/" + format, "/" + MqttConstants.MQTT_XML), 
+						aeId, MqttConstants.MQTT_XML);
+				return;
+			}
+			Object objectPayload = dms.stringToObj(payload);
+			if(objectPayload == null || !(objectPayload instanceof RequestPrimitive)){
+				LOGGER.info("Invalid content provided in MQTT request");
+				sendErrorResponse("Invalid content provided in request primitive", responseTopic, aeId, format);
+				return;
+			}
+			RequestPrimitive requestPrimitive = (RequestPrimitive) objectPayload;
+			requestPrimitive.setRequestContentType(MimeMediaType.OBJ);
+			requestPrimitive.setReturnContentType(MimeMediaType.OBJ);
+			// Primitive content handling
+			if(requestPrimitive.getPrimitiveContent() != null && 
+					!requestPrimitive.getPrimitiveContent().getAny().isEmpty() && 
+					requestPrimitive.getContent() == null){
+				requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+			}
+			
+			ResponsePrimitive responsePrimitive = null;
+			
+			if(cseService != null){
+				// Sending the request to the CSE
+				responsePrimitive = cseService.doRequest(requestPrimitive);
+				
+				// Handling the custom "content" field and map it to PrimitiveContent for serialization
+				if(responsePrimitive.getContent() != null && 
+						responsePrimitive.getPrimitiveContent() == null){
+					PrimitiveContent pc = new PrimitiveContent();
+					pc.getAny().add(responsePrimitive.getContent());
+					responsePrimitive.setPrimitiveContent(pc);
+				}
+				
+				// Building and sending response
+				final String responsePayload = dms.objToString(responsePrimitive);
+				LOGGER.debug("Response to be sent on topic: " + responseTopic + ". Payload:\n" + responsePayload);
+				
+				// Sending the request in another thread otherwise it blocks the reception thread of Paho
+				QueueSender.queue(mainMqttClient, responseTopic, responsePayload.getBytes());
+			} else {
+				sendErrorResponse("/" + Constants.CSE_ID + " is not available", responseTopic, aeId, format, ResponseStatusCode.SERVICE_UNAVAILABLE);
+			}
+
+		} else {
+			LOGGER.debug("The topic is not well formed. (" + topic + ")");
+		}
+	}
+	
+	/**
+	 * Util method that send an error message to the client
+	 * @param message the message to send
+	 * @param responseTopic the response topic to reply on
+	 * @param aeId the id of the client
+	 * @param format the format of exchange
+	 */
+	private void sendErrorResponse(String message, String responseTopic,
+			String aeId, String format, BigInteger responseStatusCode) {
+		ResponsePrimitive responsePrimitive = new ResponsePrimitive();
+		responsePrimitive.setTo(aeId);
+		responsePrimitive.setFrom("/" + Constants.CSE_ID);
+		responsePrimitive.setResponseStatusCode(responseStatusCode);
+		responsePrimitive.setPrimitiveContent(new PrimitiveContent());
+		responsePrimitive.getPrimitiveContent().getAny().add(message);
+		DataMapperService dms = DataMapperRegistry.getFromMqttFormat(format);
+		byte[] errorPayload = dms.objToString(responsePrimitive).getBytes();
+		QueueSender.queue(mainMqttClient, responseTopic, errorPayload);
+	}
+	
+	/**
+	 * Send a bad request response to the client 
+	 * @param message the message
+	 * @param responseTopic the response topic
+	 * @param aeId the id of the client
+	 * @param format the format of exchange
+	 */
+	private void sendErrorResponse(String message, String responseTopic,
+			String aeId, String format){
+		sendErrorResponse(message, responseTopic, aeId, format, ResponseStatusCode.BAD_REQUEST);
+	}
+
+	/**
+	 * Disconnecting and closing the MQTT Client.
+	 */
+	public void close(){
+		// Disconnect the MQTT Client
+		try {
+			this.mainMqttClient.disconnect();
+		} catch (MqttException e) {
+			LOGGER.debug("Error disconnecting the MQTT Client", e);
+		}
+		// Prevent on any reconnection retry
+		retry = false;
+		// Wake up the retry thread after the false on "retry" has been set
+		if(retryThread != null && retryThread.isAlive()){
+			retryThread.interrupt();
+		}
+	}
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
new file mode 100644
index 0000000..7936749
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/MqttRestClient.java
@@ -0,0 +1,155 @@
+package org.eclipse.om2m.binding.mqtt;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.om2m.binding.mqtt.util.DataMapperRegistry;
+import org.eclipse.om2m.binding.mqtt.util.MqttConstants;
+import org.eclipse.om2m.binding.mqtt.util.QueueSender;
+import org.eclipse.om2m.binding.mqtt.util.ResponseRegistry;
+import org.eclipse.om2m.binding.mqtt.util.ResponseSemaphore;
+import org.eclipse.om2m.binding.service.RestClientService;
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.constants.ResponseStatusCode;
+import org.eclipse.om2m.commons.resource.PrimitiveContent;
+import org.eclipse.om2m.commons.resource.RequestPrimitive;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+public class MqttRestClient implements RestClientService {
+	
+	private static final Log LOGGER = LogFactory.getLog(MqttRestClient.class);
+	
+	@Override
+	public ResponsePrimitive sendRequest(RequestPrimitive requestPrimitive) {
+		if(requestPrimitive.getContent() != null){
+			PrimitiveContent pc = new PrimitiveContent();
+			switch(requestPrimitive.getRequestContentType()){
+			case MimeMediaType.XML:
+				pc.getAny().add(DataMapperRegistry.get(MimeMediaType.XML).stringToObj((String)requestPrimitive.getContent()));
+				break;
+			case MimeMediaType.JSON:
+				pc.getAny().add(DataMapperRegistry.get(MimeMediaType.JSON).stringToObj((String)requestPrimitive.getContent()));
+				break;
+			case MimeMediaType.OBJ: case MimeMediaType.TEXT_PLAIN:
+				pc.getAny().add(requestPrimitive.getContent());
+				break;
+			default:
+				break;
+			}
+			if(!pc.getAny().isEmpty()){
+				requestPrimitive.setPrimitiveContent(pc);
+			}
+		}
+		
+		ResponsePrimitive responsePrimitive = new ResponsePrimitive(requestPrimitive);
+
+		if(requestPrimitive.getMqttTopic() == null || requestPrimitive.getMqttUri() == null){
+			responsePrimitive.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);
+			return responsePrimitive;
+		}
+		
+		if(requestPrimitive.getRequestIdentifier() == null){
+			requestPrimitive.setRequestIdentifier(UUID.randomUUID().toString());
+		}
+		
+		String uri = requestPrimitive.getMqttUri();
+		if(uri.startsWith("mqtt://")){
+			uri = uri.replaceFirst("mqtt://", "tcp://");
+		}
+		
+		if(requestPrimitive.getTo().startsWith("mqtt://")){
+			Pattern mqttUriPatter = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");
+			Matcher matcher = mqttUriPatter.matcher(requestPrimitive.getTo());
+			if(matcher.matches()){
+				requestPrimitive.setTo(matcher.group(3));
+			}
+		}
+		
+		String topic = requestPrimitive.getMqttTopic();
+		String payload = null;
+		String format = null;
+		if (topic.endsWith("/json")){
+			payload = DataMapperRegistry.get(MimeMediaType.JSON).objToString(requestPrimitive);
+			format = "json";
+		} else {
+			// Case of XML and default
+			payload = DataMapperRegistry.get(MimeMediaType.XML).objToString(requestPrimitive);
+			format = "xml";
+		}
+		
+		try {
+			MqttClient mqttClient = new MqttClient(uri, requestPrimitive.getRequestIdentifier(), new MemoryPersistence());
+			mqttClient.connect();
+			LOGGER.debug("Sending request on topic: " + topic + " with payload:\n" + payload);
+			ResponseSemaphore responseSemaphore = null;
+			if(requestPrimitive.isMqttResponseExpected()){
+				Matcher matcher = MqttConstants.REQUEST_PATTERN_OUT.matcher(topic);
+				if(matcher.matches()){
+					String responseTopic = "/oneM2M/resp/" + matcher.group(1) + "/"+ Constants.CSE_ID + "/" + format; 
+					responseSemaphore = ResponseRegistry.createSemaphore(requestPrimitive.getRequestIdentifier(), mqttClient, responseTopic);
+				} else {					
+					responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+				}
+			} else {
+				mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
+				responsePrimitive.setResponseStatusCode(ResponseStatusCode.OK);
+			} 
+			if(responseSemaphore != null){
+				QueueSender.queue(mqttClient, topic, payload.getBytes());
+				LOGGER.debug("Waiting for response... (" + MqttConstants.TIME_OUT_DURATION + "s)");
+				boolean released = responseSemaphore.getSemaphore().tryAcquire(1, MqttConstants.TIME_OUT_DURATION, TimeUnit.SECONDS);
+				if(released){
+					responsePrimitive = responseSemaphore.getResponsePrimitive();
+					fillAndConvertContent(requestPrimitive, responsePrimitive);
+					LOGGER.debug("Response received: " + responsePrimitive);
+				} else {
+					responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+				}
+			}
+			mqttClient.disconnect();
+			mqttClient.close();
+		} catch (MqttException e) {
+			LOGGER.warn("Cannot connect to: " + requestPrimitive.getMqttUri());
+			responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+			return responsePrimitive;
+		} catch (InterruptedException e) {
+			LOGGER.error("Interrupted exception caught in MqttRestClient: " + e.getMessage());
+			responsePrimitive.setResponseStatusCode(ResponseStatusCode.TARGET_NOT_REACHABLE);
+			return responsePrimitive;
+		}
+		
+		return responsePrimitive;
+	}
+
+	private void fillAndConvertContent(RequestPrimitive requestPrimitive,
+			ResponsePrimitive responsePrimitive) {
+		if(responsePrimitive.getPrimitiveContent() != null && 
+				!responsePrimitive.getPrimitiveContent().getAny().isEmpty() && 
+				responsePrimitive.getContent() == null){
+			if(requestPrimitive.getReturnContentType().equals(MimeMediaType.OBJ)){
+				responsePrimitive.setContent(responsePrimitive.getPrimitiveContent().getAny().get(0));				
+			} else {
+				DataMapperService dms = DataMapperRegistry.get(requestPrimitive.getReturnContentType()); 
+				String content = dms.objToString(responsePrimitive.getPrimitiveContent().getAny().get(0));
+				responsePrimitive.setContent(content);
+				responsePrimitive.setContentType(requestPrimitive.getReturnContentType());
+			}
+		}
+	}
+
+	@Override
+	public String getProtocol() {
+		return MqttConstants.PROTOCOL;
+	}	
+	
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
new file mode 100644
index 0000000..18127ab
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/DataMapperRegistry.java
@@ -0,0 +1,75 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+
+/**
+ * This class is used to store instances of {@link DataMapperService} classes.
+ *
+ */
+public class DataMapperRegistry {
+
+	/** Private constructor to avoid creation */
+	private DataMapperRegistry(){}
+	
+	/**
+	 * Service registry classified by data type handled.
+	 */
+	private static Map<String, DataMapperService> serviceRegistery = new HashMap<String, DataMapperService>();
+
+	/**
+	 * Add a new {@link DataMapperService} to the registery.
+	 * @param dms the service to register
+	 */
+	public static void register(DataMapperService dms){
+		if(dms != null && dms.getServiceDataType() != null){
+			serviceRegistery.put(dms.getServiceDataType(), dms);
+		}
+	}
+
+	/**
+	 * Retrieve a {@link DataMapperService} from a data type.
+	 * @param dataType the 
+	 * @return the {@link DataMapperService} that handle the data type or null if none
+	 */
+	public static DataMapperService get(String dataType){
+		return serviceRegistery.get(dataType);
+	}
+	
+	/**
+	 * Remove the {@link DataMapperService} from the registry
+	 * @param dataType the data type of the service to remove
+	 */
+	public static void remove(String dataType){
+		serviceRegistery.remove(dataType);
+	}
+	
+	/**
+	 * Remove the {@link DataMapperService} from the registry
+	 * @param dms the service to remove from the registry
+	 */
+	public static void remove(DataMapperService dms){
+		remove(dms.getServiceDataType());
+	}
+	
+	/**
+	 * Retrieve the {@link DataMapperService} from the registry 
+	 * from the MQTT format String
+	 * @param format the format of the DMS
+	 * @return the DMS with the specified format or null
+	 */
+	public static DataMapperService getFromMqttFormat(String format){
+		switch (format) {
+		case "xml":
+			return DataMapperRegistry.get(MimeMediaType.XML);
+		case "json":
+			return DataMapperRegistry.get(MimeMediaType.JSON);
+		default:
+			return null;
+		}
+	}
+	
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
new file mode 100644
index 0000000..24fceff
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/MqttConstants.java
@@ -0,0 +1,48 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+
+/**
+ * A set of MQTT constants retrieved from the System.getProperty method.
+ *
+ */
+public final class MqttConstants {
+	
+	private MqttConstants(){}
+
+	/** Hostname of the main broker */
+	public static final String MQTT_BROKER_HOSTNAME = System.getProperty("org.eclipse.om2m.mqtt.ip", "127.0.0.1");
+	
+	/** IP of the main broker */
+	public static final int MQTT_BROKER_PORT = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.port", "1883"));
+	
+	/** Username to connect to broker */
+	public static final String MQTT_BROKER_USERNAME = System.getProperty("org.eclipse.om2m.mqtt.username");
+	
+	/** Password to connect to broker */
+	public static final String MQTT_BROKER_PASSWORD = System.getProperty("org.eclipse.om2m.mqtt.password");
+	
+	/** MQTT Protocol prefix */
+	public static final String PROTOCOL = "mqtt";
+	
+	/** Size of the request sender queue */
+	public static final int MQTT_QUEUE_SENDER_SIZE = Integer.valueOf(System.getProperty("org.eclipse.om2m.mqtt.queue.size", "8"));
+	
+	/** Request pattern to parse the request topic on message reception */
+	public static final Pattern REQUEST_PATTERN_IN = Pattern.compile("/oneM2M/req/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+	
+	/** Request pattern when sending a message. */
+	public static final Pattern REQUEST_PATTERN_OUT = Pattern.compile("/oneM2M/req/" + Constants.CSE_ID+ "/([^/]+)+/(.*)");
+	
+	/** Time out duration when waiting for a response. Unit in second. */ 
+	public static final long TIME_OUT_DURATION = Long.valueOf(System.getProperty("org.eclipse.om2m.mqtt.timeout", "20"));
+	
+	/** MQTT format for XML in topic */
+	public static final String MQTT_XML = "xml";
+	
+	/** MQTT format for JSON in topic */
+	public static final String MQTT_JSON = "json";
+	
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
new file mode 100644
index 0000000..43be0ec
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/QueueSender.java
@@ -0,0 +1,60 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+public final class QueueSender {
+	
+	private static final Log LOGGER = LogFactory.getLog(QueueSender.class);
+	private static ExecutorService threadPool;
+
+	static {
+		int queueSize = MqttConstants.MQTT_QUEUE_SENDER_SIZE <= 2 ? 2
+				: MqttConstants.MQTT_QUEUE_SENDER_SIZE;
+		threadPool = new ThreadPoolExecutor(2, queueSize, 1, TimeUnit.MINUTES,
+				new SynchronousQueue<Runnable>());
+	}
+
+	public static void queue(MqttClient mqttClient, String topic, byte[] payload){
+		LOGGER.debug("Sending MQTT message to " + mqttClient.getServerURI() + " topic: " + topic);
+		threadPool.execute(new MqttSender(mqttClient, topic, payload));
+	}
+	
+	private static class MqttSender implements Runnable {
+
+		private MqttClient mqttClient;
+		private String topic;
+		private byte[] payload;
+
+		public MqttSender(MqttClient mqttClient, String topic, byte[] payload) {
+			super();
+			this.mqttClient = mqttClient;
+			this.topic = topic;
+			this.payload = payload;
+		}
+
+		@Override
+		public void run() {
+			try {
+				this.mqttClient.publish(topic, payload, 1, false);
+			} catch (MqttException e) {
+				LOGGER.warn("Error publishing on topic: " + this.topic
+						+ " of broker " + this.mqttClient.getServerURI()
+						+ ". Error: " + e.getMessage());
+			}
+		}
+
+	}
+	
+	private QueueSender(){
+		// Empty and private constructor to avoid class creation
+	}
+
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
new file mode 100644
index 0000000..20c7f24
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseRegistry.java
@@ -0,0 +1,99 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.eclipse.om2m.commons.constants.Constants;
+import org.eclipse.om2m.commons.constants.MimeMediaType;
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+import org.eclipse.om2m.datamapping.service.DataMapperService;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public final class ResponseRegistry {
+
+	private ResponseRegistry() {
+		// Empty and private constructor to avoid instantiation of this class
+	}
+	
+	private static final Map<String, ResponseSemaphore> responseMap = new HashMap<String, ResponseSemaphore>();
+
+	public static ResponseSemaphore createSemaphore(String requestIdentifier, MqttClient mqttClient, 
+			String responseTopic) throws MqttException{
+		synchronized (responseMap) {
+			mqttClient.setCallback(new ResponseCallback());
+			mqttClient.subscribe(responseTopic, 1);
+			if(!responseMap.containsKey(requestIdentifier)){
+				ResponseSemaphore respSemaphore = new ResponseSemaphore(new Semaphore(0));
+				responseMap.put(requestIdentifier, respSemaphore);
+				return respSemaphore;
+			}
+			return null;
+		}
+	}
+	
+	private static void responseReceived(ResponsePrimitive responsePrimitive){
+		synchronized (responseMap) {
+			if(responseMap.containsKey(responsePrimitive.getRequestIdentifier())){
+				ResponseSemaphore responseSemanphore = responseMap.get(responsePrimitive.getRequestIdentifier());
+				responseSemanphore.setResponsePrimitive(responsePrimitive);
+				responseSemanphore.getSemaphore().release();
+				responseMap.remove(responsePrimitive.getRequestIdentifier());
+			}			
+		}
+	}
+	
+	private static class ResponseCallback implements MqttCallback {
+
+		private static Pattern responsePattern = Pattern.compile("/oneM2M/resp/([^/]+)/" + Constants.CSE_ID + "/(.*)");
+
+		@Override
+		public void connectionLost(Throwable cause) {
+			// Ignore
+		}
+
+		@Override
+		public void deliveryComplete(IMqttDeliveryToken token) {
+			// Ignore
+		}
+
+		@Override
+		public void messageArrived(String topic, MqttMessage message)
+				throws Exception {
+			Matcher matcher = responsePattern.matcher(topic);
+			if(!matcher.matches()){
+				return;
+			}
+			String format = matcher.group(2);
+			DataMapperService dms = null;
+			switch (format) {
+			case "xml":
+				dms = DataMapperRegistry.get(MimeMediaType.XML);
+				break;
+			case "json":
+				dms = DataMapperRegistry.get(MimeMediaType.JSON);
+				break;
+			default:
+				break;
+			}
+			if(dms == null){
+				// The format is not handled here
+				return;
+			}
+			String payload = new String(message.getPayload());
+			ResponsePrimitive resp = (ResponsePrimitive) dms.stringToObj(payload);
+			if(resp == null || resp.getRequestIdentifier() == null){
+				return;
+			}
+			responseReceived(resp);
+		}
+		
+	}
+	
+}
diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
new file mode 100644
index 0000000..7588712
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/ResponseSemaphore.java
@@ -0,0 +1,32 @@
+package org.eclipse.om2m.binding.mqtt.util;

+

+import java.util.concurrent.Semaphore;

+

+import org.eclipse.om2m.commons.resource.ResponsePrimitive;

+

+public class ResponseSemaphore {

+

+	private Semaphore semaphore;

+	private ResponsePrimitive responsePrimitive;

+

+	public ResponseSemaphore(Semaphore semaphore) {

+		this.semaphore = semaphore;

+	}

+

+	public Semaphore getSemaphore() {

+		return semaphore;

+	}

+

+	public void setSemaphore(Semaphore semaphore) {

+		this.semaphore = semaphore;

+	}

+

+	public ResponsePrimitive getResponsePrimitive() {

+		return responsePrimitive;

+	}

+

+	public void setResponsePrimitive(ResponsePrimitive responsePrimitive) {

+		this.responsePrimitive = responsePrimitive;

+	}

+

+}

diff --git a/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
new file mode 100644
index 0000000..219fa66
--- /dev/null
+++ b/org.eclipse.om2m.binding.mqtt/src/main/java/org/eclipse/om2m/binding/mqtt/util/Utils.java
@@ -0,0 +1,19 @@
+package org.eclipse.om2m.binding.mqtt.util;
+
+import org.eclipse.om2m.commons.resource.ResponsePrimitive;
+
+public final class Utils {
+
+	public static void fillPrimitiveContent(){
+		
+	}
+	
+	public static void fillContent(ResponsePrimitive requestPrimitive){
+		if(requestPrimitive.getPrimitiveContent() != null && 
+				!requestPrimitive.getPrimitiveContent().getAny().isEmpty() && 
+				requestPrimitive.getContent() == null){
+			requestPrimitive.setContent(requestPrimitive.getPrimitiveContent().getAny().get(0));
+		}
+	}
+	
+}
diff --git a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
index 7bb68b1..9e35199 100644
--- a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
+++ b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/RequestPrimitive.java
@@ -135,7 +135,15 @@
 	protected String requestContentType;
 	@XmlTransient
 	protected Map<String,List<String>> queryStrings;
-	
+	@XmlTransient
+	protected String targetId;
+	@XmlTransient
+	protected String mqttTopic;
+	@XmlTransient
+	protected String mqttUri;
+	@XmlTransient
+	protected boolean mqttResponseExpected = true;
+
 	/**
 	 * @return the queryStrings
 	 */
@@ -558,7 +566,7 @@
 	 * @return the targetId
 	 */
 	public String getTargetId() {
-		return this.getTo();
+		return this.targetId;
 	}
 
 	/**
@@ -566,7 +574,7 @@
 	 *            the targetId to set
 	 */
 	public void setTargetId(String targetId) {
-		this.setTo(targetId);
+		this.targetId= targetId;
 	}
 
 	/**
@@ -597,6 +605,38 @@
 		this.requestContentType = requestContentType;
 	}
 
+	public PrimitiveContent getPrimitiveContent() {
+		return primitiveContent;
+	}
+
+	public void setPrimitiveContent(PrimitiveContent primitiveContent) {
+		this.primitiveContent = primitiveContent;
+	}
+
+	public String getMqttTopic() {
+		return mqttTopic;
+	}
+
+	public void setMqttTopic(String mqttTopic) {
+		this.mqttTopic = mqttTopic;
+	}
+
+	public String getMqttUri() {
+		return mqttUri;
+	}
+
+	public void setMqttUri(String mqttUri) {
+		this.mqttUri = mqttUri;
+	}
+	
+	public boolean isMqttResponseExpected() {
+		return mqttResponseExpected;
+	}
+
+	public void setMqttResponseExpected(boolean mqttResponseExpected) {
+		this.mqttResponseExpected = mqttResponseExpected;
+	}
+
 	/* (non-Javadoc)
 	 * @see java.lang.Object#toString()
 	 */
@@ -675,5 +715,5 @@
 		result.to = this.to;
 		return result;
 	}
-	
+
 }
diff --git a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
index 93b6b1b..d6ad846 100644
--- a/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
+++ b/org.eclipse.om2m.commons/src/main/java/org/eclipse/om2m/commons/resource/ResponsePrimitive.java
@@ -331,7 +331,7 @@
 		this.content = content;
 	}
 	
-	public PrimitiveContent getPritimitiveContent(){
+	public PrimitiveContent getPrimitiveContent(){
 		return content;
 	}
 	
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
index 1fc142a..424ea3f 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/controller/FanOutPointController.java
@@ -158,7 +158,7 @@
 		public ResponsePrimitive call() throws Exception {
 			ResponsePrimitive resp = new Router().doRequest(request);
 			resp.setPrimitiveContent(new PrimitiveContent());
-			resp.getPritimitiveContent().getAny().add(resp.getContent());
+			resp.getPrimitiveContent().getAny().add(resp.getContent());
 			return resp;
 		}
 		
diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
index 4ea0d70..191f26c 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/notifier/Notifier.java
@@ -22,6 +22,8 @@
 import java.math.BigInteger;

 import java.util.ArrayList;

 import java.util.List;

+import java.util.regex.Matcher;

+import java.util.regex.Pattern;

 

 import org.apache.commons.logging.Log;

 import org.apache.commons.logging.LogFactory;

@@ -133,6 +135,25 @@
 		LOGGER.info("Sending notify request to: " + contact);

 		if(contact.matches(".*://.*")){ 

 			// Contact = protocol-dependent -> direct notification using the rest client.

+			// In case of MQTT, the URI of the broker and the Topic has to be handled separatly

+			if(contact.startsWith("mqtt://")){

+				Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");

+				Matcher matcher = mqttUriPattern.matcher(contact);

+				if(matcher.matches()){

+					String uri = matcher.group(1);

+					String topic = matcher.group(3) == null ? "" : matcher.group(3).substring(1);

+					request.setMqttTopic(topic);

+					request.setMqttUri(uri);

+					// We do not want to wait for a response on AE topic

+					request.setMqttResponseExpected(false);

+				} else {

+					ResponsePrimitive resp = new ResponsePrimitive(request);

+					resp.setResponseStatusCode(ResponseStatusCode.BAD_REQUEST);

+					resp.setContent("Error in mqtt URI");

+					resp.setContentType(MimeMediaType.TEXT_PLAIN);

+					return resp;

+				}

+			}

 			request.setTo(contact);

 			return RestClient.sendRequest(request);

 		}else{

@@ -267,7 +288,7 @@
 			for(final String uri : sub.getNotificationURI()){

 				CoreExecutor.postThread(new Runnable(){

 					public void run() {

-						Notifier.notify(request, uri);    					

+						Notifier.notify(request, uri);			

 					};

 				});

 			}

diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
index fd20c05..b513235 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/redirector/Redirector.java
@@ -19,6 +19,9 @@
  *******************************************************************************/

 package org.eclipse.om2m.core.redirector;

 

+import java.util.regex.Matcher;

+import java.util.regex.Pattern;

+

 import org.apache.commons.logging.Log;

 import org.apache.commons.logging.LogFactory;

 import org.eclipse.om2m.commons.constants.CSEType;

@@ -114,15 +117,32 @@
 					LOGGER.debug("Removing / at the end of poa: " + url);

 					url = url.substring(0, url.length() - 1);

 				}

+								

 				

-				if(request.getTo().startsWith("//")){

-					url += request.getTo().replaceFirst("//", "/_/");

-				} else if(request.getTo().startsWith("/")){

-					url += request.getTo().replaceFirst("/", "/~/");

+				if(url.startsWith("mqtt://")){

+					url += request.getTo();

+					Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)");

+					Matcher matcher = mqttUriPattern.matcher(url);

+					if(matcher.matches()){

+						// FIXME we need a response but not yet implemented in MQTT binding

+						request.setMqttResponseExpected(true);

+						// TODO Format type can be enhanced

+						request.setMqttTopic("/oneM2M/req/" + Constants.CSE_ID + "/" + csrEntity.getRemoteCseId().replaceAll("/", "") + "/xml");

+						request.setMqttUri(matcher.group(1));

+					} else {

+						LOGGER.warn("Incorrect MQTT URI specified in remoteCSE: " + url);

+						i++;

+						continue;

+					}

 				} else {

-					url+= "/" + request.getTo();

+					if(request.getTo().startsWith("//")){

+						url += request.getTo().replaceFirst("//", "/_/");

+					} else if(request.getTo().startsWith("/")){

+						url += request.getTo().replaceFirst("/", "/~/");

+					} else {

+						url+= "/" + request.getTo();

+					}

 				}

-				

 				request.setTo(url);

 				ResponsePrimitive response = RestClient.sendRequest(request);

 				if(!(response.getResponseStatusCode()

@@ -192,6 +212,26 @@
 					done = true;

 				} else {

 					request.setTo(poa);

+					if(poa.startsWith("mqtt://")){

+						Pattern mqttUriPattern = Pattern.compile("(mqtt://[^:/]*(:[0-9]{1,5})?)(/.*)?");

+						Matcher matcher = mqttUriPattern.matcher(poa);

+						if(matcher.matches()){

+							String topic = matcher.group(3);

+							String aeId = ae.getAeid();

+							if(topic != null){

+								request.setMqttTopic(topic);

+								request.setMqttResponseExpected(false);

+							} else {

+								request.setMqttTopic("/oneM2M/req/" + Constants.CSE_ID + "/" + aeId + "/xml");

+								request.setMqttResponseExpected(true);

+							}

+							request.setMqttUri(matcher.group(1));

+						} else {

+							LOGGER.warn("POA is incorrect for MQTT: " + poa);

+							i++;

+							continue;

+						}

+					}

 					response = RestClient.sendRequest(request);

 					if(!response.getResponseStatusCode().equals(ResponseStatusCode.TARGET_NOT_REACHABLE)){

 						done = true;

diff --git a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
index 44f4f5c..44b27cb 100644
--- a/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
+++ b/org.eclipse.om2m.core/src/main/java/org/eclipse/om2m/core/router/Router.java
@@ -126,6 +126,10 @@
 			}
 
 			// URI Handling
+			if(request.getTo() == null && request.getTargetId() == null){
+				throw new BadRequestException("No To parameter provided provided");
+			}
+			
 			if(request.getTargetId() == null){
 				request.setTargetId(request.getTo());
 			}
diff --git a/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml b/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
index 74b8d2a..583a3ee 100644
--- a/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
+++ b/org.eclipse.om2m.datamapping.jaxb/src/main/resources/json-binding.xml
@@ -19,6 +19,19 @@
 			</java-attributes>
 		</java-type>
 
+        <!--  Request and Response Descriptions -->
+        <java-type name="RequestPrimitive">
+            <xml-root-element name="m2m:rqp"/>
+        </java-type>
+        
+        <java-type name="PrimitiveContent">
+            <xml-root-element name="pc"/>
+        </java-type>
+        
+        <java-type name="ResponsePrimitive">
+            <xml-root-element name="m2m:rsp"/>
+        </java-type>
+
 		<!-- CSE Descriptions -->
 		<java-type name="CSEBase">
 			<xml-root-element name="m2m:cb" />
diff --git a/org.eclipse.om2m.site.in-cse/om2m.product b/org.eclipse.om2m.site.in-cse/om2m.product
index 517b1d7..47305cb 100644
--- a/org.eclipse.om2m.site.in-cse/om2m.product
+++ b/org.eclipse.om2m.site.in-cse/om2m.product
@@ -52,6 +52,7 @@
       <plugin id="org.eclipse.jetty.util"/>
       <plugin id="org.eclipse.om2m.binding.coap"/>
       <plugin id="org.eclipse.om2m.binding.http"/>
+      <plugin id="org.eclipse.om2m.binding.mqtt"/>
       <plugin id="org.eclipse.om2m.binding.service"/>
       <plugin id="org.eclipse.om2m.commons"/>
       <plugin id="org.eclipse.om2m.commons.logging" fragment="true"/>
diff --git a/pom.xml b/pom.xml
index 9ee48c0..e365d1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@
 		<module>org.eclipse.om2m.commons.logging</module>
 		<module>org.eclipse.om2m.binding.http</module>
 		<module>org.eclipse.om2m.binding.coap</module>
+		<module>org.eclipse.om2m.binding.mqtt</module>
 		<module>org.eclipse.om2m.binding.service</module>
 		<module>org.eclipse.om2m.core.service</module>
 		<module>org.eclipse.om2m.datamapping.jaxb</module>