Update generic-storage-agent default schema - add 'resource' key
Update influxdb-storage-agent : handle fields - defined default configuration - allows to deactive default configuration and to extend storage schema
diff --git a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java
index 3b704ef..a254a07 100644
--- a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java
+++ b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java
@@ -78,7 +78,21 @@
this.keyProcessors.put("path", new Executable<SnaMessage<?>,String>(){
@Override
public String execute(SnaMessage<?> message) throws Exception {
- return message.getPath();
+ String path = message.getPath();
+ String[] pathElements = UriUtils.getUriElements(path);
+ if(pathElements.length==3)
+ return path.concat("/value");
+ return path;
+ }
+ });
+ this.keyProcessors.put("resource", new Executable<SnaMessage<?>,String>(){
+ @Override
+ public String execute(SnaMessage<?> message) throws Exception {
+ String path = message.getPath();
+ String[] pathElements = UriUtils.getUriElements(path);
+ if(pathElements.length > 2)
+ return pathElements[2];
+ return null;
}
});
this.keyProcessors.put("location", new Executable<SnaMessage<?>,String>(){
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
index 2e0a323..baa9819 100644
--- a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
@@ -39,7 +39,7 @@
public void activate(ComponentContext context) {
BundleContext bc = context.getBundleContext();
this.mediator = new Mediator(bc);
-
+
String scheme = (String) mediator.getProperty(INFLUX_AGENT_SCHEME_PROPS);
if(scheme == null)
scheme = InfluxDbConnectorConfiguration.DEFAULT_SCHEME;
@@ -81,7 +81,10 @@
this.measurement = INFLUX_AGENT_DEFAULT_MEASUREMENT;
this.database = this.connector.createIfNotExists("sensinact");
+
+ super.setStorageKeys((String) mediator.getProperty(STORAGE_AGENT_KEYS_PROPS));
super.setStorageConnection(new InfluxDBStorageConnection(mediator, database, this.measurement));
+
}
@Deactivate
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java
index a4bb1e3..e63b53b 100644
--- a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java
@@ -12,7 +12,10 @@
import java.io.IOException;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Set;
import org.eclipse.sensinact.gateway.agent.storage.generic.StorageConnection;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
@@ -27,13 +30,17 @@
*/
public class InfluxDBStorageConnection extends StorageConnection {
+
+ private static final String STORAGE_AGENT_INFLUXDB_FIELDS = "org.eclipse.sensinact.gateway.history.influxdb.fields";
+ private static final String STORAGE_AGENT_INFLUXDB_TAGS = "org.eclipse.sensinact.gateway.history.influxdb.tags";
+ private static final String STORAGE_AGENT_INFLUXDB_ENABLE_DEFAULT = "org.eclipse.sensinact.gateway.history.influxdb.default";
+ private static final String STORAGE_AGENT_INFLUXDB_ENABLE_GEOJSON = "org.eclipse.sensinact.gateway.history.influxdb.geojson";
+
private static final JSONObjectStatement STATEMENT =
new JSONObjectStatement(new JSONTokenerStatement(
"{" +
" \"type\": \"Feature\"," +
- " \"properties\": {" +
- " \"name\": $(name)" +
- " }," +
+ " \"properties\": {}," +
" \"geometry\": {" +
" \"type\": \"Point\"," +
" \"coordinates\": [ $(longitude),$(latitude)] " +
@@ -43,6 +50,11 @@
private String measurement;
private InfluxDbDatabase database;
+ private boolean enableGeoJSON;
+ private boolean enableDefault;
+ private Set<String> fields;
+ private Set<String> tags;
+
/**
* Constructor
*
@@ -57,34 +69,112 @@
super(mediator);
this.database = database;
this.measurement = measurement;
+ this.enableDefault = true;
+ this.enableGeoJSON = false;
+
+ Object fieldsProperty = mediator.getProperty(STORAGE_AGENT_INFLUXDB_FIELDS);
+ Object tagsProperty = mediator.getProperty(STORAGE_AGENT_INFLUXDB_TAGS);
+ Object defaultProperty = mediator.getProperty(STORAGE_AGENT_INFLUXDB_ENABLE_DEFAULT);
+ Object geojsonProperty = mediator.getProperty(STORAGE_AGENT_INFLUXDB_ENABLE_GEOJSON);
+
+ this.fields = new HashSet<>();
+ this.tags = new HashSet<>();
+
+ if(geojsonProperty != null)
+ enableGeoJSON = Boolean.parseBoolean(String.valueOf(geojsonProperty));
+
+ if(defaultProperty != null)
+ enableDefault = Boolean.parseBoolean(String.valueOf(defaultProperty));
+
+ if(enableDefault) {
+ this.fields.add("latitude");
+ this.fields.add("longitude");
+ this.tags.add("path");
+ this.tags.add("resource");
+ }
+ if(enableGeoJSON)
+ this.fields.add("geojson");
+
+ if(fieldsProperty != null) {
+ String fieldsStr = String.valueOf(fieldsProperty);
+ String[] fieldsArr = fieldsStr.split(",");
+ for(String field : fieldsArr) {
+ String fd = field.trim();
+ if(fd.length() > 0)
+ this.fields.add(fd);
+ }
+ }
+ if(tagsProperty != null) {
+ String tagsStr = String.valueOf(tagsProperty);
+ String[] tagsArr = tagsStr.split(",");
+ for(String tag : tagsArr) {
+ String tg = tag.trim();
+ if(tg.length() > 0)
+ this.tags.add(tg);
+ }
+ }
}
@Override
- public void store(JSONObject obj) {
- String uri = (String) obj.opt("path");
- String location = (String) obj.opt("location");
+ public void store(JSONObject obj) {
+ final Dictionary<String,Object> fs = new Hashtable<>();
+ final Dictionary<String,String> ts = new Hashtable<>();
+ for(Iterator<String> it = obj.keys();it.hasNext();) {
+ String key = it.next();
+ if(this.fields.contains(key)) {
+ fs.put(key,obj.get(key));
+ continue;
+ }
+ if(this.tags.contains(key))
+ ts.put(key,String.valueOf(obj.get(key)));
+ }
+ this.extractLocation(fs, obj.opt("location"));
+ this.database.add(measurement, ts, fs, obj.opt(DataResource.VALUE));
+ }
+
+ private void extractLocation(Dictionary<String,Object> fields, Object location) {
+ if(location == null)
+ return;
double latitude = -1;
double longitude = -1;
String geolocation = null;
+ String lc = String.valueOf(location);
- if(location != null) {
- String[] locationElements = location.split(":");
- if(locationElements.length == 2) {
+ String separator;
+ String[] separators = new String[] {":",",",".","-"," "};
+ int ind = 0;
+ for(;ind < separators.length;ind++) {
+ if(lc.indexOf(separators[ind]) > -1)
+ break;
+ }
+ if(ind < separators.length)
+ separator=separators[ind];
+ else
+ return;
+ String[] locationElements = lc.split(separator);
+ if(locationElements.length == 2) {
+ try {
latitude = Double.parseDouble(locationElements[0]);
- longitude = Double.parseDouble(locationElements[1]);
- STATEMENT.apply("latitude", latitude);
- STATEMENT.apply("longitude", longitude);
- STATEMENT.apply("name", obj.opt("provider"));
+ longitude = Double.parseDouble(locationElements[1]);
+ if(this.enableDefault) {
+ fields.put("latitude", latitude);
+ fields.put("longitude", longitude);
+ }
+ STATEMENT.apply("latitude", latitude);
+ STATEMENT.apply("longitude", longitude);
geolocation = STATEMENT.toString();
- } else
- geolocation = location;
- }
- final Dictionary<String,String> ts = new Hashtable<>();
- ts.put("path",uri);
- ts.put("latitude", latitude==-1?"N/A":String.valueOf(latitude));
- ts.put("longitude", longitude==-1?"N/A":String.valueOf(longitude));
- ts.put("geolocation", geolocation==null?"N/A":geolocation);
-
- this.database.add(measurement, ts, obj.opt(DataResource.VALUE));
+
+ } catch(IllegalArgumentException e) {
+ mediator.error(e);
+ }
+ } else
+ geolocation = lc;
+
+ if(this.enableGeoJSON && geolocation!=null)
+ fields.put("geolocation", geolocation);
}
+
+ // Constants.EARTH_SPHERICAL_MODEL_RADIUS * 1000D 2* atan2(sqrt(sqrt(sin(((lat2 - lat1) * Constants.DEGREES_TO_RADIUS_COEF) / 2))+sqrt(sin(((lng2 - lng1) * Constants.DEGREES_TO_RADIUS_COEF) / 2))*cos(lat1 * Constants.DEGREES_TO_RADIUS_COEF)*cos(lat2 * Constants.DEGREES_TO_RADIUS_COEF))
+ // sqrt(1-(sqrt(sin(((lat2 - lat1) * Constants.DEGREES_TO_RADIUS_COEF) / 2))+sqrt(sin(((lng2 - lng1) * Constants.DEGREES_TO_RADIUS_COEF) / 2))*cos(lat1 * Constants.DEGREES_TO_RADIUS_COEF)*cos(lat2 * Constants.DEGREES_TO_RADIUS_COEF))))
+
}