Handle influxdb fields

Handle time aggregation requests
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
index 5ccacff..041ccb4 100644
--- a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
@@ -34,6 +34,19 @@
  */
 public class InfluxDbDatabase {
 
+	private static final long MILLISECOND = 1;
+	
+	private static final long SECOND = MILLISECOND * 1000;
+
+	private static final long MINUTE = SECOND * 60;
+
+	private static final long HOUR = MINUTE * 60;
+
+	private static final long DAY = HOUR * 24;
+	
+	private static final long WEEK = DAY * 7;
+	
+	
 	private interface SimpleDateFormatProvider {
 		Iterator<SimpleDateFormat> iterator();
 	}
@@ -98,7 +111,20 @@
      * @param value the Object value to add
      */
     public void add(String measurement, Dictionary<String,String> tags, Object value) {
-    	this.add(measurement, tags, value, System.currentTimeMillis());
+    	this.add(measurement, tags, null, value, System.currentTimeMillis());
+    }     
+    
+    /**
+     * Adds the Object value passed as parameter to the InfluxDB measurement whose name is also 
+     * passed as parameter, and tagged using the tags dictionary argument 
+     *  
+     * @param measurement the name of the InfluxB measurement to which adding the value 
+     * @param tags the dictionary of tags to be used to tag the value to be added
+     * @param fields the dictionary of fields to be stored with the value
+     * @param value the Object value to add
+     */
+    public void add(String measurement, Dictionary<String,String> tags, Dictionary<String,Object> fields, Object value) {
+    	this.add(measurement, tags, fields, value, System.currentTimeMillis());
     }         
 
     /**
@@ -107,83 +133,185 @@
      *  
      * @param measurement the name of the InfluxB measurement to which adding the value 
      * @param tags the dictionary of tags to be used to tag the value to be added
+     * @param fields the dictionary of fields to be stored with the value
      * @param value the Object value to add
      * @param timestamp the millisecond unix epoch timestamp 
      */
-    public void add(String measurement, Dictionary<String,String> tags, Object value, long timestamp) {
+    public void add(String measurement, Dictionary<String,String> tags, Dictionary<String,Object> fields, Object value, long timestamp) {
     	Point point = null;
         Builder builder = Point.measurement(measurement);
         if(tags != null && !tags.isEmpty()) {		         	       		 
 	    	for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
 	    		String key = it.next();
-	    		builder.tag(key, String.valueOf(tags.get(key)));
+	    		builder.tag(key, tags.get(key));
 	    	}
     	}
     	builder.time(timestamp, TimeUnit.MILLISECONDS);
+
+        if(fields != null && !fields.isEmpty()) {		         	       		 
+	    	for(Iterator<String> it = Collections.list(fields.keys()).iterator();it.hasNext();) {
+	    		String key = it.next();
+	    		addField(builder, key, fields.get(key));
+	    	}
+    	}
+        addField(builder,"value",value);
+        point = builder.build();
+        influxDB.write(point);
+        influxDB.flush();
+    }
+    
+    /*
+     * (non-javadoc)
+     * Add a field with the specified name to the Point Builder passed as parameter after the cast of the value 
+     */    
+    private void addField(Builder builder, String key, Object value) {
+    	if(value == null)
+    		return;
     	if(value.getClass().isPrimitive()) {
     		switch(value.getClass().getName()) {
 	    		case "byte":
-	    			point = builder.addField("value", Byte.valueOf((byte)value)).build();
+	    			builder.addField(key, Byte.valueOf((byte)value));
     				break;
 	    		case "short":
-	    			point = builder.addField("value", Short.valueOf((short)value)).build();
+	    			builder.addField(key, Short.valueOf((short)value));
     				break;
 	    		case "int":
-	    			point = builder.addField("value", Integer.valueOf((int)value)).build();
+	    			builder.addField(key, Integer.valueOf((int)value));
     				break;
 	    		case "long":
-	    			point = builder.addField("value", Long.valueOf((long)value)).build();
+	    			builder.addField(key, Long.valueOf((long)value));
     				break;
 	    		case "float":
-	    			point = builder.addField("value", Float.valueOf((float)value)).build();
+	    			builder.addField(key, Float.valueOf((float)value));
 	    			break;
 	    		case "double":
-	    			point = builder.addField("value", Double.valueOf((double)value)).build();
+	    			builder.addField(key, Double.valueOf((double)value));
     				break;
 	    		case "char":
-	    			point = builder.addField("value", new String(new char[] {(char)value})).build();
+	    			builder.addField(key, new String(new char[] {(char)value}));
     				break;
 	    		case "boolean":
-	    			point = builder.addField("value", Boolean.valueOf((boolean)value)).build();
+	    			builder.addField(key, Boolean.valueOf((boolean)value));
     				break;
     		}
     	} else if (value instanceof String) {
-             point = builder.addField("value", (String) value).build();
+             builder.addField(key, (String) value);
         } else if (value instanceof Number) {
         	switch(value.getClass().getName()) {
     		case "java.lang.Byte":
-    			point = builder.addField("value", (Byte) value).build();
+    			builder.addField(key, (Byte) value);
 				break;
     		case "java.lang.Short":
-    			point = builder.addField("value", (Short) value).build();
+    			builder.addField(key, (Short) value);
 				break;
     		case "java.lang.Integer":
-    			point = builder.addField("value", (Integer) value).build();
+    			builder.addField(key, (Integer) value);
 				break;
     		case "java.lang.Long":
-    			point = builder.addField("value", (Long) value).build();
+    			builder.addField(key, (Long) value);
 				break;
     		case "java.lang.Float":
-    			point = builder.addField("value", (Float) value).build();
+    			builder.addField(key, (Float) value);
     			break;
     		case "java.lang.Double":
-    			point = builder.addField("value", (Double) value).build();
+    			builder.addField(key, (Double) value);
 				break;
     		case "java.lang.Character":
-    			point = builder.addField("value", new String(new char[] {((Character)value).charValue()})).build();
+    			builder.addField(key, new String(new char[] {((Character)value).charValue()}));
 				break;
     		case "java.lang.Boolean":
-    			point = builder.addField("value", new String(new char[] {(char)value})).build();
+    			builder.addField(key, new String(new char[] {(char)value}));
 				break;
         	}			
 		} else if(value instanceof Enum){
-			point = builder.addField("value", ((Enum)value).name()).build();
+			builder.addField(key, ((Enum)value).name());
 		} else {
-			point = builder.addField("value", String.valueOf(value)).build();			
+			builder.addField(key, String.valueOf(value));			
 		}
-        influxDB.write(point);
-        influxDB.flush();
-    }	
+    }
+
+    /*
+     * (non-javadoc)
+     * 
+     */
+    private String getTimeWindow(long timeWindow) {
+    	if(timeWindow <= MILLISECOND)
+    		return "";
+    	long[] measures = new long[]{WEEK,DAY,HOUR,MINUTE,SECOND,MILLISECOND};
+    	int pos = 0;
+    	long d = 0;
+    	while(pos < measures.length) {
+    		long measure = measures[pos];
+    		if(timeWindow > measure) {
+    			d = timeWindow / measure;
+    			if(d*measure == timeWindow)
+    				break;
+    		}
+    		d = 0;
+    		pos+=1;
+    	}
+    	switch(pos) {
+    	case 0:
+    		return String.format(" group by time(%sw)",d);
+    	case 1:
+    		return String.format(" group by time(%sd)",d);
+    	case 2:
+    		return String.format(" group by time(%sh)",d);
+    	case 3:
+    		return String.format(" group by time(%sm)",d);
+    	case 4:
+    		return String.format(" group by time(%ss)",d);
+    	case 5:
+    		return String.format(" group by time(%sms)",d);
+    	default: 
+    		return "";
+    	}    	
+    }
+    
+    /*
+     * (non-javadoc)
+     * 
+     */
+    private String getSelectFunction(String column, String function) {
+    	String select = null;
+    	if(column==null)
+    		return null;
+    	if(function == null)
+    		return column;
+    	
+		switch(function) {
+    		case "avg":
+    			select = String.format("MEAN(%s)",column);
+    			break;
+    		case "count":
+    			select = String.format("COUNT(%s)",column);
+    			break;
+    		case "countDistinct":
+    			select = String.format("COUNT(DISTINCT(%s))",column);
+    			break;
+    		case "distinct":
+    			select = String.format("DISTINCT(%s)",column);
+    			break;
+    		case "max":
+    			select = String.format("MAX(%s)",column);
+    			break;
+    		case "median":
+    			select = String.format("MEDIAN(%s)",column);
+    			break;
+    		case "min":
+    			select = String.format("MIN(%s)",column);
+    			break;
+    		case "sqsum":
+    			select = String.format("SQRT(SUM(%s))",column,column);
+    			break;
+    		case "sum":
+    			select = String.format("SUM(%s)",column);
+    			break;
+    		default:
+    			select = column;   
+    	}
+		return select;
+    }
 
     /**
      * Returns the String formated values for the columns List passed as parameter, of the records from 
@@ -225,7 +353,6 @@
 			where = builder.toString();
     	} else 
     		where="";
-    	
     	Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
     	QueryResult result = this.influxDB.query(query);
     	return result.toString();
@@ -233,6 +360,47 @@
 
     /**
      * Returns the String formated values for the columns List passed as parameter, of the records from 
+     * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary 
+     * argument.
+     * 
+     * @param measurement the name of the InfluxDB measurement in which searching the points
+     * @param tags the dictionary of tags allowing to parameterize the research
+     * @param column the String name of the column on which  the specified aggregation function applies
+     * @param function the String name of the aggregation function applying
+     * @param timeWindow the time window of the specified aggregation function
+     * 
+     * @return the JSON formated String result of the research
+     */
+    public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow) {
+    	String select = getSelectFunction(column, function);
+    	if(select == null)
+    		return null;
+    	String from = String.format(" FROM %s " , measurement);
+    	
+    	String where = null;
+    	if(tags != null && !tags.isEmpty()) {
+	   		StringBuilder builder = new StringBuilder();
+	   		builder.append(" WHERE ");
+			for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+				String key = it.next();
+				builder.append(key);
+				builder.append("='");
+				builder.append(tags.get(key));
+				builder.append("'");
+				if(it.hasNext())
+					builder.append(" AND ");
+			}
+			where = builder.toString();
+    	} else 
+    		where="";
+    	String window = getTimeWindow(timeWindow);
+    	Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+    	QueryResult result = this.influxDB.query(query);
+    	return result.toString();
+    }
+
+    /**
+     * Returns the String formated values for the columns List passed as parameter, of the records from 
      * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary 
      * argument and starting from the specified String formated start datetime. 
      * 
@@ -283,6 +451,51 @@
 
     /**
      * Returns the String formated values for the columns List passed as parameter, of the records from 
+     * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary 
+     * argument.
+     * 
+     * @param measurement the name of the InfluxDB measurement in which searching the points
+     * @param tags the dictionary of tags allowing to parameterize the research
+     * @param column the String name of the column on which  the specified aggregation function applies
+     * @param function the String name of the aggregation function applying
+     * @param timeWindow the time window of the specified aggregation function
+     * @param start the String formated date defining the chronological beginning of records in which to search 
+     * 
+     * @return the JSON formated String result of the research
+     */
+    public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start) {
+    	if(start == null || !isValidDate(start))
+    		return get(measurement, tags, column, function, timeWindow);
+    	String select = getSelectFunction(column, function);
+    	if(select == null)
+    		return null;
+    	String from = String.format(" FROM %s " , measurement);
+
+    	String where = null;
+    	if(tags != null && !tags.isEmpty()) {    		
+	   		StringBuilder builder = new StringBuilder();
+	   		builder.append(" WHERE ");
+			for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+				String key = it.next();
+				builder.append(key);
+				builder.append("='");
+				builder.append(tags.get(key));
+				builder.append("'");
+				if(it.hasNext())
+					builder.append(" AND ");
+			}
+			builder.append(String.format(" AND time > '%s'", start));
+			where = builder.toString();
+    	} else 
+			where = String.format(" WHERE time > '%s'", start);
+    	String window = getTimeWindow(timeWindow);
+    	Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+    	QueryResult result = this.influxDB.query(query);
+    	return result.toString();
+    }
+    
+    /**
+     * Returns the String formated values for the columns List passed as parameter, of the records from 
      * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary 
      * argument, between both String formated start and end datetimes. 
      * 
@@ -332,6 +545,57 @@
     	QueryResult result = this.influxDB.query(query);
     	return result.toString();
     }	
+	
+
+    /**
+     * Returns the String formated values for the columns List passed as parameter, of the records from 
+     * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary 
+     * argument.
+     * 
+     * @param measurement the name of the InfluxDB measurement in which searching the points
+     * @param tags the dictionary of tags allowing to parameterize the research
+     * @param column the String name of the column on which  the specified aggregation function applies
+     * @param function the String name of the aggregation function applying
+     * @param timeWindow the time window of the specified aggregation function
+     * @param start the String formated date defining the chronological beginning of records in which to search 
+     * @param end the String formated date defining the chronological ending of records in which to search 
+     * 
+     * @return the JSON formated String result of the research
+     */
+    public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start, String end) {
+    	if(end == null || !isValidDate(end))
+    		return get(measurement, tags, column, function, timeWindow, start);
+    	String select = getSelectFunction(column, function);
+    	if(select == null)
+    		return null;
+    	String from = String.format(" FROM %s " , measurement);
+
+    	if(start == null || !isValidDate(start))   
+			start = "1970-01-01T00:00:00.001Z";
+		
+    	String where = null;
+    	if(tags != null && !tags.isEmpty()) {
+	   		StringBuilder builder = new StringBuilder();
+	   		builder.append(" WHERE ");
+			for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
+				String key = it.next();
+				builder.append(key);
+				builder.append("='");
+				builder.append(tags.get(key));
+				builder.append("'");
+				if(it.hasNext())
+					builder.append(" AND ");
+			}
+		    builder.append(String.format(" AND time > '%s' AND time < '%s'", start, end));
+			where = builder.toString();
+    	} else 
+			where = String.format(" WHERE time > '%s' AND time < '%s'", start, end);
+    	String window = getTimeWindow(timeWindow);
+    	Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
+    	QueryResult result = this.influxDB.query(query);
+    	return result.toString();
+    }
+    
 
     /**
      * Returns the List of records from the InfluxDB measurement whose name is passed as parameter, 
@@ -381,7 +645,7 @@
     	InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
     	return resultMapper.toPOJO(result, resultType);
     }	
-
+    
     /**
      * Returns the List of records from the InfluxDB measurement whose name is passed as parameter, 
      * mapped to the specified result type, compliant with tags dictionary also passed as parameter, 
@@ -434,6 +698,7 @@
     	InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
     	return resultMapper.toPOJO(result, resultType);
     }	
+	
 
     /**
      * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,