First commit of the azure runtime implementation.
diff --git a/runtime-impl/azure-runtime/.gitignore b/runtime-impl/azure-runtime/.gitignore
new file mode 100644
index 0000000..d6967c2
--- /dev/null
+++ b/runtime-impl/azure-runtime/.gitignore
@@ -0,0 +1,5 @@
+.DS_Store
+.idea
+*.iml
+
+target/
diff --git a/runtime-impl/azure-runtime/pom.xml b/runtime-impl/azure-runtime/pom.xml
new file mode 100644
index 0000000..8c4b978
--- /dev/null
+++ b/runtime-impl/azure-runtime/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.eclipse.jemo</groupId>
+    <artifactId>azure-runtime</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <!-- add azure sdk dependancies -->
+    <dependencies>
+        <!-- jemo -->
+        <dependency>
+            <groupId>org.eclipse.jemo</groupId>
+            <artifactId>jemo</artifactId>
+            <version>1.0.1</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- azure dependencies -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-eventhubs</artifactId>
+            <version>0.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-documentdb</artifactId>
+            <version>1.10.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>fluent-hc</artifactId>
+            <version>4.5.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>8.0.0</version>
+        </dependency>
+        <!-- end of azure dependancies -->
+
+        <!-- additional utility classes -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.5</version>
+        </dependency>
+        <!-- end of additional utility classes -->
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.0</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <!-- Additional configuration. -->
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <shadedClassifierName>shaded</shadedClassifierName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/AESCrypt.java b/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/AESCrypt.java
new file mode 100644
index 0000000..8526b7e
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/AESCrypt.java
@@ -0,0 +1,72 @@
+/*
+ ********************************************************************************
+ * Copyright (c) 9th November 2018 Cloudreach Limited Europe
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the Eclipse
+ * Public License, v. 2.0 are satisfied: GNU General Public License, version 2
+ * with the GNU Classpath Exception which is
+ * available at https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ ********************************************************************************/
+package org.eclipse.jemo.runtime.azure;
+
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.UnsupportedEncodingException;
+import java.security.*;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.KeySpec;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+/**
+ * Created by alirezafallahi on 17/02/16.
+ * See Page 38 : http://www.sagepay.co.uk/file/25041/download-document/FORM_Integration_and_Protocol_Guidelines_270815.pdf?token=cPVKBIYVFlUvhtarLjtjcgzmuYzWPjIbuZQocUKFrXU
+ */
+
+public class AESCrypt {
+
+    public AESCrypt() {
+        super();
+    }
+		
+		public static byte[] getKeyFromPassword(String password) throws InvalidKeySpecException,NoSuchAlgorithmException,UnsupportedEncodingException {
+			PBEKeySpec spec = new PBEKeySpec(password.toCharArray(),new byte[] {0,1,1,3,4,9,1,2,0,1,1,3,4,9,1,2},65536,128); // AES-256
+			SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
+			return f.generateSecret(spec).getEncoded();
+		}
+
+    public static byte[] AESEncrypt(String sEncrypt, String charset, String pwd) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException, UnsupportedEncodingException {
+        return AES(1, sEncrypt.getBytes(charset), pwd.getBytes(charset));
+    }
+		
+		public static byte[] AESEncrypt(String sEncrypt, String charset, byte[] key) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException, UnsupportedEncodingException {
+        return AES(1, sEncrypt.getBytes(charset), key);
+    }
+
+    public static byte[] AESDecrypt(byte[] data, String pwd) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException,UnsupportedEncodingException {
+        return AES(2, data, pwd.getBytes("UTF-8"));
+    }
+		
+		public static byte[] AESDecrypt(byte[] data, byte[] key) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException,UnsupportedEncodingException {
+        return AES(2, data, key);
+    }
+
+    private static byte[] AES(int opmode, byte[] data, byte[] pwd) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException {
+        byte[] key_str = pwd;
+        SecretKeySpec sks = new SecretKeySpec(key_str, "AES");
+        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
+        cipher.init(opmode, sks, new IvParameterSpec(key_str));
+        return cipher.doFinal(data);
+    }
+}
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/MicrosoftAzureRuntime.java b/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/MicrosoftAzureRuntime.java
new file mode 100644
index 0000000..0f3a633
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/java/org/eclipse/jemo/runtime/azure/MicrosoftAzureRuntime.java
@@ -0,0 +1,1830 @@
+/*
+ ********************************************************************************
+ * Copyright (c) 9th November 2018 Cloudreach Limited Europe
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the Eclipse
+ * Public License, v. 2.0 are satisfied: GNU General Public License, version 2
+ * with the GNU Classpath Exception which is
+ * available at https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ ********************************************************************************/
+package org.eclipse.jemo.runtime.azure;
+
+import org.eclipse.jemo.AbstractJemo;
+import org.eclipse.jemo.Jemo;
+import org.eclipse.jemo.internal.model.*;
+
+import static org.eclipse.jemo.AbstractJemo.getListFromJSON;
+import static org.eclipse.jemo.Jemo.fromJSONString;
+import static org.eclipse.jemo.Jemo.getValueFromJSON;
+import static org.eclipse.jemo.Jemo.executeFunction;
+import static org.eclipse.jemo.sys.JemoRuntimeSetup.TFVARS_FILE_NAME;
+import static org.eclipse.jemo.sys.internal.Util.*;
+import static org.eclipse.jemo.runtime.azure.MicrosoftAzureRuntime.HttpMode.GET;
+import static org.eclipse.jemo.runtime.azure.MicrosoftAzureRuntime.MESSAGE_MODEL.EVENTHUB;
+import static org.eclipse.jemo.runtime.azure.MicrosoftAzureRuntime.MESSAGE_MODEL.QUEUE;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+
+import org.eclipse.jemo.internal.model.CloudBlob;
+import org.eclipse.jemo.sys.JemoPluginManager;
+import org.eclipse.jemo.sys.JemoRuntimeSetup.ClusterCreationResponse;
+import org.eclipse.jemo.sys.JemoRuntimeSetup.SetupParams;
+import org.eclipse.jemo.sys.JemoRuntimeSetup.TerraformJobResponse;
+import org.eclipse.jemo.sys.ClusterParams;
+import org.eclipse.jemo.sys.ClusterParams.ClusterParam;
+import org.eclipse.jemo.sys.internal.TerraformJob;
+import org.eclipse.jemo.sys.internal.Util;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.FeedOptions;
+import com.microsoft.azure.documentdb.SqlParameter;
+import com.microsoft.azure.documentdb.SqlParameterCollection;
+import com.microsoft.azure.documentdb.SqlQuerySpec;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.*;
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueClient;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+
+import java.io.*;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.ws.Holder;
+
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.util.Config;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.digest.HmacUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.fluent.Form;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * TODO: we need to bring the test coverage on the Azure Runtime to 100% right now there are no tests and this is bad.
+ *
+ * @author Christopher Stura "christopher.stura@cloudreach.com"
+ */
+public class MicrosoftAzureRuntime implements CloudRuntime {
+
+    /**
+     * Azure specific constants
+     */
+    private static final String AZURE_RESOURCE_PORTAL = "https://management.azure.com/";
+
+    private static final String TENANT_ID_VAR_NAME = "tenant_id";
+    private static final String CLIENT_ID_VAR_NAME = "client_id";
+    private static final String CLIENT_SECRET_VAR_NAME = "client_secret";
+
+    private static final String PROP_RESOURCEGROUP = "eclipse.jemo.azure.resourcegroup";
+    private static final String PROP_EVENTHUB = "eclipse.jemo.azure.eventhub";
+    private static final String PROP_DB = "eclipse.jemo.azure.db";
+    private static final String PROP_STORAGE = "eclipse.jemo.azure.storage";
+    private static final String PROP_LOG_WORKSPACE = "eclipse.jemo.azure.log-workspace";
+    private static final String PROP_KEYVAULT = "eclipse.jemo.azure.keyvault";
+    private static final String PROP_MSG_MODEL = "eclipse.jemo.azure.msg.model";
+
+    private static final Set<String> REQUIRED_ACTIONS = new HashSet<>(asList(
+            "Microsoft.Resources/subscriptions/read",
+            "Microsoft.Storage/storageAccounts/listKeys/action",
+            "Microsoft.EventHub/namespaces/eventhubs/write",
+            "Microsoft.EventHub/namespaces/AuthorizationRules/listKeys/action",
+            "Microsoft.DocumentDB/databaseAccounts/listKeys/action",
+            "Microsoft.DocumentDB/databaseAccounts/read",
+            "Microsoft.OperationalInsights/workspaces/read",
+            "Microsoft.Operationalinsights/workspaces/sharedkeys/read",
+            "Microsoft.Authorization/roleAssignments/read",
+            "Microsoft.Authorization/roleDefinitions/read",
+            "Microsoft.Network/virtualNetworks/read",
+            "Microsoft.ManagedIdentity/userAssignedIdentities/read",
+            "Microsoft.KeyVault/vaults/read",
+            "Microsoft.KeyVault/vaults/secrets/read",
+            "Microsoft.KeyVault/vaults/secrets/write"));
+
+    private static String REGION;
+    private static AzureCredentials AZURE_CREDENTIALS;
+
+    private static String RESOURCE_GROUP;
+    private static String EVENT_HUB_NAMESPACE;
+    private static String DATABASE_ACCOUNT;
+    private static String STORAGE_ACCOUNT;
+    private static String LOG_WORKSPACE;
+    private static String KEY_VAULT;
+    private static MESSAGE_MODEL MSG_MODEL;
+
+    private static final String STORAGE_MODULE_CONTAINER = "jemopluginlib";
+    private String ENCRYPTION_KEY;
+
+    public MicrosoftAzureRuntime() throws IOException {
+        if (System.getProperty("eclipse.jemo.azure.encryptionKey") != null) {
+            ENCRYPTION_KEY = System.getProperty("eclipse.jemo.azure.encryptionKey");
+        } else {
+            final Path path = Paths.get("/kv/encryptionKey");
+            if (Files.exists(path)) {
+                ENCRYPTION_KEY = Files.lines(path).collect(Collectors.joining(""));
+            }
+        }
+
+        Properties properties = readPropertiesFile();
+        RESOURCE_GROUP = readProperty(PROP_RESOURCEGROUP, properties, "jemorg");
+        EVENT_HUB_NAMESPACE = readProperty(PROP_EVENTHUB, properties, "jemoehn");
+        DATABASE_ACCOUNT = readProperty(PROP_DB, properties, "jemocdba");
+        STORAGE_ACCOUNT = readProperty(PROP_STORAGE, properties, "jemosa");
+        LOG_WORKSPACE = readProperty(PROP_LOG_WORKSPACE, properties, "jemo-log-workspace");
+        KEY_VAULT = readProperty(PROP_KEYVAULT, properties, "jemokv");
+        MSG_MODEL = MESSAGE_MODEL.valueOf(readProperty(PROP_MSG_MODEL, properties, QUEUE.name()));
+    }
+
+    private static String readProperty(String propertyName, Properties properties, String defaultValue) {
+        final String value = readParameterFromJvmOrEnv(propertyName);
+        if (value != null) {
+            return value;
+        }
+
+        return properties == null || properties.getProperty(propertyName) == null ? defaultValue : properties.getProperty(propertyName);
+    }
+
+    static class AzureCredentials {
+        private final String tenantId, clientId, clientSecret;
+
+        public AzureCredentials(String tenantId, String clientId, String clientSecret) {
+            this.tenantId = tenantId;
+            this.clientId = clientId;
+            this.clientSecret = clientSecret;
+        }
+
+        @Override
+        public String toString() {
+            return "AzureCredentials{" +
+                    "tenantId='" + tenantId + '\'' +
+                    ", clientId='" + clientId + '\'' +
+                    ", clientSecret='" + clientSecret + '\'' +
+                    '}';
+        }
+    }
+
+    private static final AzureCredentials AZURE_CREDENTIALS() {
+        if (AZURE_CREDENTIALS == null) {
+            if (System.getProperty("eclipse.jemo.azure.tenantid") != null
+                    && System.getProperty("eclipse.jemo.azure.clientid") != null
+                    && System.getProperty("eclipse.jemo.azure.clientsecret") != null) {
+                Jemo.log(Level.FINE, "[AZURE][AZURE_CREDENTIALS] Credentials are found from jvm properties");
+                AZURE_CREDENTIALS = new AzureCredentials(
+                        System.getProperty("eclipse.jemo.azure.tenantid"),
+                        System.getProperty("eclipse.jemo.azure.clientid"),
+                        System.getProperty("eclipse.jemo.azure.clientsecret"));
+            } else {
+                Jemo.log(Level.FINE, "[AZURE][AZURE_CREDENTIALS] Checking if the credentials file exists.");
+                AZURE_CREDENTIALS = readCredentialsFromFile();
+                if (AZURE_CREDENTIALS == null) {
+                    Jemo.log(Level.FINE, "[AZURE][AZURE_CREDENTIALS] Could not find the azure credentials file, reading them from the key vault instead.");
+
+                    AZURE_CREDENTIALS = readFromDiscMountedToKeyVault();
+                    if (AZURE_CREDENTIALS != null) {
+                        Jemo.log(Level.FINE, "[AZURE][AZURE_CREDENTIALS] Credentials read from key vault: " + AZURE_CREDENTIALS());
+                    }
+                } else {
+                    REGION = readRegionFromFile();
+                    Jemo.log(Level.FINE, "Credentials read from the azure credentials file.");
+                }
+            }
+        }
+
+        return AZURE_CREDENTIALS;
+    }
+
+    private static AzureCredentials readFromDiscMountedToKeyVault() {
+        final Path secretsDirPath = Paths.get("/kv");
+        if (!Files.exists(secretsDirPath)) {
+            return null;
+        }
+
+        try {
+            final String clientId = Files.lines(secretsDirPath.resolve("clientId")).collect(Collectors.joining(""));
+            final String clientSecret = Files.lines(secretsDirPath.resolve("clientSecret")).collect(Collectors.joining(""));
+            final String tenantId = Files.lines(secretsDirPath.resolve("tenantId")).collect(Collectors.joining(""));
+            return new AzureCredentials(tenantId, clientId, clientSecret);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static AzureCredentials readCredentialsFromFile() {
+        final Path azureDirPath = azureDirectory();
+        if (!Files.exists(azureDirPath)) {
+            return null;
+        }
+
+        final Path credentialsFilePath = azureDirPath.resolve("credentials");
+        if (!Files.exists(credentialsFilePath)) {
+            return null;
+        }
+
+        try (FileInputStream stream = new FileInputStream(credentialsFilePath.toFile())) {
+            final Properties properties = new Properties();
+            properties.load(stream);
+
+            String tenantId = properties.getProperty("tenant");
+            String clientId = properties.getProperty("client");
+            String clientSecret = properties.getProperty("key");
+            return new AzureCredentials(tenantId, clientId, clientSecret);
+        } catch (IOException e) {
+            Jemo.log(Level.FINE, "[AZURE][readCredentialsFromFile] I was unable to read the credentialsFilePath from %s because of the error %s", azureDirPath, e.getMessage());
+            return null;
+        }
+    }
+
+    private static String readRegionFromFile() {
+        final Path credentialsFilePath = azureDirectory();
+        if (!Files.exists(credentialsFilePath)) {
+            return null;
+        }
+
+        try (FileInputStream stream = new FileInputStream(credentialsFilePath.resolve("region").toFile())) {
+            final Properties properties = new Properties();
+            properties.load(stream);
+            return properties.getProperty("region");
+        } catch (IOException e) {
+            Jemo.log(Level.FINE, "[AZURE][readCredentialsFromFile] I was unable to read the credentials from %s because of the error %s", credentialsFilePath, e.getMessage());
+            return null;
+        }
+    }
+
+    @Override
+    public void start(AbstractJemo jemoServer) {
+    }
+
+    enum HttpMode {
+        GET, POST, PUT, PATCH, DELETE
+    }
+
+    private HttpResponse sendRequestWithSubscriptionAndResourceGroup(HttpMode httpMode, String uriSuffix, String requestBody) {
+        return Util.F(null, x -> {
+            final String subId = getSubscriptionId(getAuthToken());
+            final String uri = AZURE_RESOURCE_PORTAL + "subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + uriSuffix;
+            return sendRequest(httpMode, uri, requestBody);
+        });
+    }
+
+    private HttpResponse sendRequestWithSubscription(HttpMode httpMode, String uriSuffix, String requestBody) {
+        return Util.F(null, x -> {
+            final String subId = getSubscriptionId(getAuthToken());
+            final String uri = AZURE_RESOURCE_PORTAL + "subscriptions/" + subId + uriSuffix;
+            return sendRequest(httpMode, uri, requestBody);
+        });
+    }
+
+    @NotNull
+    private HttpResponse sendRequest(HttpMode httpMode, String uri, String requestBody) {
+        return Util.F(null, x -> {
+            final String authToken = getAuthToken();
+            final Request request = createRequest(httpMode, uri)
+                    .addHeader("Authorization", "Bearer " + authToken);
+            if (httpMode != GET) {
+                request.bodyString(requestBody, ContentType.APPLICATION_JSON);
+            }
+            final HttpResponse response = request.execute().returnResponse();
+            if (response.getStatusLine().getStatusCode() >= 300) {
+                throw new IllegalStateException(response.getStatusLine().getStatusCode() +
+                        " - " + response.getStatusLine().getReasonPhrase() + " - " + responseBody(response));
+            }
+            return response;
+        });
+    }
+
+    @NotNull
+    private Request createRequest(HttpMode httpMode, String uri) {
+        switch (httpMode) {
+            case GET:
+                return Request.Get(uri);
+            case POST:
+                return Request.Post(uri);
+            case PUT:
+                return Request.Put(uri);
+            case PATCH:
+                return Request.Patch(uri);
+            case DELETE:
+                return Request.Delete(uri);
+            default:
+                throw new IllegalStateException("httpMode not supported yet: " + httpMode);
+        }
+    }
+
+    private static String responseBody(HttpResponse response) {
+        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+        return Util.F(byteOut, bo -> {
+            response.getEntity().writeTo(byteOut);
+            return byteOut.toString("UTF-8");
+        });
+    }
+
+    @Override
+    public void delete(String category, String key) {
+        executeFunction(rt -> {
+            getStorageContainer(category).getBlockBlobReference(key).deleteIfExists();
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void write(String category, String path, String key, InputStream dataStream) {
+        executeFunction(rt -> {
+            File tmpFile = JemoPluginManager.cacheStreamToFile(dataStream);
+            try {
+                getStorageContainer(category).getBlockBlobReference(path + "/" + key).upload(new FileInputStream(tmpFile), tmpFile.length());
+                Jemo.log(Level.FINE, "[AZURE][write][%s][%s][%s] stored", category, path, key);
+                return null;
+            } finally {
+                tmpFile.delete();
+            }
+        }, this);
+    }
+
+    @Override
+    public InputStream read(String category, String path, String key) {
+        return executeFunction(rt -> {
+            try {
+                com.microsoft.azure.storage.blob.CloudBlob blob = getStorageContainer(category).getBlobReferenceFromServer(path + "/" + key);
+                if (blob != null) {
+                    return blob.openInputStream();
+                }
+            } catch (StorageException e) {
+                Jemo.log(Level.FINE, "[AZURE][read][%s][%s][%s] not found", category, path, key);
+                return null;
+            }
+
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void uploadModule(String pluginFile, InputStream in, long moduleSize) {
+        executeFunction(rt -> {
+            getStorageContainer(STORAGE_MODULE_CONTAINER).getBlockBlobReference(pluginFile).upload(in, moduleSize);
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void dropNoSQLTable(String tableName) {
+        executeFunction(rt -> {
+            Iterator<DocumentCollection> collectionList = getDocumentDB().readCollections(noSQLDB.getSelfLink(), null).getQueryIterator();
+            DocumentCollection collection = null;
+            while (collectionList.hasNext()) {
+                collection = collectionList.next();
+                if (collection.getId().equalsIgnoreCase(tableName)) {
+                    break;
+                }
+            }
+            if (collection != null) {
+                getDocumentDB().deleteCollection(collection.getSelfLink(), null);
+            }
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void deleteNoSQL(String tableName, SystemDBObject... data) {
+        if (data != null && data.length > 0) {
+            executeFunction((rt) -> {
+                DocumentClient docDb = getDocumentDB();
+                DocumentCollection docTbl = docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().stream().filter(tbl -> tbl.getId().equalsIgnoreCase(tableName)).findAny().orElse(null);
+                AtomicInteger ctr = new AtomicInteger(1);
+                SqlParameterCollection paramList = new SqlParameterCollection();
+                String sqlQuery = "select * from root r where " + Arrays.asList(data).stream().map(obj -> {
+                    int paramId = ctr.incrementAndGet();
+                    paramList.add(new SqlParameter("@p" + paramId, obj.getId()));
+                    return "r.id = @p" + paramId;
+                }).collect(Collectors.joining(" OR "));
+                FeedOptions feedOpts = new FeedOptions();
+                feedOpts.setEnableCrossPartitionQuery(true);
+                feedOpts.setPageSize(data.length);
+                docDb.queryDocuments(docTbl.getSelfLink(), new SqlQuerySpec(sqlQuery, paramList), feedOpts).getQueryIterable().toList().stream().forEach(azureDoc -> {
+                    try {
+                        docDb.deleteDocument(azureDoc.getSelfLink(), null);
+                    } catch (DocumentClientException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                });
+
+                return null;
+            }, this);
+        }
+    }
+
+    enum MESSAGE_MODEL {
+        EVENTHUB, QUEUE
+    }
+
+    private String subscriptionId = null;
+    private DocumentClient documentDb = null;
+    private Database noSQLDB = null;
+    private CloudBlobClient blobStorage = null;
+    private CloudStorageAccount cloudStorage = null;
+    private CloudQueueClient queueStorage = null;
+    private CloudBlobContainer queueStorageContainer = null;
+    private Map<String, CloudBlobContainer> blobContainerMap = new HashMap<>();
+    private String eventHubNamespaceKey = null;
+    private String lastAuthToken = null;
+    private long authTokenExpiresOn = 0;
+    private Map<String, CountDownLatch> queuePollMap = new HashMap<>();
+    private Map<String, PartitionReceiver> queueRecieverMap = new HashMap<>();
+    private Map<String, EventHubClient> eventHubMap = new HashMap<>();
+    private Map<Integer, LogMetadata> moduleLogMap = new HashMap<>();
+    private ExecutorService EventProcessingThreadPool = Executors.newCachedThreadPool();
+
+    private String formatQueue(String queueName) {
+        return queueName.toLowerCase();
+    }
+
+    private byte[] serializeObject(Object obj) throws IOException {
+        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+        try (ObjectOutputStream objOut = new ObjectOutputStream(byteOut)) {
+            objOut.writeObject(obj);
+        }
+
+        return byteOut.toByteArray();
+    }
+
+    private Object deserializeObject(InputStream data, ClassLoader clsLoader) throws IOException, ClassNotFoundException {
+        try (ObjectInputStream objIn = new ObjectInputStream(data) {
+            @Override
+            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+                return Class.forName(desc.getName(), false, clsLoader);
+            }
+        }) {
+            return objIn.readObject();
+        }
+    }
+
+    @Override
+    public void store(String key, Object data) {
+        store(STORAGE_MODULE_CONTAINER, key, data);
+    }
+
+    @Override
+    public <T> T retrieve(String key, Class<T> objType) {
+        return retrieve(STORAGE_MODULE_CONTAINER, key, objType);
+    }
+
+    @Override
+    public void store(String category, String key, Object data) {
+        executeFunction(rt -> {
+            byte[] pluginBytes = serializeObject(data);
+            getStorageContainer(category).getBlockBlobReference(key.endsWith(".moduledata") ? key : key + ".moduledata").uploadFromByteArray(pluginBytes, 0, pluginBytes.length);
+            Jemo.log(Level.FINE, "[AZURE][store][%s][%s] stored with value %s", category, key, data.toString());
+            return null;
+        }, this);
+    }
+
+    @Override
+    public <T> T retrieve(String category, String key, Class<T> objType) {
+        return executeFunction(rt -> {
+            try {
+                String storageKey = key + ".moduledata";
+                CloudBlobContainer moduleContainer = getStorageContainer(category);
+                //we need a test to prove that when we retrieve and write to a blob at the same time with multiple threads this does not fail.
+                com.microsoft.azure.storage.blob.CloudBlob blob = moduleContainer.getBlobReferenceFromServer(storageKey);
+                if (blob != null) {
+                    CloudBlob cloudBlob = new CloudBlob(storageKey, blob.getProperties().getLastModified().getTime(), blob.getProperties().getLength(), blob.openInputStream());
+                    Object obj = deserializeObject(cloudBlob.getDataStream(), objType.getClassLoader());
+                    Jemo.log(Level.FINE, "[AZURE][retrieve][%s][%s] retrieved with value %s", category, key, obj.toString());
+                    if (objType.isAssignableFrom(obj.getClass())) {
+                        return objType.cast(obj);
+                    }
+                }
+            } catch (StorageException storageEx) {
+            } //if there is a storage exception then we should return null
+            return null;
+        }, this);
+    }
+
+    private static class AzureDocumentWrapper {
+        private String id = null;
+        private SystemDBObject data = null;
+
+        public AzureDocumentWrapper() {
+        }
+
+        public AzureDocumentWrapper(SystemDBObject data) {
+            this.id = data.getId();
+            this.data = data;
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        public SystemDBObject getData() {
+            return data;
+        }
+
+        public void setData(SystemDBObject data) {
+            this.data = data;
+        }
+    }
+
+    private String getAuthToken() throws IOException {
+        if (lastAuthToken == null || authTokenExpiresOn <= System.currentTimeMillis()) {
+            getAuthToken(AZURE_CREDENTIALS());
+        }
+
+        return lastAuthToken;
+    }
+
+    private void getAuthToken(AzureCredentials azureCredentials) throws IOException {
+        HttpResponse response = Request.Post("https://login.microsoftonline.com/" + azureCredentials.tenantId + "/oauth2/token")
+                .addHeader("Content-Type", "application/x-www-form-urlencoded")
+                .bodyForm(Form.form().add("grant_type", "client_credentials")
+                        .add("client_id", azureCredentials.clientId)
+                        .add("client_secret", azureCredentials.clientSecret)
+                        .add("resource", AZURE_RESOURCE_PORTAL).build()).execute().returnResponse();
+
+        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+        response.getEntity().writeTo(byteOut);
+        String authJsonResponse = new String(byteOut.toByteArray(), "UTF-8");
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IllegalStateException("Authentication failed. Http response body is: \n" + authJsonResponse);
+        }
+        Map<String, String> authResult = fromJSONString(Map.class, authJsonResponse);
+        final String accessToken = authResult.get("access_token");
+        lastAuthToken = accessToken;
+        authTokenExpiresOn = (Long.parseLong(authResult.get("expires_on")) - 5) * 1000; //we will actually consider the token invalid 5 seconds before it actually expires.
+    }
+
+    private String getSubscriptionId(String accessToken) throws IOException {
+        if (subscriptionId == null) {
+            Map<String, List<Map<String, String>>> subscriptionList = fromJSONString(Map.class, Request.Get(AZURE_RESOURCE_PORTAL + "subscriptions?api-version=2016-06-01")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2016-06-01").execute().returnContent().asString(Charset.forName("UTF-8")));
+            subscriptionId = subscriptionList.get("value").get(0).get("subscriptionId");
+        }
+
+        return subscriptionId;
+    }
+
+    @Override
+    public String defineQueue(String queueName) {
+        return executeFunction((rt) -> {
+            //on azure this is slightly different because we will have to create an event hub for each queue so the steps are.
+            //1. lets assume that the namespace already exists.
+            String authToken = getAuthToken();
+            String subId = getSubscriptionId(authToken);
+
+            switch (MSG_MODEL) {
+                case EVENTHUB:
+                    HttpResponse response = Request.Put(AZURE_RESOURCE_PORTAL + "subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.EventHub/namespaces/" + EVENT_HUB_NAMESPACE + "/eventhubs/" + queueName + "?api-version=2015-08-01")
+                            .addHeader("Authorization", "Bearer " + authToken)
+                            .addHeader("x-ms-version", "2016-06-01")
+                            .bodyString("{ \"properties\": { \"messageRetentionInDays\": 1, \"partitionCount\": 32 } }", ContentType.APPLICATION_JSON).execute().returnResponse();
+
+                    if (response.getStatusLine().getStatusCode() == 200) {
+                        Map<String, Object> retval = fromJSONString(Map.class, responseBody(response));
+                        return retval.get("id").toString();
+                    } else {
+                        throw new Exception("[" + queueName + "] queue could not be defined [" + response.getStatusLine().getStatusCode() + "] azure return value: " + responseBody(response));
+                    }
+                case QUEUE:
+                    CloudQueueClient cloudQueue = getQueueStorage();
+                    CloudQueue queue = cloudQueue.getQueueReference(formatQueue(queueName));
+                    try {
+                        queue.createIfNotExists();
+                    } catch (StorageException storageEx) {
+                        if (storageEx.getMessage().equals("The specified queue is being deleted.")) {
+                            //we should wait until this operation is complete and re-create the queue
+                            Jemo.log(Level.INFO, "[AZURE][%s] the queue is being deleted but is actually needed we are waiting 20 seconds for the race condition to be resolved and then we will retry the operation.", queueName);
+                            Thread.sleep(20000); //wait for 20 seconds before attempting recovery.
+                            return defineQueue(queueName);
+                        }
+                    }
+                    return queue.getName();
+            }
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void storeModuleList(String moduleJar, List<String> moduleList) throws Throwable {
+        executeFunction(rt -> {
+            uploadModule(moduleJar + ".modulelist", Jemo.toJSONString(moduleList).getBytes("UTF-8"));
+            return null;
+        }, this);
+    }
+
+    @Override
+    public List<String> getModuleList(String moduleJar) throws Throwable {
+        return executeFunction(rt -> {
+            CloudBlob ref = getModule(moduleJar + ".modulelist");
+            if (ref != null) {
+                return Jemo.fromJSONArray(String.class, new String(ref.getData(), "UTF-8"));
+            }
+
+            return null;
+        }, this);
+    }
+
+    @Override
+    public CloudBlob getModule(String moduleJar) throws IOException {
+        return executeFunction(rt -> {
+            try {
+                CloudBlobContainer moduleContainer = getModuleContainer();
+                com.microsoft.azure.storage.blob.CloudBlob blob = moduleContainer.getBlobReferenceFromServer(moduleJar);
+                return new CloudBlob(moduleJar, blob.getProperties().getLastModified().getTime(), blob.getProperties().getLength(), blob.openInputStream());
+            } catch (StorageException storageEx) {
+                return null;
+            }
+        }, this);
+    }
+
+    @Override
+    public Long getModuleInstallDate(String moduleJar) throws IOException {
+        return executeFunction(rt -> {
+            CloudBlob blob = getModule(moduleJar + ".installed");
+            if (blob != null) {
+                return Long.parseLong(new String(blob.getData(), "UTF-8"));
+            }
+
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void setModuleInstallDate(String moduleJar, long installDate) throws IOException {
+        executeFunction(rt -> {
+            uploadModule(moduleJar + ".installed", String.valueOf(installDate).getBytes("UTF-8"));
+            return null;
+        }, this);
+    }
+
+    private static class LogMetadata {
+        private String customerId = null;
+        private String provisioningState = null;
+        private String logAuthKey = null;
+        private String logName;
+
+        public LogMetadata(String logName) {
+            this.logName = logName;
+        }
+
+        public void init(MicrosoftAzureRuntime runtime) throws IOException {
+            String accessToken = runtime.getAuthToken();
+            String subId = runtime.getSubscriptionId(accessToken);
+
+            final HttpResponse response = Request.Get(AZURE_RESOURCE_PORTAL + "/subscriptions/" + subId +
+                    "/resourcegroups/" + RESOURCE_GROUP + "/providers/Microsoft.OperationalInsights/workspaces/" + URLEncoder.encode(logName, "UTF-8") + "?api-version=2015-11-01-preview")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2015-11-01-preview").execute().returnResponse();
+
+            Map<String, Map<String, String>> logObj = Jemo.fromJSONString(Map.class, responseBody(response));
+            this.customerId = logObj.get("properties").get("customerId");
+            this.provisioningState = logObj.get("properties").get("provisioningState");
+
+            final HttpResponse logAuthKeyResponse = Request.Post(AZURE_RESOURCE_PORTAL + "/subscriptions/" + subId + "/resourcegroups/" + RESOURCE_GROUP + "/providers/Microsoft.OperationalInsights/workspaces/" + URLEncoder.encode(logName, "UTF-8") + "/sharedKeys?api-version=2015-11-01-preview")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2015-11-01-preview").execute().returnResponse();
+            final String logAuthKeyResponseBody = responseBody(logAuthKeyResponse);
+
+            if (logAuthKeyResponse.getStatusLine().getStatusCode() >= 300) {
+                throw new IllegalStateException("Failed to get log auth key, http status code: [" + logAuthKeyResponse.getStatusLine().getStatusCode() + "] azure return value: " + logAuthKeyResponseBody);
+            }
+
+            Map<String, String> logAuthKeys = Jemo.fromJSONString(Map.class, logAuthKeyResponseBody);
+            this.logAuthKey = logAuthKeys.get("primarySharedKey");
+        }
+
+        public String getCustomerId() {
+            return customerId;
+        }
+
+        public void setCustomerId(String customerId) {
+            this.customerId = customerId;
+        }
+
+        public String getProvisioningState() {
+            return provisioningState;
+        }
+
+        public void setProvisioningState(String provisioningState) {
+            this.provisioningState = provisioningState;
+        }
+
+        public String getLogAuthKey() {
+            return logAuthKey;
+        }
+
+        public void setLogAuthKey(String logAuthKey) {
+            this.logAuthKey = logAuthKey;
+        }
+
+        public String getLogName() {
+            return logName;
+        }
+
+        public void setLogName(String logName) {
+            this.logName = logName;
+        }
+
+        public void write(List<CloudLogEvent> logEvents) throws NoSuchAlgorithmException, InvalidKeyException, UnsupportedEncodingException, JsonProcessingException, ClientProtocolException, IOException {
+            if ("Succeeded".equals(getProvisioningState())) {
+                SimpleDateFormat msXDateFormat = new SimpleDateFormat("EEE, dd MMM YYYY HH:mm:ss zzz");
+                msXDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+                String xMsDate = msXDateFormat.format(new java.util.Date());
+                String bodyString = Jemo.toJSONString(logEvents);
+                Mac sha256_HMAC = Mac.getInstance("HmacSHA256");
+                SecretKeySpec secret_key = new SecretKeySpec(Base64.decodeBase64(logAuthKey), "HmacSHA256");
+                sha256_HMAC.init(secret_key);
+                String message = "POST\n" + bodyString.length() + "\napplication/json\nx-ms-date:" + xMsDate + "\n/api/logs";
+                //String signature = Base64.encodeBase64String(sha256_HMAC.doFinal(message.getBytes("US-ASCII")));
+                String signature = Base64.encodeBase64String(HmacUtils.hmacSha256(Base64.decodeBase64(logAuthKey), message.getBytes("US-ASCII")));
+                Request.Post("https://" + customerId + ".ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
+                        .setHeader("Content-Type", "application/json")
+                        .setHeader("Authorization", "SharedKey " + customerId + ":" + signature)
+                        .setHeader("Log-Type", "APPLOG")
+                        .setHeader("x-ms-date", xMsDate)
+                        .bodyByteArray(bodyString.getBytes("UTF-8")).execute().returnResponse().getEntity().writeTo(System.out);
+            }
+        }
+    }
+
+    @Override
+    public void log(List<CloudLogEvent> eventList) {
+        //we need to make sure this is logged to the azure event log
+        if (eventList != null && !eventList.isEmpty()) {
+            //we need to organise the logs which need to be added by module id
+            Map<Integer, List<CloudLogEvent>> eventMap = eventList.parallelStream().collect(Collectors.groupingBy(CloudLogEvent::getModuleId));
+            eventMap.entrySet().parallelStream().forEach(e -> {
+                executeFunction(x -> {
+                    LogMetadata logEngine = moduleLogMap.get(e.getKey());
+                    if (logEngine == null) {
+                        logEngine = new LogMetadata(LOG_WORKSPACE);
+                        moduleLogMap.put(e.getKey(), logEngine);
+                        logEngine.init(this);
+                    }
+                    try {
+                        logEngine.write(e.getValue());
+                    } catch (IOException | InvalidKeyException ex) {
+                        Jemo.log(Level.WARNING, "[AZURE][%d] events were discared because of an error communicating with OMS %s", eventList.size(), ex.getMessage());
+                    }
+                    return null;
+                }, this);
+            });
+
+        }
+    }
+
+    @Override
+    public void deleteQueue(String queueId) {
+        executeFunction(rt -> {
+            //DELETE /subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{namespaceName}/eventhubs/{eventHubName}?api-version=2015-08-01
+            String authKey = getAuthToken();
+            switch (MSG_MODEL) {
+                case EVENTHUB:
+                    String response = Request.Delete(AZURE_RESOURCE_PORTAL + queueId + "?api-version=2015-08-01")
+                            .addHeader("Authorization", "Bearer " + authKey)
+                            .addHeader("x-ms-version", "2015-08-01").execute().returnContent().asString(Charset.forName("UTF-8"));
+                    break;
+                case QUEUE:
+                    getQueueStorage().getQueueReference(formatQueue(queueId)).deleteIfExists();
+                    break;
+            }
+
+            return null;
+        }, this);
+    }
+
+    private CloudBlobContainer getMessageContainer() throws ClientProtocolException, IOException, URISyntaxException, StorageException {
+        if (queueStorageContainer == null) {
+            queueStorageContainer = getBlobStorage().getContainerReference("queue-msg-data");
+            queueStorageContainer.createIfNotExists();
+        }
+
+        return queueStorageContainer;
+    }
+
+    @Override
+    public String sendMessage(String queueId, String jsonMessage) {
+        return executeFunction(rt -> {
+            switch (MSG_MODEL) {
+                case EVENTHUB:
+                    EventHubClient eHub = getEventHub(queueId);
+                    PartitionSender sender = eHub.createPartitionSenderSync("0");
+                    sender.sendSync(new EventData(jsonMessage.getBytes("UTF-8")));
+                    return UUID.randomUUID().toString();
+                case QUEUE:
+                    try {
+                        CloudBlobContainer msgStorage = getMessageContainer();
+                        byte[] msgBytes = jsonMessage.getBytes("UTF-8");
+                        String msgBlobId = UUID.randomUUID().toString();
+                        msgStorage.getBlockBlobReference(msgBlobId).uploadFromByteArray(msgBytes, 0, msgBytes.length);
+                        CloudQueueMessage msg = new CloudQueueMessage(msgBlobId);
+                        CloudQueue queueRef = getQueueStorage().getQueueReference(formatQueue(queueId));
+                        if (queueRef.exists()) {
+                            queueRef.addMessage(msg);
+                        } else {
+                            Jemo.log(Level.WARNING, "[AZURE][%s] does not exist. The message [%s] will not be sent", queueId, jsonMessage);
+                        }
+                        return msg.getId();
+                    } catch (StorageException storageEx) {
+                        throw new RuntimeException("The queue [" + queueId + "] does not exist", storageEx);
+                    }
+            }
+
+            return null;
+        }, this);
+    }
+
+    @Override
+    public String getQueueId(String queueName) {
+        return executeFunction(rt -> {
+            switch (MSG_MODEL) {
+                case EVENTHUB:
+                    return "/subscriptions/" + getSubscriptionId(getAuthToken()) + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.EventHub/namespaces/" + EVENT_HUB_NAMESPACE + "/eventhubs/" + queueName;
+                case QUEUE:
+                    return formatQueue(queueName);
+            }
+            return null;
+        }, this);
+    }
+
+    @Override
+    public List<String> listQueueIds(String location) {
+        return listQueueIds(location, false);
+    }
+
+    public EventHubClient getEventHub(String queueId) throws ClientProtocolException, IOException, ServiceBusException {
+        //POST /subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{namespaceName}/eventhubs/{eventHubName}/authorizationRules/{authorizationRuleName}/ListKeys?api-version=2015-08-01
+        if (eventHubNamespaceKey == null) {
+            String accessToken = getAuthToken();
+            String subId = getSubscriptionId(accessToken);
+            final HttpResponse response = Request.Post(AZURE_RESOURCE_PORTAL + "/subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.EventHub/namespaces/" + EVENT_HUB_NAMESPACE + "/AuthorizationRules/RootManageSharedAccessKey/listKeys?api-version=2015-08-01")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2015-08-01").execute().returnResponse();
+
+            if (response.getStatusLine().getStatusCode() >= 300) {
+                throw new IllegalStateException("Failed to list keys under eventhub namespace, http status code: [" + response.getStatusLine().getStatusCode() + "] azure return value: " + responseBody(response));
+            }
+
+            eventHubNamespaceKey = getValueFromJSON(responseBody(response), "$.primaryKey");
+        }
+
+        EventHubClient client = eventHubMap.get(queueId);
+        if (client == null) {
+            String queueName = queueId.substring(queueId.lastIndexOf('/') + 1);
+            ConnectionStringBuilder connString = new ConnectionStringBuilder(EVENT_HUB_NAMESPACE, queueName, "RootManageSharedAccessKey", eventHubNamespaceKey);
+            client = EventHubClient.createFromConnectionStringSync(connString.toString());
+            eventHubMap.put(queueId, client);
+        }
+
+        return client;
+    }
+
+    @Override
+    public int pollQueue(String queueId, CloudQueueProcessor processor) throws QueueDoesNotExistException {
+        switch (MSG_MODEL) {
+            case EVENTHUB:
+                return pollEventHub(queueId, processor);
+            case QUEUE:
+                return pollStorageQueue(queueId, processor);
+        }
+
+        return 0;
+    }
+
+    public int pollStorageQueue(String queueId, CloudQueueProcessor processor) throws QueueDoesNotExistException {
+        try {
+            //we should store a file on blob storage which contains the last time this queue was polled but only if this is an instance
+            //queue.
+            if (queueId.toLowerCase().startsWith("jemo-") && !queueId.toLowerCase().endsWith("-work-queue")) {
+                store("jemo-queue-usage", queueId, System.currentTimeMillis());
+            }
+            CloudQueue queue = getQueueStorage().getQueueReference(queueId);
+            queue.createIfNotExists();
+            List<CloudQueueMessage> msgList = new ArrayList<>();
+            List<Callable<Object>> msgRunList = new ArrayList<>();
+            long startRetrieval = System.currentTimeMillis();
+            try {
+                StreamSupport.stream(queue.retrieveMessages(32).spliterator(), true).forEach(cmsg -> {
+                    msgList.add(cmsg);
+                    try {
+                        queue.deleteMessage(cmsg);
+                        try {
+                            CloudBlockBlob msgBlob = getMessageContainer().getBlockBlobReference(cmsg.getMessageContentAsString());
+                            JemoMessage msg = Jemo.fromJSONString(JemoMessage.class, msgBlob.downloadText());
+                            msgBlob.delete();
+                            if (msg.getSourceInstance() != null) {
+                                processor.processMessage(msg);
+                            }
+                        } catch (IOException | StorageException | URISyntaxException ioex) {
+                        }
+                    } catch (StorageException ex) {
+                    }
+                });
+            } catch (StorageException ex) {
+            }
+			/*if(!msgList.isEmpty()) {
+				queue.downloadAttributes();
+				Jemo.log(Level.FINE, "[AZURE][%s] queue processing complete processed [%d] remaining [%d] - retrieval [%d ms] - processing [%d ms]",
+					queueId, msgList.size(), queue.getApproximateMessageCount(), endRetrieval - startRetrieval, endProcessing - endRetrieval);
+			}*/
+            return msgList.size();
+        } catch (StorageException | URISyntaxException | IOException storeEx) {
+            throw new QueueDoesNotExistException(String.format("[AZURE][%s] polling failed because of the error: %s", queueId, storeEx.getMessage()));
+        }
+    }
+
+    public int pollEventHub(String queueId, CloudQueueProcessor processor) throws QueueDoesNotExistException {
+        return executeFunction(rt -> {
+            Holder<Throwable> lastError = new Holder<>();
+            Holder<Integer> numMessages = new Holder<>(0);
+            try {
+                EventHubClient hubClient = getEventHub(queueId);
+                PartitionReceiver rcv = hubClient.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0", Instant.now());
+                CountDownLatch latch = queuePollMap.get(queueId);
+                if (latch != null) {
+                    PartitionReceiver prevReciever = queueRecieverMap.get(queueId);
+                    prevReciever.closeSync();
+                    queueRecieverMap.remove(queueId);
+                    queuePollMap.remove(queueId);
+                }
+                final CountDownLatch pollLatch = new CountDownLatch(1);
+                queueRecieverMap.put(queueId, rcv);
+                queuePollMap.put(queueId, latch);
+                rcv.setReceiveHandler(new PartitionReceiveHandler(10) {
+                    @Override
+                    public void onReceive(Iterable<EventData> itr) {
+                        ArrayList<EventData> eventList = new ArrayList<>();
+                        itr.forEach(e -> eventList.add(e));
+                        numMessages.value += eventList.size();
+                        eventList.parallelStream().forEach(e -> {
+                            try {
+                                JemoMessage msg = Jemo.fromJSONString(JemoMessage.class, new String(e.getBody(), "UTF-8"));
+                                if (msg.getSourceInstance() != null) {
+                                    processor.processMessage(msg);
+                                }
+                            } catch (IOException encEx) {
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void onError(Throwable ex) {
+                        lastError.value = ex;
+                        try {
+                            rcv.closeSync();
+                        } catch (ServiceBusException svcBusEx) {
+                        }
+                        pollLatch.countDown();
+                    }
+                });
+                pollLatch.await(); //wait until there is an error.
+                queueRecieverMap.remove(queueId);
+                queuePollMap.remove(queueId);
+            } catch (Throwable ex) {
+                lastError.value = ex;
+            }
+            //we need to wait for the process to terminate
+            if (lastError.value != null) {
+                Jemo.log(Level.WARNING, "[%s][%s] Error thrown while polling queue %s", CloudProvider.AZURE.name(), queueId, JemoError.toString(lastError.value));
+            }
+            return numMessages.value;
+        }, this);
+    }
+
+    private DocumentClient getDocumentDB() throws ClientProtocolException, IOException, DocumentClientException {
+        if (documentDb == null) {
+            String accessToken = getAuthToken();
+            String subId = getSubscriptionId(accessToken);
+            HttpResponse response = Request.Post(AZURE_RESOURCE_PORTAL + "/subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.DocumentDB/databaseAccounts/" + DATABASE_ACCOUNT + "/listKeys?api-version=2015-04-08")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2015-04-08").execute().returnResponse();
+            if (response.getStatusLine().getStatusCode() >= 300) {
+                throw new IllegalStateException("Failed to list keys under database account, http status code: [" + response.getStatusLine().getStatusCode() + "] azure return value: " + responseBody(response));
+            }
+
+            String primaryKey = getValueFromJSON(responseBody(response), "$.primaryMasterKey");
+
+            //we should also get the URI for the document db account.
+            response = Request.Get(AZURE_RESOURCE_PORTAL + "/subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.DocumentDB/databaseAccounts/" + DATABASE_ACCOUNT + "?api-version=2015-04-08")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2015-04-08").execute().returnResponse();
+
+            if (response.getStatusLine().getStatusCode() >= 300) {
+                throw new IllegalStateException("Failed to get document db, http status code: [" + response.getStatusLine().getStatusCode() + "] azure return value: " + responseBody(response));
+            }
+
+            String endpoint = getValueFromJSON(responseBody(response), "$.properties.documentEndpoint");
+            documentDb = new DocumentClient(endpoint, primaryKey, ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);
+
+            //we need to make sure a database named DATABASE_ACCOUNT exists
+            Holder<Database> db = new Holder<>();
+            documentDb.readDatabases(null).getQueryIterable().forEach(dbr -> {
+                if (dbr.getId().equals(DATABASE_ACCOUNT)) {
+                    db.value = dbr;
+                }
+            });
+            if (db.value == null) {
+                //we should create the database.
+                db.value = new Database();
+                db.value.setId(DATABASE_ACCOUNT);
+                db.value = documentDb.createDatabase(db.value, null).getResource();
+            }
+
+            this.noSQLDB = db.value;
+        }
+        return documentDb;
+    }
+
+    @Override
+    public boolean hasNoSQLTable(String tableName) {
+        return executeFunction((rt) -> {
+            DocumentClient docDb = getDocumentDB();
+            return docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().parallelStream().anyMatch(tbl -> tbl.getId().equalsIgnoreCase(tableName));
+        }, this);
+    }
+
+    @Override
+    public void createNoSQLTable(String tableName) {
+        if (!hasNoSQLTable(tableName)) {
+            executeFunction((rt) -> {
+                DocumentClient docDb = getDocumentDB();
+                DocumentCollection docTable = new DocumentCollection();
+                docTable.setId(tableName);
+                docDb.createCollection(noSQLDB.getSelfLink(), docTable, null);
+                return null;
+            }, this);
+        }
+    }
+
+    @Override
+    public <T> List<T> listNoSQL(String tableName, Class<T> objectType) {
+        return executeFunction((rt) -> {
+            List retval = new ArrayList<>();
+            DocumentClient docDb = getDocumentDB();
+            DocumentCollection docTbl = docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().stream().filter(tbl -> tbl.getId().equalsIgnoreCase(tableName)).findAny().orElse(null);
+            List<Document> docList = docDb.queryDocuments(docTbl.getSelfLink(), new SqlQuerySpec("SELECT * FROM root r"), null).getQueryIterable().toList();
+            docList.forEach(doc -> retval.add(doc.getObject("data", objectType)));
+            return retval;
+        }, this);
+    }
+
+    @Override
+    public <T> List<T> queryNoSQL(String tableName, Class<T> objectType, String... pkList) {
+        return executeFunction((rt) -> {
+            if (pkList == null || pkList.length == 0) {
+                return listNoSQL(tableName, objectType);
+            }
+            List retval = new ArrayList<>();
+            DocumentClient docDb = getDocumentDB();
+            String orSql = IntStream.range(0, pkList.length).mapToObj(i -> i).collect(Collectors.mapping(pk -> "r.id = @p" + String.valueOf(pk), Collectors.joining(" OR ")));
+            List<SqlParameter> paramList = IntStream.range(0, pkList.length).mapToObj(i -> i).collect(Collectors.mapping(pk -> new SqlParameter("@p" + String.valueOf(pk), pkList[pk]), toList()));
+            DocumentCollection docTbl = docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().stream().filter(tbl -> tbl.getId().equalsIgnoreCase(tableName)).findAny().orElse(null);
+            List<Document> docList = docDb.queryDocuments(docTbl.getSelfLink(), new SqlQuerySpec("SELECT * FROM root r WHERE "
+                    + orSql, new SqlParameterCollection(paramList)), null).getQueryIterable().toList();
+            docList.forEach(doc -> retval.add(doc.getObject("data", objectType)));
+            return retval;
+        }, this);
+    }
+
+    @Override
+    public <T> T getNoSQL(String tableName, String id, Class<T> objectType) throws IOException {
+        return executeFunction((rt) -> {
+            DocumentClient docDb = getDocumentDB();
+            DocumentCollection docTbl = docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().stream().filter(tbl -> tbl.getId().equalsIgnoreCase(tableName)).findAny().orElse(null);
+            List<Document> docList = docDb.queryDocuments(docTbl.getSelfLink(), new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id))), null).getQueryIterable().toList();
+            if (!docList.isEmpty()) {
+                return docList.get(0).getObject("data", objectType);
+            }
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void saveNoSQL(String tableName, SystemDBObject... data) {
+        if (data != null && data.length > 0) {
+            executeFunction((rt) -> {
+                DocumentClient docDb = getDocumentDB();
+                DocumentCollection docTbl = docDb.readCollections(noSQLDB.getSelfLink(), null).getQueryIterable().toList().stream().filter(tbl -> tbl.getId().equalsIgnoreCase(tableName)).findAny().orElse(null);
+                Arrays.asList(data).parallelStream().forEach(obj -> {
+                    String jsonObj = null;
+                    try {
+                        AzureDocumentWrapper azureDoc = new AzureDocumentWrapper(obj);
+                        jsonObj = Jemo.toJSONString(azureDoc);
+                        docDb.upsertDocument(docTbl.getSelfLink(), new Document(jsonObj), null, true);
+                    } catch (DocumentClientException | JsonProcessingException docEx) {
+                        throw new RuntimeException(jsonObj, docEx);
+                    }
+                });
+                return null;
+            }, this);
+        }
+    }
+
+    @Override
+    public void watchdog(String location, String instanceId, String instanceQueueUrl) {
+        //it would be good in the watchdog process to sweep the queue storage to get rid of dead ugly queues
+        //that nobody uses anymore.
+        Holder<Integer> cleanedQueues = new Holder<>(0);
+        switch (MSG_MODEL) {
+            case QUEUE:
+                listQueueIds(null).stream().forEach(qId -> {
+                    Long lastQueueUse = retrieve("jemo-queue-usage", qId, Long.class);
+                    if (lastQueueUse == null || TimeUnit.MINUTES.convert(System.currentTimeMillis() - lastQueueUse, TimeUnit.MILLISECONDS) > 15) {
+                        //if a queue was inactive for more than 15 minutes then delete it.
+                        executeFunction((rt) -> {
+                            CloudQueueClient cloudQueue = getQueueStorage();
+                            CloudQueue queue = cloudQueue.getQueueReference(qId);
+                            queue.downloadAttributes();
+                            Jemo.log(Level.INFO, "[AZURE][CLOUDGC][%s] appears to be dead and contains [%d] messages to process it was last polled on [%s]",
+                                    qId, queue.getApproximateMessageCount(), lastQueueUse == null ? "never" : new SimpleDateFormat("dd-MM-yyyy HH:MM:ss").format(new java.util.Date(lastQueueUse)));
+                            queue.deleteIfExists();
+                            return null;
+                        }, this);
+                        cleanedQueues.value++;
+                    }
+                });
+                break;
+        }
+        Jemo.log(Level.INFO, "[AZURE][CLOUDGC] - Cloud Infrastructure Garbage Collection Running - Queues [%d]", cleanedQueues.value);
+    }
+
+    private CloudStorageAccount getCloudStorage() throws ClientProtocolException, IOException, URISyntaxException {
+        if (cloudStorage == null) {
+            String accessToken = getAuthToken();
+            String subId = getSubscriptionId(accessToken);
+            final Response response = Request.Post(AZURE_RESOURCE_PORTAL + "subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT + "/listKeys?api-version=2016-12-01")
+                    .addHeader("Authorization", "Bearer " + accessToken)
+                    .addHeader("x-ms-version", "2016-12-01").execute();
+            String postResult = response.returnContent().asString(Charset.forName("UTF-8"));
+            String authKey = Jemo.getValueFromJSON(postResult, "$.keys[0].value");
+
+            cloudStorage = new CloudStorageAccount(new StorageCredentialsAccountAndKey(STORAGE_ACCOUNT, authKey), true);
+        }
+
+        return cloudStorage;
+    }
+
+    private CloudBlobClient getBlobStorage() throws ClientProtocolException, IOException, URISyntaxException {
+        //POST /subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{accountName}/listKeys?api-version=2016-12-01
+        if (blobStorage == null) {
+            blobStorage = getCloudStorage().createCloudBlobClient();
+        }
+
+        return blobStorage;
+    }
+
+    private CloudQueueClient getQueueStorage() throws ClientProtocolException, IOException, URISyntaxException {
+        if (queueStorage == null) {
+            queueStorage = getCloudStorage().createCloudQueueClient();
+        }
+
+        return queueStorage;
+    }
+
+    private CloudBlobContainer getModuleContainer() throws ClientProtocolException, IOException, URISyntaxException, StorageException {
+        return getStorageContainer(STORAGE_MODULE_CONTAINER);
+    }
+
+    private CloudBlobContainer getStorageContainer(String containerKey) throws ClientProtocolException, IOException, URISyntaxException, StorageException {
+        CloudBlobContainer storageContainer = blobContainerMap.get(containerKey);
+        if (storageContainer == null) {
+            CloudBlobClient blobClient = getBlobStorage();
+            storageContainer = blobClient.getContainerReference(containerKey);
+            storageContainer.createIfNotExists();
+            blobContainerMap.put(containerKey, storageContainer);
+        }
+
+        return storageContainer;
+    }
+
+    @Override
+    public Set<String> listModules() {
+        //this should work with Azure Blob storage
+        return executeFunction(rt -> {
+            HashSet<String> moduleList = new HashSet<>();
+            getModuleContainer().listBlobs().forEach(b -> {
+                if (b instanceof CloudBlockBlob) {
+                    CloudBlockBlob cb = CloudBlockBlob.class.cast(b);
+                    if (cb.getName().endsWith(".jar")) {
+                        moduleList.add(cb.getName());
+                    }
+                }
+            });
+            return moduleList;
+        }, this);
+    }
+
+    @Override
+    public void uploadModule(String pluginFile, byte[] pluginBytes) {
+        executeFunction(rt -> {
+            getModuleContainer().getBlockBlobReference(pluginFile).uploadFromByteArray(pluginBytes, 0, pluginBytes.length);
+            return null;
+        }, this);
+    }
+
+    @Override
+    public void setModuleConfiguration(int pluginId, ModuleConfiguration config) {
+        executeFunction(rt -> {
+            Map<String, String> modConfig = getModuleConfiguration(pluginId);
+            config.getParameters().forEach(p -> {
+                switch (p.getOperation()) {
+                    case delete:
+                        modConfig.remove(p.getKey());
+                        break;
+                    case upsert:
+                        modConfig.put(p.getKey(), p.getValue());
+                        break;
+                }
+            });
+            uploadModule(String.valueOf(pluginId) + "_configuration.json", AESCrypt.AESEncrypt(Jemo.toJSONString(modConfig), "UTF-8", ENCRYPTION_KEY));
+
+            return null;
+        }, this);
+    }
+
+    @Override
+    public Map<String, String> getModuleConfiguration(int pluginId) {
+        return executeFunction(rt -> {
+            Map<String, String> config = new HashMap<>(); //we could very well store this on blob store
+            CloudBlob configBlob = getModule(String.valueOf(pluginId) + "_configuration.json");
+            if (configBlob != null) {
+                //decrypt the configuration string
+                String configData = new String(AESCrypt.AESDecrypt(configBlob.getData(), ENCRYPTION_KEY), "UTF-8");
+                Map<String, String> savedConfig = Jemo.fromJSONString(Map.class, configData);
+                if (!savedConfig.isEmpty()) {
+                    config.putAll(savedConfig);
+                }
+            }
+            return config;
+        }, this);
+    }
+
+    @Override
+    public List<String> listQueueIds(String location, boolean includeWorkQueues) {
+        return executeFunction(rt -> {
+            //GET /subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{namespaceName}/eventhubs?api-version=2015-08-01
+            List<String> queueList = new ArrayList<>();
+            switch (MSG_MODEL) {
+                case EVENTHUB:
+                    String authKey = getAuthToken();
+                    String subId = getSubscriptionId(authKey);
+                    final Response response = Request.Get(AZURE_RESOURCE_PORTAL + "subscriptions/" + subId + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.EventHub/namespaces/" + EVENT_HUB_NAMESPACE + "/eventhubs?api-version=2015-08-01")
+                            .addHeader("Authorization", "Bearer " + authKey)
+                            .addHeader("x-ms-version", "2015-08-01").execute();
+                    try {
+                        queueList.addAll(getListFromJSON(response.returnContent().asString(Charset.forName("UTF-8")), "$.value[*].id"));
+                    } catch (Exception e) {
+                        Jemo.log(Level.FINE, "[AZURE][listQueueIds] EVENTHUB failure status code: " + response.returnResponse().getStatusLine() + ", body: " + response.returnResponse().getEntity().getContent());
+                    }
+                    return queueList.parallelStream().filter(qId -> {
+                        String qName = qId.substring(qId.lastIndexOf('/') + 1).toUpperCase();
+                        return (includeWorkQueues || !qName.contains("WORK-QUEUE")) && qName.indexOf(("jemo-" + (location == null ? "" : location)).toUpperCase()) == 0;
+                    }).collect(toList());
+                case QUEUE:
+                    getQueueStorage().listQueues("jemo-").forEach(q -> {
+                        if ((includeWorkQueues || !q.getName().toUpperCase().contains("WORK-QUEUE")) && q.getName().toUpperCase().indexOf(("jemo-" + (location == null ? "" : location)).toUpperCase()) == 0) {
+                            queueList.add(q.getName());
+                        }
+                    });
+                    return queueList;
+            }
+            return null;
+        }, this);
+    }
+
+    @Override
+    public String getDefaultCategory() {
+        return STORAGE_MODULE_CONTAINER;
+    }
+
+    @Override
+    public Stream<InputStream> readAll(String category, String path) {
+        return executeFunction(rt -> {
+            return StreamSupport.stream(getStorageContainer(category).listBlobs(path + "/").spliterator(), true)
+                    .filter(i -> i instanceof CloudBlob || i instanceof CloudBlockBlob)
+                    .map(i -> {
+                        if (i instanceof CloudBlob) {
+                            return CloudBlob.class.cast(i).getDataStream();
+                        } else {
+                            try {
+                                return CloudBlockBlob.class.cast(i).openInputStream();
+                            } catch (StorageException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }).parallel();
+        }, this);
+    }
+
+    @Override
+    public void remove(String category, String path, String key) {
+        executeFunction(rt -> {
+            getStorageContainer(category).getBlockBlobReference(path == null ? key : path + "/" + key).deleteIfExists();
+            return null;
+        }, this);
+    }
+
+    @Override
+    public String getQueueName(String queueId) {
+        return queueId.substring(queueId.lastIndexOf('/') + 1);
+    }
+
+    @Override
+    public ValidationResult validateCredentials(Map<String, String> map) {
+        final AzureCredentials azureCredentials = new AzureCredentials(map.get(TENANT_ID_VAR_NAME),
+                map.get(CLIENT_ID_VAR_NAME),
+                map.get(CLIENT_SECRET_VAR_NAME));
+        return validateCredentials(azureCredentials);
+    }
+
+    private ValidationResult validateCredentials(AzureCredentials azureCredentials) {
+        try {
+            getAuthToken(azureCredentials);
+            AZURE_CREDENTIALS = azureCredentials;
+            return ValidationResult.SUCCESS;
+        } catch (Throwable t) {
+            Jemo.log(Level.FINE, "[AZURE][validateCredentials] User credentials validation failed:  %s", t.getMessage());
+            return new ValidationResult(Collections.singletonList(t.getMessage()));
+        }
+    }
+
+    @Override
+    public ValidationResult validatePermissions() {
+        if (lastAuthToken == null) {
+            final ValidationResult validationResult = validateCredentials(AZURE_CREDENTIALS());
+            if (!validationResult.isSuccess()) {
+                return validationResult;
+            }
+        }
+
+        try {
+            HttpResponse authKeyResponse = Request.Post("https://login.microsoftonline.com/" + AZURE_CREDENTIALS().tenantId + "/oauth2/token")
+                    .addHeader("Content-Type", "application/x-www-form-urlencoded")
+                    .bodyForm(Form.form().add("grant_type", "client_credentials")
+                            .add("client_id", AZURE_CREDENTIALS().clientId)
+                            .add("client_secret", AZURE_CREDENTIALS().clientSecret)
+                            .add("resource", "https://graph.windows.net/").build()).execute().returnResponse();
+
+            Map<String, String> authResult = fromJSONString(Map.class, responseBody(authKeyResponse));
+            String authKey = authResult.get("access_token");
+
+            final HttpResponse principalObjectIdResponse = Request.Get(
+                    "https://graph.windows.net/" + AZURE_CREDENTIALS().tenantId + "/servicePrincipals?$filter=servicePrincipalNames%2Fany%28c%3Ac%20eq%20%27" + AZURE_CREDENTIALS().clientId + "%27%29&api-version=1.6")
+                    .addHeader("Authorization", "Bearer " + authKey)
+                    .execute().returnResponse();
+            final String principalServiceObjectId = getValueFromJSON(responseBody(principalObjectIdResponse), "value[0].objectId");
+
+            HttpResponse roleAssignmentsResponse = sendRequestWithSubscription(GET, "/providers/Microsoft.Authorization/roleAssignments?$filter=principalId%20eq%20'" + principalServiceObjectId + "'&api-version=2018-01-01-preview", null);
+            final List<String> assignedRoleDefinitionIds = getListFromJSON(responseBody(roleAssignmentsResponse), "$.value.*.properties.roleDefinitionId");
+            final List<String> allowedActions = assignedRoleDefinitionIds.stream()
+                    .flatMap(assignedRoleDefinitionId -> {
+                        final HttpResponse roleDefinitionResponse = sendRequest(GET, AZURE_RESOURCE_PORTAL + assignedRoleDefinitionId.substring(1) + "?api-version=2015-07-01", null);
+                        return getListFromJSON(responseBody(roleDefinitionResponse), "$.properties.permissions[0].actions").stream();
+                    }).collect(Collectors.toList());
+            final List<String> notPermittedActions = REQUIRED_ACTIONS.stream()
+                    .filter(action -> !allowedActions.contains(action))
+                    .collect(toList());
+            if (!notPermittedActions.isEmpty()) {
+                Jemo.log(Level.WARNING, "[AZURE][validatePermissions] The following required actions are not allowed to service principal: [%s]", notPermittedActions);
+            }
+            return new ValidationResult(notPermittedActions);
+        } catch (Exception e) {
+            return new ValidationResult(asList("Failed to validate permissions, exception message: " + e.getMessage()));
+        }
+    }
+
+    @Override
+    public void updateCredentials(Map<String, String> credentials) throws IOException {
+        AZURE_CREDENTIALS = new AzureCredentials(credentials.get(TENANT_ID_VAR_NAME), credentials.get(CLIENT_ID_VAR_NAME), credentials.get(CLIENT_SECRET_VAR_NAME));
+
+        final Path azureDirectory = azureDirectory();
+        if (!Files.exists(azureDirectory)) {
+            Files.createDirectory(azureDirectory);
+        }
+
+        final String content = "[default]\n" +
+                "client=" + credentials.get(CLIENT_ID_VAR_NAME) + "\n" +
+                "key=" + credentials.get(CLIENT_SECRET_VAR_NAME) + "\n" +
+                "tenant=" + credentials.get(TENANT_ID_VAR_NAME);
+        final Path credentialsFile = azureDirectory.resolve("credentials");
+        Files.deleteIfExists(credentialsFile);
+        Files.write(credentialsFile, content.getBytes(), CREATE);
+    }
+
+    private static Path azureDirectory() {
+        final Path path = Util.pathUnderHomdeDir(".azure");
+        if (!Files.exists(path)) {
+            Util.B(path, Files::createDirectory);
+        }
+        return path;
+    }
+
+    @Override
+    public void setRegion(String regionCode) throws IOException {
+        REGION = regionCode;
+        final String content = "region = " + regionCode + "\n";
+        Files.write(azureDirectory().resolve("region"), content.getBytes(), CREATE);
+    }
+
+    @Override
+    public String readInstanceTag(String s) {
+        HttpResponse response = sendRequestWithSubscriptionAndResourceGroup(GET, "/providers/Microsoft.Compute/virtualMachines?api-version=2018-06-01", "");
+        if (response.getStatusLine().getStatusCode() == 200) {
+            Jemo.log(Level.FINE, "[AZURE][readInstanceTag] Could n [%s]", RESOURCE_GROUP);
+        } else {
+            throw new IllegalStateException("Could not list virtual machines for resource group [" + RESOURCE_GROUP + "] azure return value: " + responseBody(response));
+        }
+
+        try {
+            return getValueFromJSON(responseBody(response), "$.value[0].tags.PARAM_SET_NAME");
+        } catch (PathNotFoundException e) {
+            return null;
+        }
+    }
+
+    @Override
+    public List<RegionInfo> getRegions() {
+        return Arrays.asList(
+                new RegionInfo("australiaeast", "Australia East"),
+                new RegionInfo("canadacentral", "Canada Central"),
+                new RegionInfo("canadaeast", "Canada East"),
+                new RegionInfo("centralus", "Central US"),
+                new RegionInfo("eastus", "East US"),
+                new RegionInfo("eastus2", "East US 2"),
+                new RegionInfo("japaneast", "Japan East"),
+                new RegionInfo("northeurope", "North Europe"),
+                new RegionInfo("southeastasia", "South East Asia"),
+                new RegionInfo("southindia", "South India"),
+                new RegionInfo("uksouth", "UK South"),
+                new RegionInfo("ukwest", "UK West"),
+                new RegionInfo("westeurope", "West Europe"),
+                new RegionInfo("westus", "West US"),
+                new RegionInfo("westus2", "West US 2")
+        );
+    }
+
+    private static final Map<String, String> AKS_REGION_TO_LOG_WORKSPACE_REGION = new HashMap<String, String>() {{
+        put("australiaeast", "australiaeast");
+
+        put("canadacentral", "canadacentral");
+        put("canadaeast", "canadacentral");
+
+        put("centralus", "eastus");
+        put("eastus", "eastus");
+        put("eastus2", "eastus");
+        put("westus", "westus2");
+        put("westus2", "westus2");
+
+        put("japaneast", "japaneast");
+
+        put("northeurope", "northeurope");
+        put("westeurope", "westeurope");
+
+        put("southeastasia", "southeastasia");
+        put("southindia", "centralindia");
+
+        put("uksouth", "uksouth");
+        put("ukwest", "uksouth");
+    }};
+
+    @Override
+    public void resetLogConsoleHandler(Handler handler) {
+        // Do nothing.
+    }
+
+    @Override
+    public List<String> getExistingNetworks() {
+        final HttpResponse response = sendRequestWithSubscriptionAndResourceGroup(GET, "/providers/Microsoft.Network/virtualNetworks?api-version=2018-10-01", null);
+        final String responseBody = responseBody(response);
+        return getListFromJSON(responseBody, "$.value.*.name");
+    }
+
+    @Override
+    public List<String> getCustomerManagedPolicies() {
+        return null;
+    }
+
+    @Override
+    public ValidationResult validatePolicy(String s) {
+        return null;
+    }
+
+    @Override
+    public TerraformJobResponse install(String region, StringBuilder builder) throws IOException {
+        final Path terraformDirPath = createInstallTerraformTemplates(region);
+        final TerraformJob terraformJob = new TerraformJob(terraformDirPath.toString(), terraformDirPath.toString() + "/" + TFVARS_FILE_NAME).run(builder);
+        Files.copy(Paths.get("terraform.tfstate"), terraformDirPath.resolve("terraform.tfstate"));
+        return TerraformJobResponse.fromTerraformJob(terraformJob);
+    }
+
+    @Override
+    public Path createInstallTerraformTemplates(String region) throws IOException {
+        final String terraformDir = getTerraformInstallDir();
+        final Path terraformDirPath = Paths.get(terraformDir);
+        if (Files.exists(terraformDirPath)) {
+            // Delete files from previous runs.
+            Util.deleteDirectory(terraformDirPath.toFile());
+        }
+        Files.createDirectories(terraformDirPath);
+
+        if (region != null) {
+            final String logWorkspaceLocation = getLogWorkSpaceLocationBasedOnRegion(region);
+            final String varsFileContent = "terraform_user_client_id=\"" + AZURE_CREDENTIALS().clientId + "\"\n" +
+                    "terraform_user_client_secret=\"" + AZURE_CREDENTIALS().clientSecret + "\"\n" +
+                    "tenant_id=\"" + AZURE_CREDENTIALS().tenantId + "\"\n" +
+                    "subscription_id=\"" + getSubscriptionId(getAuthToken()) + "\"\n" +
+                    "region=\"" + region + "\"\n" +
+                    "log-workspace-location=\"" + logWorkspaceLocation + "\"\n";
+            Files.write(Paths.get(terraformDirPath.toString() + "/" + TFVARS_FILE_NAME), varsFileContent.getBytes(), CREATE);
+        }
+
+        copy(terraformDir, terraformDirPath, "install.tf", getClass());
+        copy(terraformDir, terraformDirPath, "variables.tf", getClass());
+        copy(terraformDir, terraformDirPath, "service_principal.tf", getClass());
+        copy(terraformDir, terraformDirPath, "output.tf", getClass());
+        return terraformDirPath;
+    }
+
+    /**
+     * Regions offering the AKS service do not necessarily offer the Log Analytics service (see https://azure.microsoft.com/en-gb/pricing/details/monitor/),
+     * therefore this method finds the closest possible region offering the Analytics service
+     *
+     * @return
+     */
+    @NotNull
+    private String getLogWorkSpaceLocationBasedOnRegion(String region) {
+        return AKS_REGION_TO_LOG_WORKSPACE_REGION.getOrDefault(region, "uksouth");
+    }
+
+    @Override
+    public ClusterParams getClusterParameters() {
+        return new ClusterParams(
+                asList(
+                        new ClusterParam("resource_group_name", RESOURCE_GROUP, "Name of the resource group"),
+                        new ClusterParam("jemo_user_client_id", AZURE_CREDENTIALS().clientId, "The Jemo user client id"),
+                        new ClusterParam("jemo_user_client_secret", AZURE_CREDENTIALS().clientSecret, "The Jemo user client secret"),
+                        new ClusterParam("key_vault_name", KEY_VAULT, "The Jemo key vault name"),
+                        new ClusterParam("aks_name", "jemo-cluster", "Name of the AKS cluster"),
+                        new ClusterParam("aks_dns_prefix", "jemo", "Optional DNS prefix to use with hosted Kubernetes API server FQDN"),
+                        new ClusterParam("aks_service_cidr", "10.0.0.0/16", "A CIDR notation IP range from which to assign service cluster IPs")
+                ),
+                asList(
+                        new ClusterParam("aks_agent_os_disk_size", "30", "Disk size (in GB) to provision for each of the agent pool nodes"),
+                        new ClusterParam("aks_agent_count", "2", "The number of agent nodes for the cluster"),
+                        new ClusterParam("aks_agent_vm_size", "Standard_D1_v2", "The size of the Virtual Machine"),
+                        new ClusterParam("aks_dns_service_ip", "10.0.0.10", "Containers DNS server IP address"),
+                        new ClusterParam("aks_docker_bridge_cidr", "172.17.0.1/16", "A CIDR notation IP for Docker bridge")
+                ),
+                asList(
+                        new ClusterParam("virtual_network_name", "jemo-virtual-network", "Virtual network name"),
+                        new ClusterParam("virtual_network_address_prefix", "15.0.0.0/8", "Containers DNS server IP address"),
+                        new ClusterParam("aks_subnet_name", "jemo-subnet", "AKS Subnet Name"),
+                        new ClusterParam("aks_subnet_address_prefix", "15.0.0.0/16", "Containers DNS server IP address"),
+                        new ClusterParam("app_gateway_subnet_name", "appgwsubnet", "The App Gateway Subnet Name"),
+                        new ClusterParam("app_gateway_subnet_address_prefix", "15.1.0.0/16", "Containers DNS server IP address"),
+                        new ClusterParam("app_gateway_name", "jemo-app-gateway", "Name of the Application Gateway"),
+                        new ClusterParam("app_gateway_sku", "Standard_v2", "Name of the Application Gateway SKU"),
+                        new ClusterParam("app_gateway_tier", "Standard_v2", "Tier of the Application Gateway SKU")
+                )
+        );
+    }
+
+    @Override
+    public void deleteKubernetesResources(StringBuilder builder) throws IOException {
+        runProcess(builder, new String[]{
+                "/bin/sh", "-c", "echo \"$(terraform output kube_config)\" > ~/.kube/config ; " +
+                "kubectl delete statefulset jemo ; " +
+                "kubectl delete svc jemo"
+        });
+    }
+
+    @Override
+    public String getCspLabel() {
+        return "azure";
+    }
+
+    @Override
+    public HashMap<String, String> getCredentialsFromTerraformResult(TerraformJob.TerraformResult terraformResult) {
+        return new HashMap<String, String>() {{
+            put(CLIENT_ID_VAR_NAME, terraformResult.getOutput("jemo_user_client_id"));
+            put(CLIENT_SECRET_VAR_NAME, terraformResult.getOutput("jemo_user_client_secret"));
+            put(TENANT_ID_VAR_NAME, AZURE_CREDENTIALS().tenantId);
+        }};
+    }
+
+    @Override
+    public ClusterCreationResponse createCluster(SetupParams setupParams, StringBuilder builder) throws IOException, ApiException {
+        setupParams.parameters().put("terraform_user_client_id", AZURE_CREDENTIALS().clientId);
+        setupParams.parameters().put("terraform_user_client_secret", AZURE_CREDENTIALS().clientSecret);
+        final Path terraformDirPath = createClusterTerraformTemplates(setupParams);
+        final TerraformJob terraformJob = new TerraformJob(terraformDirPath.toString(), terraformDirPath.toString() + "/" + TFVARS_FILE_NAME).run(builder);
+        final Path source = Paths.get("terraform.tfstate");
+        if (Files.exists(source)) {
+            Files.copy(source, terraformDirPath.resolve("terraform.tfstate"));
+        }
+
+        final String kubernetesDir = getTerraformClusterDir() + "/kubernetes/";
+        String[] command = new String[]{
+                "/bin/sh", "-c", "echo \"$(terraform output kube_config)\" > ~/.kube/config ; " +
+                "kubectl create -f https://raw.githubusercontent.com/Azure/kubernetes-keyvault-flexvol/master/deployment/kv-flexvol-installer.yaml ; " +
+                "kubectl create secret generic kvcreds --from-literal clientid='" + setupParams.parameters().get("jemo_user_client_id") + "' --from-literal clientsecret='" + setupParams.parameters().get("jemo_user_client_secret") + "' --type=azure/kv ; " +
+                "kubectl create -f " + kubernetesDir + "/jemo-statefulset.yaml ; " +
+                "kubectl create -f " + kubernetesDir + "/jemo-svc.yaml ; " +
+                "kubectl rollout status statefulset jemo"
+        };
+
+        runProcess(builder, command);
+
+        final String clusterUrl = terraformJob.getResult().getOutput("host");
+        final String token = terraformJob.getResult().getOutput("cluster_password");
+        final ApiClient apiClient = Config.fromToken(clusterUrl, token, false);
+        Configuration.setDefaultApiClient(apiClient);
+        final CoreV1Api coreV1Api = new CoreV1Api();
+        final String loadBalancerUrl = getLoadBalancerUrl(coreV1Api);
+        return ClusterCreationResponse.fromTerraformJob(terraformJob).setLoadBalancerUrl(loadBalancerUrl);
+    }
+
+    @Override
+    public Path createClusterTerraformTemplates(SetupParams setupParams) throws IOException {
+        setupParams.parameters().put("tenant_id", AZURE_CREDENTIALS().tenantId);
+        setupParams.parameters().put("subscription_id", subscriptionId);
+        final Path terraformDirPath = prepareTerraformFiles(setupParams);
+
+        final String subscriptionId = getSubscriptionId(getAuthToken());
+        prepareClusterCreation(setupParams);
+
+        final Path kubernetesDirPath = terraformDirPath.resolve("kubernetes");
+        Files.createDirectories(kubernetesDirPath);
+
+        final String kubernetesSourceDir = getTerraformClusterDir() + "kubernetes/";
+        copy(kubernetesSourceDir, kubernetesDirPath, "jemo-svc.yaml", getClass());
+
+        final String replicas = setupParams.parameters().get("aks_agent_count");
+        applyTemplate(kubernetesSourceDir, kubernetesDirPath, "jemo-statefulset.yaml", getClass(),
+                x -> x.replaceAll("_REPLICAS_", replicas)
+                        .replaceAll("_RESOURCE_GROUP_", RESOURCE_GROUP)
+                        .replaceAll("_EVENT_HUB_", EVENT_HUB_NAMESPACE)
+                        .replaceAll("_DB_", DATABASE_ACCOUNT)
+                        .replaceAll("_STORAGE_", STORAGE_ACCOUNT)
+                        .replaceAll("_LOG_WORKSPACE_", LOG_WORKSPACE)
+                        .replaceAll("_KEYVAULT_", KEY_VAULT)
+                        .replaceAll("_SUBSCRIPTION_ID_", subscriptionId)
+                        .replaceAll("_TENANT_ID_", AZURE_CREDENTIALS().tenantId)
+                        .replaceAll("_MSG_MODEL_", MSG_MODEL.name())
+        );
+
+        return terraformDirPath;
+    }
+
+    @Override
+    public AdminUserCreationInstructions getAdminUserCreationInstructions() {
+        return new AdminUserCreationInstructions(
+                "Jemo setup requires a service principal (the \"terraform-user\") with \"Owner\" role to run terraform with. This service principal is used to create " +
+                        "another service principal (the \"jemo-user\") with which Jemo pods run on the cluster worker nodes. " +
+                        "Therefore the \"terraform-user\" must have permissions to both \"Read and write all applications\" and \"Sign in and read user profile\" within the Windows Azure Active Directory API." +
+                        "Please install the azure cli, see https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest",
+                asList(
+                        "az ad sp create-for-rbac -n \"terraform-user\" --role Owner",
+                        "az ad app permission add --id http://terraform-user --api 00000002-0000-0000-c000-000000000000 --api-permissions 1cda74f2-2616-4834-b122-5cb1b07f8a59=Role 311a71cc-e848-46a1-bdf8-97ff7156d8e6=Scope",
+                        "Then, open the console and navigate to \"Azure Active Directory\" -> \"App registrations (Preview)\" -> \"terraform-user\"\n" +
+                                "-> \"API permissions\" -> \"Grant admin consent for Default Directory\" -> \"Yes\""
+                )
+        );
+    }
+
+    @Override
+    public List<InstallProperty> getInstallProperties() {
+        return asList(
+                new InstallProperty().name(PROP_RESOURCEGROUP).description("The resource group name"),
+                new InstallProperty().name(PROP_EVENTHUB).description("The event hub namespace name"),
+                new InstallProperty().name(PROP_DB).description("The database account name"),
+                new InstallProperty().name(PROP_STORAGE).description("The storage account name"),
+                new InstallProperty().name(PROP_LOG_WORKSPACE).description("The log workspace name"),
+                new InstallProperty().name(PROP_KEYVAULT).description("The key vault name"),
+                new InstallProperty().name(PROP_MSG_MODEL).description("The Messaging Model").range(asList(QUEUE.name(), EVENTHUB.name())).value(QUEUE.name())
+        );
+    }
+
+    @Override
+    public void setInstallProperties(Map<String, String> properties) {
+        RESOURCE_GROUP = properties.getOrDefault(PROP_RESOURCEGROUP, RESOURCE_GROUP);
+        EVENT_HUB_NAMESPACE = properties.getOrDefault(PROP_EVENTHUB, EVENT_HUB_NAMESPACE);
+        DATABASE_ACCOUNT = properties.getOrDefault(PROP_DB, DATABASE_ACCOUNT);
+        STORAGE_ACCOUNT = properties.getOrDefault(PROP_STORAGE, STORAGE_ACCOUNT);
+        LOG_WORKSPACE = properties.getOrDefault(PROP_LOG_WORKSPACE, LOG_WORKSPACE);
+        KEY_VAULT = properties.getOrDefault(PROP_KEYVAULT, KEY_VAULT);
+        MSG_MODEL = MESSAGE_MODEL.valueOf(properties.getOrDefault(PROP_MSG_MODEL, MSG_MODEL.name()));
+
+        Properties propertiesFromFile = readPropertiesFile();
+        if (propertiesFromFile == null) {
+            propertiesFromFile = new Properties();
+        }
+
+        propertiesFromFile.setProperty(PROP_RESOURCEGROUP, RESOURCE_GROUP);
+        propertiesFromFile.setProperty(PROP_EVENTHUB, EVENT_HUB_NAMESPACE);
+        propertiesFromFile.setProperty(PROP_DB, DATABASE_ACCOUNT);
+        propertiesFromFile.setProperty(PROP_STORAGE, STORAGE_ACCOUNT);
+        propertiesFromFile.setProperty(PROP_LOG_WORKSPACE, LOG_WORKSPACE);
+        propertiesFromFile.setProperty(PROP_KEYVAULT, KEY_VAULT);
+        propertiesFromFile.setProperty(PROP_MSG_MODEL, MSG_MODEL.name());
+        storePropertiesFile(propertiesFromFile);
+    }
+
+    @NotNull
+    private Path prepareTerraformFiles(SetupParams setupParams) throws IOException {
+        final String terraformDir = getTerraformClusterDir();
+        final Path terraformDirPath = Paths.get(terraformDir);
+        if (Files.exists(terraformDirPath)) {
+            Util.deleteDirectory(terraformDirPath.toFile());
+        }
+        Files.createDirectories(terraformDirPath);
+
+        final String existingNetworkName = setupParams.parameters().get("existing-network-name");
+        final String dependsOnVn = existingNetworkName == null ? "  depends_on           = [\"azurerm_virtual_network.vn\"]" : "";
+        final String dependsOnVnAppend = existingNetworkName == null ? ", \"azurerm_virtual_network.vn\"" : "";
+
+        copy(terraformDir, terraformDirPath, "README.txt", getClass());
+        copy(terraformDir, terraformDirPath, "cluster.tf", getClass());
+        copy(terraformDir, terraformDirPath, "identity.tf", getClass());
+        copy(terraformDir, terraformDirPath, "locals.tf", getClass());
+        applyTemplate(terraformDir, terraformDirPath, "main.tf", getClass(), x -> x.replaceAll("_DEPENDS_ON_VN_", dependsOnVn));
+        applyTemplate(terraformDir, terraformDirPath, "gateway.tf", getClass(), x -> x.replaceAll("_DEPENDS_ON_VN_", dependsOnVnAppend));
+        copy(terraformDir, terraformDirPath, "output.tf", getClass());
+        copy(terraformDir, terraformDirPath, "roles.tf", getClass());
+        copy(terraformDir, terraformDirPath, "variables.tf", getClass());
+
+        if (existingNetworkName == null) {
+            copy(terraformDir, terraformDirPath, "vn.tf", getClass());
+        } else {
+            setupParams.parameters().put("virtual_network_name", existingNetworkName);
+        }
+
+        setupParams.parameters().put("location", REGION);
+        setupParams.parameters().put("resource_group_name", RESOURCE_GROUP);
+
+        final String varsFileContent = setupParams.parameters().keySet().stream()
+                .filter(key -> !key.equals("existing-network-name") && !key.equals("policy-id") && !key.equals("containersPerParamSet"))
+                .map(key -> key + "=\"" + setupParams.parameters().get(key) + "\"")
+                .collect(Collectors.joining("\n"));
+
+        Files.write(Paths.get(terraformDirPath.toString() + "/" + TFVARS_FILE_NAME), varsFileContent.getBytes(), CREATE);
+        return terraformDirPath;
+    }
+
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/README.txt b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/README.txt
new file mode 100644
index 0000000..070e36d
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/README.txt
@@ -0,0 +1,26 @@
+1. Create the cluster:
+   terraform init
+   terraform plan
+   terraform apply
+
+2. Allow kubectl to login to the cluster:
+   echo "$(terraform output kube_config)" > ~/.kube/config
+
+3. Configure the cluster with pod identity and instantiate the jemo pods:
+
+   a. Create all the needed pods:
+      kubectl create -f https://raw.githubusercontent.com/Azure/kubernetes-keyvault-flexvol/master/deployment/kv-flexvol-installer.yaml
+      kubectl create secret generic kvcreds --from-literal clientid='JEMO_USER_CLIENT_ID' --from-literal clientsecret='JEMO_USER_CLIENT_SECRET' --type=azure/kv
+      kubectl create -f kubernetes/jemo-statefulset.yaml
+      kubectl create -f kubernetes/jemo-svc.yaml
+
+   b. Wait until all pods are ready:
+      kubectl rollout status statefulset jemo
+
+4. Find the IP where jemo runs:
+   kubectl get svc jemo -o=jsonpath='{.status.loadBalancer.ingress[0].ip}'
+
+To delete everything, run:
+   kubectl delete statefulset jemo
+   kubectl delete svc jemo
+   terraform destroy
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/cluster.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/cluster.tf
new file mode 100644
index 0000000..8aef9a7
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/cluster.tf
@@ -0,0 +1,38 @@
+resource "azurerm_kubernetes_cluster" "k8s" {
+  name       = "${var.aks_name}"
+  location   = "${data.azurerm_resource_group.rg.location}"
+  dns_prefix = "${var.aks_dns_prefix}"
+
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+
+  addon_profile {
+    http_application_routing {
+      enabled = false
+    }
+  }
+
+  agent_pool_profile {
+    name            = "agentpool"
+    count           = "${var.aks_agent_count}"
+    vm_size         = "${var.aks_agent_vm_size}"
+    os_type         = "Linux"
+    os_disk_size_gb = "${var.aks_agent_os_disk_size}"
+    vnet_subnet_id  = "${data.azurerm_subnet.kubesubnet.id}"
+  }
+
+  service_principal {
+    client_id     = "${var.jemo_user_client_id}"
+    client_secret = "${var.jemo_user_client_secret}"
+  }
+
+  network_profile {
+    network_plugin     = "azure"
+    dns_service_ip     = "${var.aks_dns_service_ip}"
+    docker_bridge_cidr = "${var.aks_docker_bridge_cidr}"
+    service_cidr       = "${var.aks_service_cidr}"
+  }
+
+  tags       = "${var.tags}"
+
+  depends_on = ["azurerm_application_gateway.gateway"]
+}
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/gateway.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/gateway.tf
new file mode 100644
index 0000000..f3b5e23
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/gateway.tf
@@ -0,0 +1,73 @@
+resource "azurerm_public_ip" "public-ip" {
+  name                         = "${var.public_ip_name}"
+  location                     = "${data.azurerm_resource_group.rg.location}"
+  resource_group_name          = "${data.azurerm_resource_group.rg.name}"
+  public_ip_address_allocation = "static"
+  sku                          = "Standard"
+
+  tags = "${var.tags}"
+}
+
+resource "azurerm_application_gateway" "gateway" {
+  name                = "${var.app_gateway_name}"
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+  location            = "${data.azurerm_resource_group.rg.location}"
+
+  sku {
+    name     = "${var.app_gateway_sku}"
+    tier     = "${var.app_gateway_tier}"
+    capacity = 2
+  }
+
+  gateway_ip_configuration {
+    name      = "appGatewayIpConfig"
+    subnet_id = "${data.azurerm_subnet.appgwsubnet.id}"
+  }
+
+  frontend_port {
+    name = "${local.frontend_port_name}"
+    port = 80
+  }
+
+  frontend_port {
+    name = "httpsPort"
+    port = 443
+  }
+
+  frontend_ip_configuration {
+    name                 = "${local.frontend_ip_configuration_name}"
+    public_ip_address_id = "${azurerm_public_ip.public-ip.id}"
+  }
+
+  backend_address_pool {
+    name = "${local.backend_address_pool_name}"
+  }
+
+  backend_http_settings {
+    name                  = "${local.http_setting_name}"
+    cookie_based_affinity = "Disabled"
+    port                  = 80
+    protocol              = "Http"
+    request_timeout       = 1
+  }
+
+  http_listener {
+    name                           = "${local.listener_name}"
+    frontend_ip_configuration_name = "${local.frontend_ip_configuration_name}"
+    frontend_port_name             = "${local.frontend_port_name}"
+    protocol                       = "Http"
+  }
+
+  request_routing_rule {
+    name                       = "${local.request_routing_rule_name}"
+    rule_type                  = "Basic"
+    http_listener_name         = "${local.listener_name}"
+    backend_address_pool_name  = "${local.backend_address_pool_name}"
+    backend_http_settings_name = "${local.http_setting_name}"
+  }
+
+  tags = "${var.tags}"
+
+  depends_on           = ["azurerm_public_ip.public-ip"_DEPENDS_ON_VN_]
+
+}
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/identity.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/identity.tf
new file mode 100644
index 0000000..23ddc0d
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/identity.tf
@@ -0,0 +1,9 @@
+# User Assigned Idntities
+resource "azurerm_user_assigned_identity" "jemo-identity" {
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+  location            = "${data.azurerm_resource_group.rg.location}"
+
+  name = "jemo-identity"
+
+  tags = "${var.tags}"
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-statefulset.yaml b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-statefulset.yaml
new file mode 100644
index 0000000..1627089
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-statefulset.yaml
@@ -0,0 +1,62 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: jemo
+spec:
+  selector:
+    matchLabels:
+      app: jemo
+  updateStrategy:
+      type: RollingUpdate
+  serviceName: "jemo"
+  replicas: _REPLICAS_
+  template:
+    metadata:
+      labels:
+        app: jemo
+        aadpodidbinding: "jemo"
+    spec:
+      containers:
+        - name: jemo
+          image: eclipse/jemo:1.0.1
+          env:
+            - name: eclipse.jemo.cloud
+              value: "AZURE"
+            - name: eclipse.jemo.http.port
+              value: "80"
+            - name: eclipse.jemo.https.port
+              value: "443"
+            - name: eclipse.jemo.azure.resourcegroup
+              value: "_RESOURCE_GROUP_"
+            - name: eclipse.jemo.azure.eventhub
+              value: "_EVENT_HUB_"
+            - name: eclipse.jemo.azure.db
+              value: "_DB_"
+            - name: eclipse.jemo.azure.storage
+              value: "_STORAGE_"
+            - name: eclipse.jemo.azure.log-workspace
+              value: "_LOG_WORKSPACE_"
+            - name: eclipse.jemo.azure.keyvault
+              value: "_KEYVAULT_"
+            - name: eclipse.jemo.azure.msg.model
+              value: "_MSG_MODEL_"
+          ports:
+            - containerPort: 80
+          volumeMounts:
+            - name: kv-volume
+              mountPath: /kv
+              readOnly: true
+      volumes:
+        - name: kv-volume
+          flexVolume:
+            driver: "azure/kv"
+            secretRef:
+              name: kvcreds
+            options:
+              usepodidentity: "false"
+              keyvaultname: "_KEYVAULT_"
+              keyvaultobjectnames: "clientId;clientSecret;tenantId;encryptionKey"
+              keyvaultobjecttypes: "secret;secret;secret;secret"
+              resourcegroup: "_RESOURCE_GROUP_"
+              subscriptionid: "_SUBSCRIPTION_ID_"
+              tenantid: "_TENANT_ID_"
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-svc.yaml b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-svc.yaml
new file mode 100644
index 0000000..3e6db4e
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/kubernetes/jemo-svc.yaml
@@ -0,0 +1,17 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: jemo
+  labels:
+    app: jemo
+spec:
+  ports:
+  - name: http
+    port: 80
+    protocol: TCP
+  - name: https
+    port: 443
+    protocol: TCP
+  selector:
+    app: jemo
+  type: LoadBalancer
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/locals.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/locals.tf
new file mode 100644
index 0000000..9caf3f2
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/locals.tf
@@ -0,0 +1,10 @@
+# # Locals block for hardcoded names.
+locals {
+  backend_address_pool_name      = "${var.virtual_network_name}-beap"
+  frontend_port_name             = "${var.virtual_network_name}-feport"
+  frontend_ip_configuration_name = "${var.virtual_network_name}-feip"
+  http_setting_name              = "${var.virtual_network_name}-be-htst"
+  listener_name                  = "${var.virtual_network_name}-httplstn"
+  request_routing_rule_name      = "${var.virtual_network_name}-rqrt"
+  app_gateway_subnet_name = "${var.app_gateway_subnet_name}"
+}
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/main.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/main.tf
new file mode 100644
index 0000000..8d4bc20
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/main.tf
@@ -0,0 +1,40 @@
+provider "azurerm" {
+  version = "=1.21.0"
+
+  subscription_id = "${var.subscription_id}"
+  client_id       = "${var.terraform_user_client_id}"
+  client_secret   = "${var.terraform_user_client_secret}"
+  tenant_id       = "${var.tenant_id}"
+}
+
+data "azurerm_resource_group" "rg" {
+  name = "${var.resource_group_name}"
+}
+
+data "azuread_service_principal" "service-principal" {
+  application_id = "${var.jemo_user_client_id}"
+}
+
+data "azurerm_key_vault" key_vault {
+  name          = "${var.key_vault_name}"
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+}
+
+data "azurerm_virtual_network" vn {
+  name = "${var.virtual_network_name}"
+  resource_group_name  = "${data.azurerm_resource_group.rg.name}"
+  _DEPENDS_ON_VN_
+}
+
+data "azurerm_subnet" "kubesubnet" {
+  name                 = "${data.azurerm_virtual_network.vn.subnets[0]}"
+  virtual_network_name = "${var.virtual_network_name}"
+  resource_group_name  = "${data.azurerm_resource_group.rg.name}"
+}
+
+data "azurerm_subnet" "appgwsubnet" {
+  name                 = "${data.azurerm_virtual_network.vn.subnets[1]}"
+  virtual_network_name = "${var.virtual_network_name}"
+  resource_group_name  = "${data.azurerm_resource_group.rg.name}"
+}
+
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/output.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/output.tf
new file mode 100644
index 0000000..a98c721
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/output.tf
@@ -0,0 +1,39 @@
+output "client_key" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.client_key}"
+}
+
+output "client_certificate" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.client_certificate}"
+}
+
+output "cluster_ca_certificate" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.cluster_ca_certificate}"
+}
+
+output "cluster_username" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.username}"
+}
+
+output "cluster_password" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.password}"
+}
+
+output "kube_config" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config_raw}"
+}
+
+output "host" {
+  value = "${azurerm_kubernetes_cluster.k8s.kube_config.0.host}"
+}
+
+output "jemo-identity-id" {
+  value = "${azurerm_user_assigned_identity.jemo-identity.id}"
+}
+
+output "jemo-identity-client_id" {
+  value = "${azurerm_user_assigned_identity.jemo-identity.client_id}"
+}
+
+output "jemo-identity-principal_id" {
+  value = "${azurerm_user_assigned_identity.jemo-identity.principal_id}"
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/roles.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/roles.tf
new file mode 100644
index 0000000..dfcb6d5
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/roles.tf
@@ -0,0 +1,45 @@
+resource "azurerm_role_assignment" "ra1" {
+  scope                = "${data.azurerm_subnet.kubesubnet.id}"
+  role_definition_name = "Network Contributor"
+  principal_id         = "${data.azuread_service_principal.service-principal.id}"
+}
+
+resource "azurerm_role_assignment" "ra2" {
+  scope                = "${azurerm_user_assigned_identity.jemo-identity.id}"
+  role_definition_name = "Managed Identity Operator"
+  principal_id         = "${data.azuread_service_principal.service-principal.id}"
+  depends_on           = ["azurerm_user_assigned_identity.jemo-identity"]
+}
+
+resource "azurerm_role_assignment" "ra3" {
+  scope                = "${azurerm_application_gateway.gateway.id}"
+  role_definition_name = "Contributor"
+  principal_id         = "${azurerm_user_assigned_identity.jemo-identity.principal_id}"
+  depends_on           = ["azurerm_user_assigned_identity.jemo-identity", "azurerm_application_gateway.gateway"]
+}
+
+resource "azurerm_role_assignment" "ra4" {
+  scope                = "${data.azurerm_resource_group.rg.id}"
+  role_definition_name = "Reader"
+  principal_id         = "${azurerm_user_assigned_identity.jemo-identity.principal_id}"
+  depends_on           = ["azurerm_user_assigned_identity.jemo-identity", "azurerm_application_gateway.gateway"]
+}
+
+resource "azurerm_role_assignment" "ra5" {
+  scope                = "${data.azurerm_key_vault.key_vault.id}"
+  role_definition_name = "Contributor"
+  principal_id         = "${azurerm_user_assigned_identity.jemo-identity.principal_id}"
+  depends_on           = ["azurerm_user_assigned_identity.jemo-identity"]
+}
+
+resource "azurerm_key_vault_access_policy" "jemo_key_vault_access_policy" {
+  vault_name          = "${var.key_vault_name}"
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+
+  tenant_id = "${var.tenant_id}"
+  object_id = "${azurerm_user_assigned_identity.jemo-identity.principal_id}"
+
+  secret_permissions = [
+    "get",
+  ]
+}
\ No newline at end of file
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/variables.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/variables.tf
new file mode 100644
index 0000000..5c49baf
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/variables.tf
@@ -0,0 +1,131 @@
+variable "resource_group_name" {
+  description = "Name of the resource group already created."
+  default = "jemorg"
+}
+
+variable "jemo_user_client_id" {
+  description = "Application ID/Client ID  of the service principal. Used by AKS to manage AKS related resources on Azure like vms, subnets."
+}
+
+variable "jemo_user_client_secret" {
+  description = "Secret of the service principal. Used by AKS to manage Azure."
+}
+
+variable "terraform_user_client_id" {
+  description = "Client id of service principal with admin priviledges"
+}
+
+variable "terraform_user_client_secret" {
+  description = "Client secret of service principal with admin priviledges"
+}
+
+variable "tenant_id" {
+  description = "The tenant id"
+}
+
+variable "subscription_id" {
+  description = "The subscription id"
+}
+
+variable "key_vault_name" {
+  description = "The Jemo key vault name."
+  default = "jemokv"
+}
+
+variable "virtual_network_name" {
+  description = "Virtual network name"
+  default     = "jemo-virtual-network"
+}
+
+variable "public_ip_name" {
+  description = "The public ip name"
+  default     = "jemo-public-ip"
+}
+
+variable "virtual_network_address_prefix" {
+  description = "Containers DNS server IP address."
+  default     = "15.0.0.0/8"
+}
+
+variable "aks_subnet_name" {
+  description = "AKS Subnet Name."
+  default     = "jemo-subnet"
+}
+
+variable "aks_subnet_address_prefix" {
+  description = "Containers DNS server IP address."
+  default     = "15.0.0.0/16"
+}
+
+variable "app_gateway_subnet_name" {
+  description = "The App Gateway Subnet Name."
+  default     = "appgwsubnet"
+}
+
+variable "app_gateway_subnet_address_prefix" {
+  description = "Containers DNS server IP address."
+  default     = "15.1.0.0/16"
+}
+
+variable "app_gateway_name" {
+  description = "Name of the Application Gateway."
+  default = "jemo-app-gateway"
+}
+
+variable "app_gateway_sku" {
+  description = "Name of the Application Gateway SKU."
+  default = "Standard_v2"
+}
+
+variable "app_gateway_tier" {
+  description = "Tier of the Application Gateway SKU."
+  default = "Standard_v2"
+}
+
+
+variable "aks_name" {
+  description = "Name of the AKS cluster."
+  default     = "jemo-cluster"
+}
+variable "aks_dns_prefix" {
+  description = "Optional DNS prefix to use with hosted Kubernetes API server FQDN."
+  default     = "jemo"
+}
+
+variable "aks_agent_os_disk_size" {
+  description = "Disk size (in GB) to provision for each of the agent pool nodes. This value ranges from 0 to 1023. Specifying 0 will apply the default disk size for that agentVMSize."
+  default     = 30
+}
+
+variable "aks_agent_count" {
+  description = "The number of agent nodes for the cluster."
+  default     = 2
+}
+
+variable "aks_agent_vm_size" {
+  description = "The size of the Virtual Machine."
+  default     = "Standard_D1_v2"
+}
+
+variable "aks_service_cidr" {
+  description = "A CIDR notation IP range from which to assign service cluster IPs."
+  default     = "10.0.0.0/16"
+}
+
+variable "aks_dns_service_ip" {
+  description = "Containers DNS server IP address."
+  default     = "10.0.0.10"
+}
+
+variable "aks_docker_bridge_cidr" {
+  description = "A CIDR notation IP for Docker bridge."
+  default     = "172.17.0.1/16"
+}
+
+variable "tags" {
+  type = "map"
+
+  default = {
+    source = "terraform"
+  }
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/cluster/vn.tf b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/vn.tf
new file mode 100644
index 0000000..c7fd83c
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/cluster/vn.tf
@@ -0,0 +1,19 @@
+resource "azurerm_virtual_network" "vn" {
+  name                = "${var.virtual_network_name}"
+  location            = "${data.azurerm_resource_group.rg.location}"
+  resource_group_name = "${data.azurerm_resource_group.rg.name}"
+  address_space       = ["${var.virtual_network_address_prefix}"]
+
+  subnet {
+    name           = "${var.aks_subnet_name}"
+    address_prefix = "${var.aks_subnet_address_prefix}"
+  }
+
+  subnet {
+    name           = "${var.app_gateway_subnet_name}"
+    address_prefix = "${var.app_gateway_subnet_address_prefix}"
+  }
+
+  tags = "${var.tags}"
+}
+
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/install/install.tf b/runtime-impl/azure-runtime/src/main/resources/azure/install/install.tf
new file mode 100644
index 0000000..8c2253e
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/install/install.tf
@@ -0,0 +1,171 @@
+provider "azurerm" {
+  version = "=1.21.0"
+
+  subscription_id = "${var.subscription_id}"
+  client_id       = "${var.terraform_user_client_id}"
+  client_secret   = "${var.terraform_user_client_secret}"
+  tenant_id       = "${var.tenant_id}"
+}
+
+provider "azuread" {
+  version = "=0.1.0"
+
+  subscription_id = "${var.subscription_id}"
+  client_id       = "${var.terraform_user_client_id}"
+  client_secret   = "${var.terraform_user_client_secret}"
+  tenant_id       = "${var.tenant_id}"
+}
+
+resource "azurerm_resource_group" "rg" {
+  name     = "${var.resource-group}"
+  location = "${var.region}"
+}
+
+resource "random_string" "random-suffix" {
+  length = 17
+  special = false
+  upper = false
+}
+
+resource "azurerm_storage_account" "sa" {
+  name                     = "jemosa${random_string.random-suffix.result}"
+  resource_group_name      = "${azurerm_resource_group.rg.name}"
+  location                 = "${var.region}"
+  account_tier             = "Standard"
+  account_replication_type = "GRS"
+
+  depends_on = ["azurerm_resource_group.rg"]
+}
+
+resource "azurerm_cosmosdb_account" "db" {
+  name                = "jemocdba-${random_string.random-suffix.result}"
+  location            = "${azurerm_resource_group.rg.location}"
+  resource_group_name = "${azurerm_resource_group.rg.name}"
+  offer_type          = "Standard"
+  kind                = "GlobalDocumentDB"
+
+  enable_automatic_failover = true
+
+  consistency_policy {
+    consistency_level       = "BoundedStaleness"
+    max_interval_in_seconds = 10
+    max_staleness_prefix    = 200
+  }
+
+  geo_location {
+    prefix            = "jemocdba-${random_string.random-suffix.result}-cid"
+    location          = "${azurerm_resource_group.rg.location}"
+    failover_priority = 0
+  }
+
+  depends_on = ["azurerm_resource_group.rg"]
+}
+
+resource "azurerm_eventhub_namespace" "ehn" {
+  name                = "jemoehn-${random_string.random-suffix.result}"
+  location            = "${azurerm_resource_group.rg.location}"
+  resource_group_name = "${azurerm_resource_group.rg.name}"
+  sku                 = "Standard"
+  capacity            = 1
+  kafka_enabled       = false
+
+  depends_on = ["azurerm_resource_group.rg"]
+}
+
+resource "azurerm_log_analytics_workspace" "log-workspace" {
+  name                = "jemo-log-workspace-${random_string.random-suffix.result}"
+  location            = "${var.log-workspace-location}"
+  resource_group_name = "${azurerm_resource_group.rg.name}"
+  sku                 = "${var.sku}"
+  retention_in_days   = 30
+
+  depends_on = ["azurerm_resource_group.rg"]
+}
+
+resource "azurerm_key_vault" "key-vault" {
+  name                        = "jemokv-${random_string.random-suffix.result}"
+  location                    = "${azurerm_resource_group.rg.location}"
+  resource_group_name         = "${azurerm_resource_group.rg.name}"
+  tenant_id                   = "${var.tenant_id}"
+
+  sku {
+    name = "standard"
+  }
+
+  access_policy {
+    tenant_id = "${var.tenant_id}"
+    object_id = "${azuread_service_principal.jemo-user.id}"
+
+    certificate_permissions = [
+    ]
+
+    key_permissions = [
+    ]
+
+    secret_permissions = [
+      "get",
+      "list",
+    ]
+  }
+
+  depends_on = ["azurerm_resource_group.rg", "azurerm_role_assignment.service_principal"]
+}
+
+data "azuread_service_principal" terraform-user {
+  application_id = "${var.terraform_user_client_id}"
+}
+
+resource "azurerm_key_vault_access_policy" "key-vault-access-policy" {
+  vault_name          = "${azurerm_key_vault.key-vault.name}"
+  resource_group_name = "${azurerm_key_vault.key-vault.resource_group_name}"
+
+  tenant_id = "${var.tenant_id}"
+  object_id = "${data.azuread_service_principal.terraform-user.id}"
+
+  certificate_permissions = [
+  ]
+
+  key_permissions = [
+  ]
+
+  secret_permissions = [
+    "get",
+    "list",
+    "set",
+    "delete",
+  ]
+
+  depends_on = ["azurerm_key_vault.key-vault"]
+}
+
+resource "azurerm_key_vault_secret" "clientId" {
+  name     = "clientId"
+  value    = "${azuread_service_principal.jemo-user.application_id}"
+  vault_uri = "${azurerm_key_vault.key-vault.vault_uri}"
+  depends_on = ["azurerm_key_vault_access_policy.key-vault-access-policy"]
+}
+
+resource "azurerm_key_vault_secret" "clientSecret" {
+  name     = "clientSecret"
+  value    = "${random_string.password.result}"
+  vault_uri = "${azurerm_key_vault.key-vault.vault_uri}"
+  depends_on = ["azurerm_key_vault_access_policy.key-vault-access-policy"]
+}
+
+resource "azurerm_key_vault_secret" "tenantId" {
+  name     = "tenantId"
+  value    = "${var.tenant_id}"
+  vault_uri = "${azurerm_key_vault.key-vault.vault_uri}"
+  depends_on = ["azurerm_key_vault_access_policy.key-vault-access-policy"]
+}
+
+resource "azurerm_key_vault_secret" "encryption-key" {
+  name     = "encryptionKey"
+  value    = "${random_string.encryption-key.result}"
+  vault_uri = "${azurerm_key_vault.key-vault.vault_uri}"
+  depends_on = ["azurerm_key_vault_access_policy.key-vault-access-policy"]
+}
+
+resource "random_string" "encryption-key" {
+  length = 16
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/install/output.tf b/runtime-impl/azure-runtime/src/main/resources/azure/install/output.tf
new file mode 100644
index 0000000..898be22
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/install/output.tf
@@ -0,0 +1,25 @@
+output "jemo_user_client_id" {
+  value = "${azuread_service_principal.jemo-user.application_id}"
+}
+output "jemo_user_client_secret" {
+  value = "${azuread_service_principal_password.principal_password.value}"
+}
+
+output "eclipse.jemo.azure.resourcegroup" {
+  value = "${azurerm_resource_group.rg.name}"
+}
+output "eclipse.jemo.azure.eventhub" {
+  value = "${azurerm_eventhub_namespace.ehn.name}"
+}
+output "eclipse.jemo.azure.db" {
+  value = "${azurerm_cosmosdb_account.db.name}"
+}
+output "eclipse.jemo.azure.storage" {
+  value = "${azurerm_storage_account.sa.name}"
+}
+output "eclipse.jemo.azure.log-workspace" {
+  value = "${azurerm_log_analytics_workspace.log-workspace.name}"
+}
+output "eclipse.jemo.azure.keyvault" {
+  value = "${azurerm_key_vault.key-vault.name}"
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/install/service_principal.tf b/runtime-impl/azure-runtime/src/main/resources/azure/install/service_principal.tf
new file mode 100644
index 0000000..7c8c50e
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/install/service_principal.tf
@@ -0,0 +1,60 @@
+data "azurerm_subscription" "current" {}
+
+resource "azuread_application" "app" {
+  name = "jemo-user"
+  available_to_other_tenants = false
+  oauth2_allow_implicit_flow = true
+}
+
+resource "random_string" "password" {
+  length = 32
+}
+
+resource "azuread_service_principal" "jemo-user" {
+  application_id = "${azuread_application.app.application_id}"
+}
+
+resource "azuread_service_principal_password" "principal_password" {
+  service_principal_id = "${azuread_service_principal.jemo-user.id}"
+  value                = "${random_string.password.result}"
+  end_date             = "${timeadd(timestamp(), "${2 * (24 * 365)}h")}"
+}
+
+resource "azurerm_role_assignment" "service_principal" {
+  scope                = "${data.azurerm_subscription.current.id}"
+  role_definition_id   = "${azurerm_role_definition.jemo-role.id}"
+  principal_id         = "${azuread_service_principal.jemo-user.id}"
+}
+
+
+resource "azurerm_role_definition" "jemo-role" {
+  name               = "jemo-role"
+  scope              = "${data.azurerm_subscription.current.id}"
+
+  permissions {
+    actions     = [
+      "Microsoft.Resources/subscriptions/read",
+      "Microsoft.Storage/storageAccounts/listKeys/action",
+      "Microsoft.EventHub/namespaces/eventhubs/write",
+      "Microsoft.EventHub/namespaces/AuthorizationRules/listKeys/action",
+      "Microsoft.DocumentDB/databaseAccounts/listKeys/action",
+      "Microsoft.DocumentDB/databaseAccounts/read",
+      "Microsoft.OperationalInsights/workspaces/read",
+      "Microsoft.Operationalinsights/workspaces/sharedkeys/read",
+      "Microsoft.Authorization/roleAssignments/read",
+      "Microsoft.Authorization/roleDefinitions/read",
+      "Microsoft.Network/virtualNetworks/read",
+      "Microsoft.ManagedIdentity/userAssignedIdentities/read",
+      "Microsoft.KeyVault/vaults/read",
+      "Microsoft.KeyVault/vaults/secrets/read",
+      "Microsoft.KeyVault/vaults/secrets/write"
+    ]
+
+    data_actions = [
+    ]
+  }
+
+  assignable_scopes = [
+    "${data.azurerm_subscription.current.id}",
+  ]
+}
diff --git a/runtime-impl/azure-runtime/src/main/resources/azure/install/variables.tf b/runtime-impl/azure-runtime/src/main/resources/azure/install/variables.tf
new file mode 100644
index 0000000..ef2dd25
--- /dev/null
+++ b/runtime-impl/azure-runtime/src/main/resources/azure/install/variables.tf
@@ -0,0 +1,31 @@
+variable "terraform_user_client_id" {
+  description = "Client id of the service principal with admin priviledges"
+}
+
+variable "terraform_user_client_secret" {
+  description = "Client secret of the service principal with admin priviledges"
+}
+
+variable "tenant_id" {
+  description = "The tenant id"
+}
+
+variable "subscription_id" {
+  description = "The subscription id"
+}
+
+variable "region" {
+  description = "A location that offers the AKS service (australiaeast, canadacentral, canadaeast, centralus, eastus, eastus2, japaneast, northeurope, southeastasia, southindia, uksouth, ukwest, westeurope, westus, westus2). For more read https://docs.microsoft.com/en-us/azure/aks/container-service-quotas#region-availability"
+}
+
+variable "resource-group" {
+  default = "jemorg"
+}
+
+variable "log-workspace-location" {
+  description = "A location that offers the Log Analytics service (australiaeast, australiasoutheast, australiacentral, canadacentral, eastus, southcentralus, westcentralus, westus2, northeurope, westeurope, southeastasia,   japaneast, centralindia, uksouth, francecentral, koreacentral). For more read https://azure.microsoft.com/en-us/pricing/details/monitor/"
+}
+
+variable "sku" {
+  default = "PerGB2018"
+}
\ No newline at end of file