Handle influxdb fields
Handle time aggregation requests
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
index 5ccacff..041ccb4 100644
--- a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
@@ -34,6 +34,19 @@
*/
public class InfluxDbDatabase {
+ private static final long MILLISECOND = 1;
+
+ private static final long SECOND = MILLISECOND * 1000;
+
+ private static final long MINUTE = SECOND * 60;
+
+ private static final long HOUR = MINUTE * 60;
+
+ private static final long DAY = HOUR * 24;
+
+ private static final long WEEK = DAY * 7;
+
+
private interface SimpleDateFormatProvider {
Iterator<SimpleDateFormat> iterator();
}
@@ -98,7 +111,20 @@
* @param value the Object value to add
*/
public void add(String measurement, Dictionary<String,String> tags, Object value) {
- this.add(measurement, tags, value, System.currentTimeMillis());
+ this.add(measurement, tags, null, value, System.currentTimeMillis());
+ }
+
+ /**
+ * Adds the Object value passed as parameter to the InfluxDB measurement whose name is also
+ * passed as parameter, and tagged using the tags dictionary argument
+ *
+ * @param measurement the name of the InfluxB measurement to which adding the value
+ * @param tags the dictionary of tags to be used to tag the value to be added
+ * @param fields the dictionary of fields to be stored with the value
+ * @param value the Object value to add
+ */
+ public void add(String measurement, Dictionary<String,String> tags, Dictionary<String,Object> fields, Object value) {
+ this.add(measurement, tags, fields, value, System.currentTimeMillis());
}
/**
@@ -107,83 +133,185 @@
*
* @param measurement the name of the InfluxB measurement to which adding the value
* @param tags the dictionary of tags to be used to tag the value to be added
+ * @param fields the dictionary of fields to be stored with the value
* @param value the Object value to add
* @param timestamp the millisecond unix epoch timestamp
*/
- public void add(String measurement, Dictionary<String,String> tags, Object value, long timestamp) {
+ public void add(String measurement, Dictionary<String,String> tags, Dictionary<String,Object> fields, Object value, long timestamp) {
Point point = null;
Builder builder = Point.measurement(measurement);
if(tags != null && !tags.isEmpty()) {
for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
String key = it.next();
- builder.tag(key, String.valueOf(tags.get(key)));
+ builder.tag(key, tags.get(key));
}
}
builder.time(timestamp, TimeUnit.MILLISECONDS);
+
+ if(fields != null && !fields.isEmpty()) {
+ for(Iterator<String> it = Collections.list(fields.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ addField(builder, key, fields.get(key));
+ }
+ }
+ addField(builder,"value",value);
+ point = builder.build();
+ influxDB.write(point);
+ influxDB.flush();
+ }
+
+ /*
+ * (non-javadoc)
+ * Add a field with the specified name to the Point Builder passed as parameter after the cast of the value
+ */
+ private void addField(Builder builder, String key, Object value) {
+ if(value == null)
+ return;
if(value.getClass().isPrimitive()) {
switch(value.getClass().getName()) {
case "byte":
- point = builder.addField("value", Byte.valueOf((byte)value)).build();
+ builder.addField(key, Byte.valueOf((byte)value));
break;
case "short":
- point = builder.addField("value", Short.valueOf((short)value)).build();
+ builder.addField(key, Short.valueOf((short)value));
break;
case "int":
- point = builder.addField("value", Integer.valueOf((int)value)).build();
+ builder.addField(key, Integer.valueOf((int)value));
break;
case "long":
- point = builder.addField("value", Long.valueOf((long)value)).build();
+ builder.addField(key, Long.valueOf((long)value));
break;
case "float":
- point = builder.addField("value", Float.valueOf((float)value)).build();
+ builder.addField(key, Float.valueOf((float)value));
break;
case "double":
- point = builder.addField("value", Double.valueOf((double)value)).build();
+ builder.addField(key, Double.valueOf((double)value));
break;
case "char":
- point = builder.addField("value", new String(new char[] {(char)value})).build();
+ builder.addField(key, new String(new char[] {(char)value}));
break;
case "boolean":
- point = builder.addField("value", Boolean.valueOf((boolean)value)).build();
+ builder.addField(key, Boolean.valueOf((boolean)value));
break;
}
} else if (value instanceof String) {
- point = builder.addField("value", (String) value).build();
+ builder.addField(key, (String) value);
} else if (value instanceof Number) {
switch(value.getClass().getName()) {
case "java.lang.Byte":
- point = builder.addField("value", (Byte) value).build();
+ builder.addField(key, (Byte) value);
break;
case "java.lang.Short":
- point = builder.addField("value", (Short) value).build();
+ builder.addField(key, (Short) value);
break;
case "java.lang.Integer":
- point = builder.addField("value", (Integer) value).build();
+ builder.addField(key, (Integer) value);
break;
case "java.lang.Long":
- point = builder.addField("value", (Long) value).build();
+ builder.addField(key, (Long) value);
break;
case "java.lang.Float":
- point = builder.addField("value", (Float) value).build();
+ builder.addField(key, (Float) value);
break;
case "java.lang.Double":
- point = builder.addField("value", (Double) value).build();
+ builder.addField(key, (Double) value);
break;
case "java.lang.Character":
- point = builder.addField("value", new String(new char[] {((Character)value).charValue()})).build();
+ builder.addField(key, new String(new char[] {((Character)value).charValue()}));
break;
case "java.lang.Boolean":
- point = builder.addField("value", new String(new char[] {(char)value})).build();
+ builder.addField(key, new String(new char[] {(char)value}));
break;
}
} else if(value instanceof Enum){
- point = builder.addField("value", ((Enum)value).name()).build();
+ builder.addField(key, ((Enum)value).name());
} else {
- point = builder.addField("value", String.valueOf(value)).build();
+ builder.addField(key, String.valueOf(value));
}
- influxDB.write(point);
- influxDB.flush();
- }
+ }
+
+ /*
+ * (non-javadoc)
+ *
+ */
+ private String getTimeWindow(long timeWindow) {
+ if(timeWindow <= MILLISECOND)
+ return "";
+ long[] measures = new long[]{WEEK,DAY,HOUR,MINUTE,SECOND,MILLISECOND};
+ int pos = 0;
+ long d = 0;
+ while(pos < measures.length) {
+ long measure = measures[pos];
+ if(timeWindow > measure) {
+ d = timeWindow / measure;
+ if(d*measure == timeWindow)
+ break;
+ }
+ d = 0;
+ pos+=1;
+ }
+ switch(pos) {
+ case 0:
+ return String.format(" group by time(%sw)",d);
+ case 1:
+ return String.format(" group by time(%sd)",d);
+ case 2:
+ return String.format(" group by time(%sh)",d);
+ case 3:
+ return String.format(" group by time(%sm)",d);
+ case 4:
+ return String.format(" group by time(%ss)",d);
+ case 5:
+ return String.format(" group by time(%sms)",d);
+ default:
+ return "";
+ }
+ }
+
+ /*
+ * (non-javadoc)
+ *
+ */
+ private String getSelectFunction(String column, String function) {
+ String select = null;
+ if(column==null)
+ return null;
+ if(function == null)
+ return column;
+
+ switch(function) {
+ case "avg":
+ select = String.format("MEAN(%s)",column);
+ break;
+ case "count":
+ select = String.format("COUNT(%s)",column);
+ break;
+ case "countDistinct":
+ select = String.format("COUNT(DISTINCT(%s))",column);
+ break;
+ case "distinct":
+ select = String.format("DISTINCT(%s)",column);
+ break;
+ case "max":
+ select = String.format("MAX(%s)",column);
+ break;
+ case "median":
+ select = String.format("MEDIAN(%s)",column);
+ break;
+ case "min":
+ select = String.format("MIN(%s)",column);
+ break;
+ case "sqsum":
+ select = String.format("SQRT(SUM(%s))",column,column);
+ break;
+ case "sum":
+ select = String.format("SUM(%s)",column);
+ break;
+ default:
+ select = column;
+ }
+ return select;
+ }
/**
* Returns the String formated values for the columns List passed as parameter, of the records from
@@ -225,7 +353,6 @@
where = builder.toString();
} else
where="";
-
Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
QueryResult result = this.influxDB.query(query);
return result.toString();
@@ -233,6 +360,47 @@
/**
* Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow) {
+ String select = getSelectFunction(column, function);
+ if(select == null)
+ return null;
+ String from = String.format(" FROM %s " , measurement);
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ where = builder.toString();
+ } else
+ where="";
+ String window = getTimeWindow(timeWindow);
+ Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
* the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
* argument and starting from the specified String formated start datetime.
*
@@ -283,6 +451,51 @@
/**
* Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start) {
+ if(start == null || !isValidDate(start))
+ return get(measurement, tags, column, function, timeWindow);
+ String select = getSelectFunction(column, function);
+ if(select == null)
+ return null;
+ String from = String.format(" FROM %s " , measurement);
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ builder.append(String.format(" AND time > '%s'", start));
+ where = builder.toString();
+ } else
+ where = String.format(" WHERE time > '%s'", start);
+ String window = getTimeWindow(timeWindow);
+ Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
* the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
* argument, between both String formated start and end datetimes.
*
@@ -332,6 +545,57 @@
QueryResult result = this.influxDB.query(query);
return result.toString();
}
+
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ * @param start the String formated date defining the chronological beginning of records in which to search
+ * @param end the String formated date defining the chronological ending of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start, String end) {
+ if(end == null || !isValidDate(end))
+ return get(measurement, tags, column, function, timeWindow, start);
+ String select = getSelectFunction(column, function);
+ if(select == null)
+ return null;
+ String from = String.format(" FROM %s " , measurement);
+
+ if(start == null || !isValidDate(start))
+ start = "1970-01-01T00:00:00.001Z";
+
+ String where = null;
+ if(tags != null && !tags.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+ String key = it.next();
+ builder.append(key);
+ builder.append("='");
+ builder.append(tags.get(key));
+ builder.append("'");
+ if(it.hasNext())
+ builder.append(" AND ");
+ }
+ builder.append(String.format(" AND time > '%s' AND time < '%s'", start, end));
+ where = builder.toString();
+ } else
+ where = String.format(" WHERE time > '%s' AND time < '%s'", start, end);
+ String window = getTimeWindow(timeWindow);
+ Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+ QueryResult result = this.influxDB.query(query);
+ return result.toString();
+ }
+
/**
* Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
@@ -381,7 +645,7 @@
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
return resultMapper.toPOJO(result, resultType);
}
-
+
/**
* Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
* mapped to the specified result type, compliant with tags dictionary also passed as parameter,
@@ -434,6 +698,7 @@
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
return resultMapper.toPOJO(result, resultType);
}
+
/**
* Returns the List of records from the InfluxDB measurement whose name is passed as parameter,