Introduce InfluxDB datadase connector
InfluxDB connector allows for now to register Points and to collect records according to tags and datetime
diff --git a/platform/tools/influxdb-connector/pom.xml b/platform/tools/influxdb-connector/pom.xml
new file mode 100644
index 0000000..05b1801
--- /dev/null
+++ b/platform/tools/influxdb-connector/pom.xml
@@ -0,0 +1,84 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.eclipse.sensinact.gateway.tools</groupId>
+ <artifactId>platform.tools</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>influxdb-connector</artifactId>
+ <packaging>bundle</packaging>
+
+
+ <name>sensiNact IoT Gateway - Influxdb Connector</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.influxdb</groupId>
+ <artifactId>influxdb-java</artifactId>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ javax.annotation,
+ javax.annotation.meta,
+ javax.crypto,
+ javax.crypto.spec,
+ javax.net,
+ javax.net.ssl,
+ javax.security.auth.x500,
+ org.slf4j;version="[1.7,2)",
+ sun.misc
+ </Import-Package>
+ <Export-Package>
+ org.influxdb,
+ org.influxdb.annotation,
+ org.influxdb.dto,
+ okhttp3,
+ okio,
+ org.eclipse.sensinact.gateway.tools.connector.influxdb
+ </Export-Package>
+ <Private-Package>
+ org.influxdb.impl,
+ org.influxdb.msgpack,
+ org.influxdb.querybuilder,
+ org.influxdb.querybuilder.clauses,
+ com.squareup.moshi,
+ okhttp3.*,
+ org.json,
+ org.msgpack.core,
+ org.msgpack.value,
+ org.msgpack.value.*,
+ retrofit2,
+ retrofit2.*,
+ org.msgpack.core.buffer
+ </Private-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java
new file mode 100644
index 0000000..e40dade
--- /dev/null
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2020 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.tools.connector.influxdb;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Pong;
+
+/**
+ * InfluxDB database connector
+ */
+public class InfluxDbConnector {
+
+ private InfluxDB influxDB;
+
+ /**
+ * Constructor
+ *
+ * @throws IOException if an error occurs when connecting to the InfluxDB instance
+ */
+ public InfluxDbConnector() throws IOException {
+ this(new InfluxDbConnectorConfiguration.Builder().build());
+ }
+
+ /**
+ * Constructor
+ *
+ * @param configuration {@link InfluxDbConnectorConfiguration} allowing to configure the connection
+ * to the InfluxDB instance
+ *
+ * @throws IOException if an error occurs when connecting to the InfluxDB instance
+ */
+ public InfluxDbConnector(InfluxDbConnectorConfiguration configuration) throws IOException {
+ boolean connected = false;
+
+ if(configuration.getUserName()!=null && configuration.getPassword()!=null)
+ connected = this.connect(configuration.getUri(),configuration.getUserName(),configuration.getPassword());
+ else
+ connected = this.connect(configuration.getUri());
+ if(connected) {
+ influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
+ influxDB.enableBatch();
+ }
+ else
+ throw new IOException("Unable to connect");
+ }
+
+
+ /**
+ * Creates a connection and returns true if it has been done properly.
+ * Returns false otherwise
+ *
+ * @param uri
+ * @param database
+ *
+ * @return <ul>
+ * <li>true if the connection has been properly done</li>
+ * <li>false otherwise</li>
+ * </ul>
+ */
+ private boolean connect(String uri) {
+ influxDB = InfluxDBFactory.connect(uri);
+ if(!checkVersion())
+ return false;
+ return true;
+ }
+
+
+ /**
+ * Creates a connection and returns true if it has been done properly.
+ * Returns false otherwise
+ *
+ * @param uri
+ * @param username
+ * @param password
+ * @param database
+ *
+ * @return <ul>
+ * <li>true if the connection has been properly done</li>
+ * <li>false otherwise</li>
+ * </ul>
+ */
+ private boolean connect(String uri, String username, String password) {
+ influxDB = InfluxDBFactory.connect(uri, username, password);
+ if(!checkVersion())
+ return false;
+ return true;
+ }
+
+ private boolean checkVersion() {
+ Pong response = this.influxDB.ping();
+ if (response.getVersion().equalsIgnoreCase("unknown")) {
+ System.out.println("Error pinging server.");
+ influxDB.close();
+ influxDB = null;
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns true if the database whose name is passed as parameter exists in
+ * the InfluxDB instance this InfluxDbConnector is connected to ; otherwise
+ * returns false
+ *
+ * @param databaseName the name of the database
+ *
+ * @return
+ * <ul>
+ * <li>true if the database with the specified name exists</li>
+ * <li>false otherwise</li>
+ * </ul>
+ */
+ @SuppressWarnings("deprecation")
+ public boolean exists(String databaseName) {
+ return this.influxDB.databaseExists(databaseName);
+ }
+
+ /**
+ * Returns the database with the name passed as parameter if it exists, otherwise
+ * it is created and returned
+ *
+ * @param databaseName the name of the database
+ *
+ * @return the newly created database or the one previously existing with the
+ * specified name
+ */
+ @SuppressWarnings("deprecation")
+ public InfluxDbDatabase createIfNotExists(String databaseName) {
+ if(!exists(databaseName))
+ this.influxDB.createDatabase(databaseName);
+ return new InfluxDbDatabase(this.influxDB,databaseName);
+ }
+
+ /**
+ * Returns the database with the name passed as parameter if it exists, otherwise
+ * returns null
+ *
+ * @param databaseName the name of the database
+ *
+ * @return
+ * <ul>
+ * <li>the database with the specified name if it exists</li>
+ * <li>null otherwise</li>
+ * </ul>
+ */
+ public InfluxDbDatabase getIfExists(String databaseName) {
+ if(!exists(databaseName))
+ return null;
+ return new InfluxDbDatabase(this.influxDB,databaseName);
+ }
+
+ /**
+ * Close the connection this InfluxDbConnector initiated
+ * with an InfluxDB instance
+ */
+ public void close() {
+ this.influxDB.close();
+ }
+
+
+ public static void main(String[] args) {
+ try {
+ InfluxDbConnector connector = new InfluxDbConnector();
+ InfluxDbDatabase database = connector.createIfNotExists("sensinact");
+ database.add("test", new java.util.Hashtable<String,String>(){{
+ this.put("tag1","t1");
+ this.put("tag2","t2");
+ this.put("tag3","t3");
+ }}, 25.21d);
+ database.add("test", new java.util.Hashtable<String,String>(){{
+ this.put("tag1","t0");
+ this.put("tag2","t2");
+ this.put("tag3","t0");
+ }}, 25.0d);
+ database.add("test", new java.util.Hashtable<String,String>(){{
+ this.put("tag1","t0");
+ this.put("tag2","t2");
+ this.put("tag3","t3");
+ }}, 24.2d);
+ database.add("test", new java.util.Hashtable<String,String>(){{
+ this.put("tag1","t1");
+ this.put("tag2","t0");
+ this.put("tag3","t0");
+ }}, 20.0d);
+ System.out.println(database.get(Measure.class, "test", Arrays.asList("value"), new java.util.Hashtable<String,String>(){{
+ this.put("tag2","t2");}}));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java
new file mode 100644
index 0000000..42f7e4b
--- /dev/null
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2020 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.tools.connector.influxdb;
+
+/**
+ * InfluxDbConnectorConfiguration helps at configuring a InfluxDb client
+ * to be instantiated
+ */
+public class InfluxDbConnectorConfiguration {
+
+ /**
+ * {@link InfluxDbConnectorConfiguration} Builder
+ */
+ public static class Builder {
+
+ String username;
+ String password;
+ String scheme;
+ String host;
+ int port;
+ String path;
+
+ /**
+ * Constructor
+ */
+ public Builder(){
+ }
+
+ /**
+ * Defines the String name of the user to be connected
+ *
+ * @param username the name of the user
+ * @return this Builder
+ */
+ public Builder withUsername(String username) {
+ if(username != null) {
+ String user = username.trim();
+ if(user.length()>0)
+ this.username = user;
+ }
+ return this;
+ }
+
+ /**
+ * Defines the String password of the user to be connected
+ *
+ * @param password the password of the user
+ * @return this Builder
+ */
+ public Builder withPassword(String password) {
+ if(password != null) {
+ String pass = password.trim();
+ if(pass.length()>0)
+ this.password = pass;
+ }
+ return this;
+ }
+
+ /**
+ * Defines the String host of the InfluxDB instance to connect to
+ *
+ * @param host the String host of the InfluxDB instance
+ * @return this Builder
+ */
+ public Builder withHost(String host) {
+ if(host!=null) {
+ String h = host.trim();
+ if(h.length() > 0)
+ this.host = h;
+ }
+ return this;
+ }
+
+ /**
+ * Defines the integer port number of the InfluxDB instance to connect to
+ *
+ * @param port the integer port number of the InfluxDB instance
+ * @return this Builder
+ */
+ public Builder withPort(int port) {
+ if(port > 0)
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Defines the String path of the URI of the database to connect to
+ *
+ * @param path the String path of the URI of the database
+ * @return this Builder
+ */
+ public Builder withPath(String path) {
+ if(path!=null) {
+ String p = path.trim();
+ if(p.length() > 0)
+ this.path = p;
+ }
+ return this;
+ }
+
+ private String getPath() {
+ if(this.path == null)
+ return DEFAULT_PATH;
+ return this.path;
+ }
+
+ /**
+ * Defines the String scheme of the URI to connect to the database
+ *
+ * @param path the String scheme of the URI of the database
+ * @return this Builder
+ */
+ public Builder withScheme(String scheme) {
+ if(scheme!=null) {
+ String s = scheme.trim();
+ if(s.length() > 0)
+ this.scheme = s;
+ }
+ return this;
+ }
+
+ /**
+ * Builds the {@link InfluxDbConnectorConfiguration}
+ *
+ * @return the {@link InfluxDbConnectorConfiguration} built by
+ * this Builder
+ */
+ public InfluxDbConnectorConfiguration build() {
+ final InfluxDbConnectorConfiguration config;
+ if(host == null && port==0 && (username==null || password==null)) {
+ config = new InfluxDbConnectorConfiguration();
+ config.setPath(getPath());
+ } else {
+ if(scheme == null)
+ this.scheme = DEFAULT_SCHEME;
+ if(host == null)
+ this.host = DEFAULT_HOST;
+ if(port == 0)
+ this.port = DEFAULT_PORT;
+ if(this.username!=null && this.password!=null)
+ config = new InfluxDbConnectorConfiguration(username, password, scheme, host, port, getPath());
+ else
+ config = new InfluxDbConnectorConfiguration(scheme, host, port, getPath());
+ }
+ return config;
+ }
+ }
+
+ public static final String DEFAULT_SCHEME = "http";
+ public static final String DEFAULT_HOST = "localhost";
+ public static final String DEFAULT_PATH = "";
+ public static final int DEFAULT_PORT = 8086;
+
+ private String username;
+ private String password;
+
+ private String scheme;
+ private String host;
+ private int port;
+ private String path;
+
+ /**
+ * Constructor
+ */
+ private InfluxDbConnectorConfiguration(){
+ this(DEFAULT_SCHEME, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_PATH);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param scheme the String host of the InfluxDB instance to connect to
+ * @param host the String host of the InfluxDB instance to connect to
+ * @param port the integer port number of the InfluxDB instance to connect to
+ * @param path the String path of the URI to connect to the InfluxDB instance
+ */
+ private InfluxDbConnectorConfiguration(String scheme, String host, int port, String path){
+ this.scheme = scheme;
+ this.port = port;
+ this.host = host;
+ this.path = path;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param username the String name of the user to be connected
+ * @param password the String password of the user to be connected
+ * @param scheme the String scheme of the URI to connect to the InfluxDB instance
+ * @param host the String host of the InfluxDB instance to connect to
+ * @param port the integer port number of the InfluxDB instance to connect to
+ * @param path the String path of the URI to connect to the InfluxDB instance
+ */
+ private InfluxDbConnectorConfiguration(String username, String password, String scheme, String host, int port, String path){
+ this(scheme, host, port, path);
+ this.username = username;
+ this.password = password;
+ }
+
+ /**
+ * Defines the String path of the URI to connect to the database
+ *
+ * @param path the String path of the URI
+ */
+ private void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * Returns the String name of the user to be connected to
+ * the database
+ *
+ * @return the String name of the user
+ */
+ public String getUserName() {
+ return this.username;
+ }
+
+ /**
+ * Returns the String password of the user to be connected
+ * to the database
+ *
+ * @return the String password of the user
+ */
+ public String getPassword() {
+ return this.password;
+ }
+
+ /**
+ * Returns the String URI to connect to the InluxDB instance
+ *
+ * @return the connection String URI
+ */
+ public String getUri() {
+ String port = null;
+ if(this.port == 0)
+ port="";
+ else
+ port=":".concat(String.valueOf(this.port));
+
+ String uri = String.format("%s://%s%s%s",scheme,host,port,path);
+ return uri;
+ }
+
+}
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
new file mode 100644
index 0000000..e3fa1f1
--- /dev/null
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
@@ -0,0 +1,520 @@
+/*
+ * Copyright (c) 2020 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.tools.connector.influxdb;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Point.Builder;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.impl.InfluxDBResultMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A InfluxDbDatabase provide CRUD methods to an InfluxDB database
+ */
+public class InfluxDbDatabase {
+
+ private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ }
+ };
+
+ private static final Logger LOG = LoggerFactory.getLogger(InfluxDbDatabase.class);
+
+ private InfluxDB influxDB;
+ private String database;
+
+ /**
+ * Constructor
+ *
+ * @param influxDB the {@link InfluxDB} wrapped by the InfluxDbDatabase to be instantiated
+ */
+ public InfluxDbDatabase(InfluxDB influxDB, String database) {
+ this.influxDB = influxDB;
+ this.database = database;
+
+ influxDB.setDatabase(database);
+ influxDB.setRetentionPolicy("autogen");
+ }
+
+ /**
+ * Adds the Object value passed as parameter to the InfluxDB measurement whose name is also
+ * passed as parameter, and tagged using the tags dictionary argument
+ *
+ * @param measurement the name of the InfluxB measurement to which adding the value
+ * @param tags the dictionary of tags to be used to tag the value to be added
+ * @param value the Object value to add
+ */
+ public void add(String measurement, Dictionary<String,String> tags, Object value) {
+ this.add(measurement, tags, value, System.currentTimeMillis());
+ }
+
+ /**
+ * Adds the Object value passed as parameter to the InfluxDB measurement whose name is also
+ * passed as parameter, and tagged using the tags dictionary argument, with the defined timestamp
+ *
+ * @param measurement the name of the InfluxB measurement to which adding the value
+ * @param tags the dictionary of tags to be used to tag the value to be added
+ * @param value the Object value to add
+ * @param timestamp the millisecond unix epoch timestamp
+ */
+ public void add(String measurement, Dictionary<String,String> tags, Object value, long timestamp) {
+ Point point = null;
+ Builder builder = Point.measurement(measurement);
+ if(tags != null && !tags.isEmpty()) {
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.tag(key, String.valueOf(tags.get(key)));
+ }
+ }
+ builder.time(timestamp, TimeUnit.MILLISECONDS);
+ if(value.getClass().isPrimitive()) {
+ switch(value.getClass().getName()) {
+ case "byte":
+ point = builder.addField("value", Byte.valueOf((byte)value)).build();
+ break;
+ case "short":
+ point = builder.addField("value", Short.valueOf((short)value)).build();
+ break;
+ case "int":
+ point = builder.addField("value", Integer.valueOf((int)value)).build();
+ break;
+ case "long":
+ point = builder.addField("value", Long.valueOf((long)value)).build();
+ break;
+ case "float":
+ point = builder.addField("value", Float.valueOf((float)value)).build();
+ break;
+ case "double":
+ point = builder.addField("value", Double.valueOf((double)value)).build();
+ break;
+ case "char":
+ point = builder.addField("value", new String(new char[] {(char)value})).build();
+ break;
+ case "boolean":
+ point = builder.addField("value", Boolean.valueOf((boolean)value)).build();
+ break;
+ }
+ } else if (value instanceof String) {
+ point = builder.addField("value", (String) value).build();
+ } else if (value instanceof Number) {
+ switch(value.getClass().getName()) {
+ case "java.lang.Byte":
+ point = builder.addField("value", (Byte) value).build();
+ break;
+ case "java.lang.Short":
+ point = builder.addField("value", (Short) value).build();
+ break;
+ case "java.lang.Integer":
+ point = builder.addField("value", (Integer) value).build();
+ break;
+ case "java.lang.Long":
+ point = builder.addField("value", (Long) value).build();
+ break;
+ case "java.lang.Float":
+ point = builder.addField("value", (Float) value).build();
+ break;
+ case "java.lang.Double":
+ point = builder.addField("value", (Double) value).build();
+ break;
+ case "java.lang.Character":
+ point = builder.addField("value", new String(new char[] {((Character)value).charValue()})).build();
+ break;
+ case "java.lang.Boolean":
+ point = builder.addField("value", new String(new char[] {(char)value})).build();
+ break;
+ }
+ } else if(value instanceof Enum){
+ point = builder.addField("value", ((Enum)value).name()).build();
+ } else {
+ point = builder.addField("value", String.valueOf(value)).build();
+ }
+ influxDB.write(point);
+ influxDB.flush();
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, List<String> columns, Dictionary<String,String> tags) {
+ String select = null;
+ if(columns==null || columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ where = builder.toString();
+ } else
+ where="";
+
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
+ * argument and starting from the specified String formated start datetime.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, List<String> columns, Dictionary<String,String> tags, String start) {
+ if(start == null)
+ return get(measurement, columns, tags);
+ String select = null;
+ if(columns==null || columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+ long startTime = -1;
+
+ try {
+ SimpleDateFormat format = THREAD_LOCAL_FORMAT.get();
+ startTime = format.parse(start).getTime();
+ }catch(ParseException e) {
+ LOG.error(e.getMessage(),e);
+ }finally {
+ THREAD_LOCAL_FORMAT.remove();
+ }
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ if(startTime > 0)
+ builder.append(String.format(" AND time > %s", startTime));
+ where = builder.toString();
+ } else {
+ if(startTime > 0)
+ where = String.format(" WHERE time > %s", startTime);
+ else
+ where="";
+ }
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
+ * argument, between both String formated start and end datetimes.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ * @param end the String formated date defining the chronological ending of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, List<String> columns, Dictionary<String,String> tags, String start, String end) {
+ if(end == null)
+ return get(measurement, columns, tags, start);
+ String select = null;
+ if(columns==null || columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+
+ long startTime = -1;
+ long endTime = -1;
+ try {
+ SimpleDateFormat format = THREAD_LOCAL_FORMAT.get();
+ startTime = format.parse(start).getTime();
+ endTime = format.parse(end).getTime();
+ } catch(ParseException e) {
+ LOG.error(e.getMessage(),e);
+ } finally {
+ THREAD_LOCAL_FORMAT.remove();
+ }
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ where = builder.toString();
+ if(startTime > 0 && endTime > 0)
+ builder.append(String.format(" AND time > %s AND time < %s", startTime, endTime));
+ } else {
+ if(startTime > 0 && endTime > 0)
+ where = String.format(" WHERE time > %s AND time < %s", startTime, endTime);
+ else
+ where="";
+ }
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
+ /**
+ * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
+ * mapped to the specified result type, and compliant with tags dictionary also passed as parameter
+ *
+ * @param <T> result unit type
+ *
+ * @param resultType the type to which found points (records) are mapped
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ *
+ * @return the List of resultType typed instances resulting of the search
+ */
+ public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags) {
+ String select = null;
+ if(columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ where = builder.toString();
+ } else
+ where="";
+
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
+ return resultMapper.toPOJO(result, resultType);
+ }
+
+ /**
+ * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
+ * mapped to the specified result type, compliant with tags dictionary also passed as parameter,
+ * and starting from the specified String formated start datetime..
+ *
+ * @param <T> result unit type
+ *
+ * @param resultType the type to which found points (records) are mapped
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ *
+ * @return the List of resultType typed instances resulting of the search
+ */
+ public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags, String start) {
+ if(start == null)
+ return get(resultType, measurement, columns, tags);
+ String select = null;
+ if(columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+ long startTime = -1;
+
+ try {
+ SimpleDateFormat format = THREAD_LOCAL_FORMAT.get();
+ startTime = format.parse(start).getTime();
+ }catch(ParseException e) {
+ LOG.error(e.getMessage(),e);
+ } finally {
+ THREAD_LOCAL_FORMAT.remove();
+ }
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ if(startTime > 0)
+ builder.append(String.format(" AND time > %s", startTime));
+ where = builder.toString();
+ } else {
+ if(startTime > 0)
+ where = String.format(" WHERE time > %s", startTime);
+ else
+ where="";
+ }
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
+ return resultMapper.toPOJO(result, resultType);
+ }
+
+ /**
+ * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
+ * mapped to the specified result type, compliant with tags dictionary also passed as parameter,
+ * and between both String formated start and end datetimes.
+ *
+ * @param <T> result unit type
+ *
+ * @param resultType the type to which found points (records) are mapped
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ * @param end the String formated date defining the chronological ending of records in which to search
+ *
+ * @return the List of resultType typed instances resulting of the search
+ */
+ public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags, String start, String end) {
+ if(end == null)
+ return get(resultType, measurement, columns, tags, start);
+ String select = null;
+ if(columns.isEmpty())
+ select = "* ";
+ else
+ select = columns.stream().collect(StringBuilder::new,(b,s)-> {
+ b.append(s);
+ b.append(",");
+ }, (h,t)->h.append(t.toString())).toString();
+ select =select.substring(0,select.length()-1);
+
+ String from = String.format(" FROM %s " , measurement);
+
+ long startTime = -1;
+ long endTime = -1;
+ try {
+ SimpleDateFormat format = THREAD_LOCAL_FORMAT.get();
+ startTime = format.parse(start).getTime();
+ endTime = format.parse(end).getTime();
+ }catch(ParseException e) {
+ LOG.error(e.getMessage(),e);
+ } finally {
+ THREAD_LOCAL_FORMAT.remove();
+ }
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ if(startTime > 0 && endTime > 0)
+ builder.append(String.format(" AND time > %s AND time < %s", startTime, endTime));
+ } else {
+ if(startTime > 0 && endTime > 0)
+ where = String.format(" WHERE time > %s AND time < %s", startTime, endTime);
+ else
+ where="";
+ }
+ Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
+ QueryResult result = this.influxDB.query(query);
+ InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
+ return resultMapper.toPOJO(result, resultType);
+ }
+}
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/Measure.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/Measure.java
new file mode 100644
index 0000000..9fc3e6d
--- /dev/null
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/Measure.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2020 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.tools.connector.influxdb;
+
+import org.influxdb.annotation.Column;
+import org.influxdb.annotation.Measurement;
+
+@Measurement(name = "test")
+public class Measure {
+
+ @Column(name="path")
+ private String path;
+
+ @Column(name="value")
+ private String value;
+
+ @Column(name="time")
+ private String time;
+
+ public Measure() {}
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public void setTime(String time) {
+ this.time = time;
+ }
+
+ public String getTime() {
+ return this.time;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return this.path;
+ }
+
+ @Override
+ public String toString() {
+ return "{'path':'"+path+"','value':'"+value+"','time':'"+time+"'}";
+ }
+
+}
\ No newline at end of file