Add Historic reader API implementation for influxdb
diff --git a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/Stack.java b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/Stack.java
similarity index 92%
rename from platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/Stack.java
rename to platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/Stack.java
index f59a1a2..62d90b2 100644
--- a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/Stack.java
+++ b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/Stack.java
@@ -8,7 +8,7 @@
* Contributors:
* Kentyou - initial API and implementation
*/
-package org.eclipse.sensinact.gateway.agent.storage.generic;
+package org.eclipse.sensinact.gateway.historic.storage.agent.generic;
import java.util.ArrayList;
import java.util.List;
diff --git a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageAgent.java
similarity index 98%
rename from platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java
rename to platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageAgent.java
index 9ee3793..f4358ba 100644
--- a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageAgent.java
+++ b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageAgent.java
@@ -8,7 +8,7 @@
* Contributors:
* Kentyou - initial API and implementation
*/
-package org.eclipse.sensinact.gateway.agent.storage.generic;
+package org.eclipse.sensinact.gateway.historic.storage.agent.generic;
import java.text.SimpleDateFormat;
import java.util.Arrays;
diff --git a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageConnection.java b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageConnection.java
similarity index 97%
rename from platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageConnection.java
rename to platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageConnection.java
index 8596d04..e708578 100644
--- a/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/generic/StorageConnection.java
+++ b/platform/northbound/generic-storage-agent/src/main/java/org/eclipse/sensinact/gateway/historic/storage/agent/generic/StorageConnection.java
@@ -8,7 +8,7 @@
* Contributors:
* Kentyou - initial API and implementation
*/
-package org.eclipse.sensinact.gateway.agent.storage.generic;
+package org.eclipse.sensinact.gateway.historic.storage.agent.generic;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.json.JSONObject;
diff --git a/platform/northbound/influxdb-storage-agent/pom.xml b/platform/northbound/influxdb-storage-agent/pom.xml
index 91836f2..7853c1c 100644
--- a/platform/northbound/influxdb-storage-agent/pom.xml
+++ b/platform/northbound/influxdb-storage-agent/pom.xml
@@ -50,6 +50,11 @@
<artifactId>sensinact-core</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.sensinact.gateway</groupId>
+ <artifactId>sensinact-historic-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBHistoricProvider.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBHistoricProvider.java
new file mode 100644
index 0000000..c31c21b
--- /dev/null
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBHistoricProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.agent.storage.influxdb;
+
+import org.eclipse.sensinact.gateway.agent.storage.influxdb.read.InfluxDBSpatialRequest;
+import org.eclipse.sensinact.gateway.agent.storage.influxdb.read.InfluxDBSpatioTemporalRequest;
+import org.eclipse.sensinact.gateway.agent.storage.influxdb.read.InfluxDBTemporalRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricProvider;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricSpatialRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricSpatioTemporalRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricTemporalRequest;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
+
+/**
+ * InfluxDB {@link HistoricProvider}
+ */
+public class InfluxDBHistoricProvider implements HistoricProvider {
+
+ private InfluxDbConnector connector;
+ private String measurement;
+ private String database;
+
+ public InfluxDBHistoricProvider(InfluxDbConnector connector, String database, String measurement){
+ this.connector = connector;
+ this.database = database;
+ this.measurement = measurement;
+ }
+
+ /**
+ * Creates and returns a new {@link HistoricTemporalRequest}
+ *
+ * @return a newly created {@link HistoricTemporalRequest}
+ */
+ public HistoricTemporalRequest newTemporalRequest() {
+ InfluxDBTemporalRequest request = new InfluxDBTemporalRequest(this.connector);
+ request.setDatabase(this.database);
+ request.setMeasurement(this.measurement);
+ return request;
+ }
+
+ /**
+ * Creates and returns a new {@link HistoricSpatialRequest}
+ *
+ * @return a newly created {@link HistoricSpatialRequest}
+ */
+ public HistoricSpatialRequest newSpatialRequest() {
+ InfluxDBSpatialRequest request = new InfluxDBSpatialRequest(this.connector);
+ request.setDatabase(this.database);
+ request.setMeasurement(this.measurement);
+ return request;
+ }
+
+ /**
+ * Creates and returns a new {@link HistoricSpatioTemporalRequest}
+ *
+ * @return a newly created {@link HistoricSpatioTemporalRequest}
+ */
+ public HistoricSpatioTemporalRequest newSpatioTemporalRequest() {
+ InfluxDBSpatioTemporalRequest request = new InfluxDBSpatioTemporalRequest(this.connector);
+ request.setDatabase(this.database);
+ request.setMeasurement(this.measurement);
+ return request;
+ }
+
+
+}
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
index 168a160..91699a2 100644
--- a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/InfluxDBStorageAgent.java
@@ -1,11 +1,22 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
package org.eclipse.sensinact.gateway.agent.storage.influxdb;
-import java.io.IOException;
+import java.util.Hashtable;
-import org.eclipse.sensinact.gateway.agent.storage.generic.StorageAgent;
-import org.eclipse.sensinact.gateway.agent.storage.influxdb.internal.InfluxDBStorageConnection;
+import org.eclipse.sensinact.gateway.agent.storage.influxdb.write.InfluxDBStorageConnection;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.core.message.AgentRelay;
+import org.eclipse.sensinact.gateway.historic.storage.agent.generic.StorageAgent;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricProvider;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnectorConfiguration;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbDatabase;
@@ -89,7 +100,10 @@
this.measurement = INFLUX_AGENT_DEFAULT_MEASUREMENT;
this.database = this.connector.createIfNotExists(db);
-
+
+ InfluxDBHistoricProvider provider = new InfluxDBHistoricProvider(connector, db, this.measurement);
+ this.mediator.register(new Hashtable(), provider,new Class[] { HistoricProvider.class});
+
super.setStorageKeys((String) mediator.getProperty(STORAGE_AGENT_KEYS_PROPS));
super.setStorageConnection(new InfluxDBStorageConnection(mediator, database, this.measurement));
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBRequest.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBRequest.java
new file mode 100644
index 0000000..531c49b
--- /dev/null
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBRequest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.agent.storage.influxdb.read;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.eclipse.sensinact.gateway.core.DataResource;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.TemporalDTO;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDBTagDTO;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbDatabase;
+import org.influxdb.dto.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class InfluxDBRequest<T> implements HistoricRequest<T>{
+
+ private static final Logger LOG = LoggerFactory.getLogger(InfluxDBRequest.class);
+
+ private static final List<String> REPLACEMENTS = Arrays.asList(":",".","-","[","]","(",")","{","}","+");
+
+ protected String database;
+ protected String measurement;
+
+ protected String provider;
+ protected String service;
+ protected String resource;
+ protected LocalDateTime start;
+ protected LocalDateTime end;
+
+ protected InfluxDbConnector influxDbConnector;
+
+ public InfluxDBRequest(InfluxDbConnector influxDbConnector) {
+ this.influxDbConnector = influxDbConnector;
+ }
+
+ public void setMeasurement(String measurement) {
+ this.measurement = measurement;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ @Override
+ public void setServiceProviderIdentifier(String provider) {
+ this.provider = provider;
+ }
+
+ @Override
+ public void setServiceIdentifier(String service) {
+ this.service = service;
+ }
+
+ @Override
+ public void setResourceIdentifier(String resource) {
+ this.resource = resource;
+ }
+
+ @Override
+ public void setHistoricStartTime(LocalDateTime fromTime) {
+ this.start = fromTime;
+ }
+
+ @Override
+ public void setHistoricEndTime(LocalDateTime toTime) {
+ this.end = toTime;
+ }
+
+ protected InfluxDBTagDTO getDataSourcePath() {
+ InfluxDBTagDTO historicAttributeDTO = new InfluxDBTagDTO();
+ historicAttributeDTO.name="path";
+ String datasource = null;
+ if(this.resource == null) {
+
+ if(this.provider == null)
+ return null;
+
+ datasource = provider;
+ for(String replacement:REPLACEMENTS)
+ datasource = datasource.replace(replacement,"\\".concat(replacement));
+
+ historicAttributeDTO.value=new StringBuilder(
+ ).append("/("
+ ).append(datasource
+ ).append("\\/([^\\/]+\\/?)+)/"
+ ).toString();
+ historicAttributeDTO.pattern=true;
+ return historicAttributeDTO;
+ }
+ if(this.provider == null){
+ if(this.service == null)
+ return null;
+
+ datasource = this.service;
+ for(String replacement:REPLACEMENTS)
+ datasource = datasource.replace(replacement,"\\".concat(replacement));
+
+ datasource = new StringBuilder(
+ ).append("/([^\\/]+\\/"
+ ).append(datasource
+ ).append("\\/[^\\/]+)/"
+ ).toString();
+ historicAttributeDTO.value=datasource;
+ historicAttributeDTO.pattern=true;
+ return historicAttributeDTO;
+
+ }
+ historicAttributeDTO.value=new StringBuilder(
+ ).append("/"
+ ).append(this.provider
+ ).append("/"
+ ).append(this.service
+ ).append("/"
+ ).append(this.resource
+ ).append("/"
+ ).append(DataResource.VALUE
+ ).toString();
+ historicAttributeDTO.pattern=false;
+ return historicAttributeDTO;
+ }
+
+ protected InfluxDBTagDTO getResource() {
+ if(this.resource == null)
+ return null;
+ InfluxDBTagDTO historicAttributeDTO = new InfluxDBTagDTO();
+ historicAttributeDTO.name="resource";
+ historicAttributeDTO.value=resource;
+ historicAttributeDTO.pattern=false;
+ return historicAttributeDTO;
+ }
+
+ private List<TemporalDTO> buildTemporalDTOList(QueryResult result){
+ List<List<Object>> serie = null;
+ try {
+ serie = result.getResults().get(0).getSeries().get(0).getValues();
+ } catch(NullPointerException e ) {
+ return Collections.<TemporalDTO>emptyList();
+ }
+ List<TemporalDTO> list = new ArrayList<>();
+ for(int i=0;i<serie.size();i++){
+ TemporalDTO dto = null;
+ try {
+ dto = new TemporalDTO();
+ dto.tagID = i;
+ dto.timestamp = Instant.parse(String.valueOf(serie.get(i).get(0))).toEpochMilli();
+ dto.value = String.valueOf(serie.get(i).get(1));
+ list.add(dto);
+ } catch(Exception e) {
+ LOG.error(e.getMessage(),e);
+ if(dto!=null) {
+ dto.error=e.getMessage();
+ list.add(dto);
+ }
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the
+ * records from the InfluxDB measurement whose name is also passed as parameter, and
+ * compliant with tags dictionary argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param columns the Strings List defining the fields to be provided
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags) {
+ QueryResult result = db.getResult(measurement, tags, Arrays.asList("time","value"));
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+ /**
+ * Returns the String formated aggregated values for the column passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags, String function, long timeWindow) {
+ QueryResult result = db.getResult(measurement, tags, "value", function, timeWindow);
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
+ * argument and starting from the specified String formated start datetime.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param columns the Strings List defining the fields to be provided
+ * @param start the LocalDateTime defining the chronological beginning of records in which to search
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags, LocalDateTime start) {
+ QueryResult result = db.getResult(measurement, tags, Arrays.asList("time","value"), start);
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+ /**
+ * Returns the String formated aggregated values for the column passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ * @param start the LocalDateTime defining the chronological beginning of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags, String function, long timeWindow, LocalDateTime start) {
+ QueryResult result = db.getResult(measurement, tags, "value", function, timeWindow, start);
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+ /**
+ * Returns the String formated values for the columns List passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
+ * argument, between both start and end datetimes.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param columns the Strings List defining the fields to be provided
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param start the LocalDateTime defining the chronological beginning of records in which to search
+ * @param end the LocalDateTime defining the chronological ending of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags, LocalDateTime start, LocalDateTime end) {
+ QueryResult result = db.getResult(measurement, tags, Arrays.asList("time","value"), start, end);
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+ /**
+ * Returns the String formated aggregated values for the column passed as parameter, of the records from
+ * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
+ * argument.
+ *
+ * @param measurement the name of the InfluxDB measurement in which searching the points
+ * @param tags the dictionary of tags allowing to parameterize the research
+ * @param column the String name of the column on which the specified aggregation function applies
+ * @param function the String name of the aggregation function applying
+ * @param timeWindow the time window of the specified aggregation function
+ * @param start the LocalDateTime defining the chronological beginning of records in which to search
+ * @param end the LocalDateTime defining the chronological ending of records in which to search
+ *
+ * @return the JSON formated String result of the research
+ */
+ protected List<TemporalDTO> get(InfluxDbDatabase db, String measurement, List<InfluxDBTagDTO> tags, String function, long timeWindow, LocalDateTime start, LocalDateTime end) {
+ QueryResult result = db.getResult(measurement, tags, "value", function, timeWindow, start, end);
+ List<TemporalDTO> list = buildTemporalDTOList(result);
+ return list;
+ }
+
+}
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatialRequest.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatialRequest.java
new file mode 100644
index 0000000..92d2f1f
--- /dev/null
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatialRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.agent.storage.influxdb.read;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricSpatialRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.SpatialDTO;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
+
+
+public class InfluxDBSpatialRequest extends InfluxDBRequest<SpatialDTO> implements HistoricSpatialRequest{
+
+ protected String region;
+
+ public InfluxDBSpatialRequest(InfluxDbConnector influxDbConnector) {
+ super(influxDbConnector);
+ }
+
+ @Override
+ public List<SpatialDTO> execute() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void setRegion(String region) {
+ this.region = region;
+ }
+}
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatioTemporalRequest.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatioTemporalRequest.java
new file mode 100644
index 0000000..168f75c
--- /dev/null
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBSpatioTemporalRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.agent.storage.influxdb.read;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricSpatioTemporalRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.SpatioTemporalDTO;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
+
+
+public class InfluxDBSpatioTemporalRequest extends InfluxDBRequest<SpatioTemporalDTO> implements HistoricSpatioTemporalRequest{
+
+ protected String region;
+ protected String function;
+ protected long temporalWindow;
+
+ public InfluxDBSpatioTemporalRequest(InfluxDbConnector influxDbConnector) {
+ super(influxDbConnector);
+ }
+
+ @Override
+ public List<SpatioTemporalDTO> execute() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void setFunction(String function) {
+ this.function = function;
+ }
+
+ @Override
+ public void setTemporalWindow(long temporalWindow) {
+ this.temporalWindow = temporalWindow;
+ }
+
+ @Override
+ public void setRegion(String region) {
+ this.region = region;
+ }
+}
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBTemporalRequest.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBTemporalRequest.java
new file mode 100644
index 0000000..d3156e5
--- /dev/null
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/read/InfluxDBTemporalRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 Kentyou.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Kentyou - initial API and implementation
+ */
+package org.eclipse.sensinact.gateway.agent.storage.influxdb.read;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricTemporalRequest;
+import org.eclipse.sensinact.gateway.historic.storage.reader.api.TemporalDTO;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
+import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbDatabase;
+
+
+public class InfluxDBTemporalRequest extends InfluxDBRequest<TemporalDTO> implements HistoricTemporalRequest{
+
+ protected String function;
+ protected long temporalWindow;
+
+ public InfluxDBTemporalRequest(InfluxDbConnector influxDbConnector) {
+ super(influxDbConnector);
+ }
+
+ @Override
+ public List<TemporalDTO> execute() {
+ InfluxDbDatabase db = influxDbConnector.getIfExists(super.database);
+ if(db == null)
+ return Collections.emptyList();
+ List<TemporalDTO> s;
+ if(this.function == null) {
+ s = super.get(db,
+ super.measurement.concat("_num"),
+ Arrays.asList(super.getDataSourcePath(), super.getResource()),
+ super.start,
+ super.end);
+ } else {
+ s = super.get(db,
+ super.measurement.concat("_num"),
+ Arrays.asList(super.getDataSourcePath(), super.getResource()),
+ this.function,
+ this.temporalWindow <=0?10000:temporalWindow,
+ super.start,
+ super.end);
+ }
+ return s;
+ }
+
+ @Override
+ public void setFunction(String function) {
+ this.function = function;
+ }
+
+ @Override
+ public void setTemporalWindow(long temporalWindow) {
+ this.temporalWindow = temporalWindow;
+ }
+
+}
diff --git a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/write/InfluxDBStorageConnection.java
similarity index 96%
rename from platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java
rename to platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/write/InfluxDBStorageConnection.java
index 352b219..b1e7dba 100644
--- a/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/internal/InfluxDBStorageConnection.java
+++ b/platform/northbound/influxdb-storage-agent/src/main/java/org/eclipse/sensinact/gateway/agent/storage/influxdb/write/InfluxDBStorageConnection.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Kentyou.
+ * Copyright (c) 2021 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -8,7 +8,7 @@
* Contributors:
* Kentyou - initial API and implementation
*/
-package org.eclipse.sensinact.gateway.agent.storage.influxdb.internal;
+package org.eclipse.sensinact.gateway.agent.storage.influxdb.write;
import java.io.IOException;
import java.util.Dictionary;
@@ -17,9 +17,9 @@
import java.util.Iterator;
import java.util.Set;
-import org.eclipse.sensinact.gateway.agent.storage.generic.StorageConnection;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.core.DataResource;
+import org.eclipse.sensinact.gateway.historic.storage.agent.generic.StorageConnection;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbDatabase;
import org.eclipse.sensinact.gateway.util.json.JSONObjectStatement;
import org.eclipse.sensinact.gateway.util.json.JSONTokenerStatement;
@@ -119,7 +119,7 @@
String measurement = null;
final Dictionary<String,Object> fs = new Hashtable<>();
final Dictionary<String,String> ts = new Hashtable<>();
- for(Iterator<String> it = obj.keys();it.hasNext();) {
+ for(Iterator<String> it = obj.keys(); it.hasNext();) {
String key = it.next();
if(this.fields.contains(key)) {
fs.put(key,obj.get(key));
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java
index 8b1562d..4166d8d 100644
--- a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Kentyou.
+ * Copyright (c) 2021 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
@@ -16,10 +16,11 @@
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
+
/**
- * InfluxDB database connector
+ * Historic provider implementation connected to InfluxDB database
*/
-public class InfluxDbConnector {
+public class InfluxDbConnector {
private InfluxDB influxDB;
@@ -48,26 +49,13 @@
else
connected = this.connect(configuration.getUri());
if(connected) {
- influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
+ influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
influxDB.enableBatch();
}
else
throw new IOException("Unable to connect");
}
-
- /**
- * Creates a connection and returns true if it has been done properly.
- * Returns false otherwise
- *
- * @param uri
- * @param database
- *
- * @return <ul>
- * <li>true if the connection has been properly done</li>
- * <li>false otherwise</li>
- * </ul>
- */
private boolean connect(String uri) {
influxDB = InfluxDBFactory.connect(uri);
if(!checkVersion())
@@ -75,21 +63,6 @@
return true;
}
-
- /**
- * Creates a connection and returns true if it has been done properly.
- * Returns false otherwise
- *
- * @param uri
- * @param username
- * @param password
- * @param database
- *
- * @return <ul>
- * <li>true if the connection has been properly done</li>
- * <li>false otherwise</li>
- * </ul>
- */
private boolean connect(String uri, String username, String password) {
influxDB = InfluxDBFactory.connect(uri, username, password);
if(!checkVersion())
@@ -99,7 +72,7 @@
private boolean checkVersion() {
Pong response = this.influxDB.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
+ if (!response.isGood()) {
System.out.println("Error pinging server.");
influxDB.close();
influxDB = null;
@@ -125,7 +98,8 @@
public boolean exists(String databaseName) {
return this.influxDB.databaseExists(databaseName);
}
-
+
+
/**
* Returns the database with the name passed as parameter if it exists, otherwise
* it is created and returned
@@ -161,11 +135,22 @@
}
/**
- * Close the connection this InfluxDbConnector initiated
- * with an InfluxDB instance
+ * Returns the database with the name passed as parameter if it exists, otherwise
+ * returns null
+ *
+ * @param databaseName the name of the database
+ * @param retentionPolicy the String retention policy applying on the database
+ *
+ * @return
+ * <ul>
+ * <li>the database with the specified name if it exists</li>
+ * <li>null otherwise</li>
+ * </ul>
*/
- public void close() {
- this.influxDB.close();
+ public InfluxDbDatabase getIfExists(String databaseName, String retentionPolicy) {
+ if(!exists(databaseName))
+ return null;
+ return new InfluxDbDatabase(this.influxDB,databaseName,retentionPolicy);
}
-
+
}
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java
index 42f7e4b..47f3d32 100644
--- a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbConnectorConfiguration.java
@@ -1,12 +1,9 @@
-/*
- * Copyright (c) 2020 Kentyou.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Kentyou - initial API and implementation
+/*
+ * Copyright 2021 Kentyou
+ * Proprietary and confidential
+ *
+ * All Rights Reserved.
+ * Unauthorized copying of this file is strictly prohibited
*/
package org.eclipse.sensinact.gateway.tools.connector.influxdb;
@@ -241,7 +238,7 @@
*/
public String getUri() {
String port = null;
- if(this.port == 0)
+ if(this.port <= 0)
port="";
else
port=":".concat(String.valueOf(this.port));
diff --git a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
index e256b70..77e08f2 100644
--- a/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
+++ b/platform/tools/influxdb-connector/src/main/java/org/eclipse/sensinact/gateway/tools/connector/influxdb/InfluxDbDatabase.java
@@ -1,33 +1,29 @@
-/*
- * Copyright (c) 2020 Kentyou.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Kentyou - initial API and implementation
+/*
+ * Copyright 2021 Kentyou
+ * Proprietary and confidential
+ *
+ * All Rights Reserved.
+ * Unauthorized copying of this file is strictly prohibited
*/
package org.eclipse.sensinact.gateway.tools.connector.influxdb;
-import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.Dictionary;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.eclipse.sensinact.gateway.util.LocationUtils;
-import org.eclipse.sensinact.gateway.util.location.Segment;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
-import org.influxdb.impl.InfluxDBResultMapper;
-import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,40 +65,35 @@
};
}
};
-
- private static final Logger LOG = LoggerFactory.getLogger(InfluxDbDatabase.class);
private InfluxDB influxDB;
private String database;
+ private final ZoneOffset offset;
/**
* Constructor
*
* @param influxDB the {@link InfluxDB} wrapped by the InfluxDbDatabase to be instantiated
+ * @param database the String database name
*/
public InfluxDbDatabase(InfluxDB influxDB, String database) {
- this.influxDB = influxDB;
- this.database = database;
-
- influxDB.setDatabase(database);
- influxDB.setRetentionPolicy("autogen");
+ this(influxDB,database,"autogen");
}
- private boolean isValidDate(String datetime) {
- SimpleDateFormatProvider formatProvider = THREAD_LOCAL_FORMATS.get();
- boolean valid = false;
- for(Iterator<SimpleDateFormat> it = formatProvider.iterator();it.hasNext();) {
- SimpleDateFormat format = it.next();
- try {
- format.parse(datetime);
- valid = true;
- break;
- }catch(ParseException e) {
- LOG.error(e.getMessage(),e);
- };
- }
- THREAD_LOCAL_FORMATS.remove();
- return valid;
+ /**
+ * Constructor
+ *
+ * @param influxDB the {@link InfluxDB} wrapped by the InfluxDbDatabase to be instantiated
+ * @param database the String database name
+ * @param retention the String retention policy applying
+ */
+ public InfluxDbDatabase(InfluxDB influxDB, String database, String retention) {
+ this.influxDB = influxDB;
+ this.database = database;
+ this.offset = ZoneOffset.systemDefault().getRules().getOffset(Instant.now());
+
+ influxDB.setDatabase(database);
+ influxDB.setRetentionPolicy(retention);
}
/**
@@ -233,7 +224,7 @@
builder.addField(key, String.valueOf(value));
}
}
-
+
/*
* (non-javadoc)
*/
@@ -271,19 +262,16 @@
}
}
- /*
- * (non-javadoc)
- *
- */
private String getSelectFunction(String column, String function) {
String select = null;
- if(column==null)
+ if(column ==null || column.trim().length()==0)
return null;
- if(function == null)
- return column;
+ if(function == null)
+ return column;
switch(function) {
- case "avg":
+ case "avg":
+ case "mean":
select = String.format("MEAN(%s)",column);
break;
case "count":
@@ -304,6 +292,7 @@
case "min":
select = String.format("MIN(%s)",column);
break;
+ case "sum_square":
case "sqsum":
select = String.format("SQRT(SUM(%s))",column,column);
break;
@@ -312,22 +301,33 @@
break;
default:
select = column;
- }
+ }
return select;
}
-
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
- * argument.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, List<String> columns, Dictionary<String,String> tags) {
+
+ private String buildWhereClause(List<InfluxDBTagDTO> tags) {
+ String where=null;
+ if(tags == null || tags.isEmpty()) {
+ where="";
+ return where;
+ }
+ StringBuilder builder = new StringBuilder();
+ builder.append(" WHERE ");
+ for(int i=0;i<tags.size();i++) {
+ InfluxDBTagDTO t = tags.get(i);
+ if(i > 0)
+ builder.append(" AND ");
+ builder.append(t.name);
+ builder.append(t.pattern?"=~":"=");
+ builder.append(t.pattern?" ":"'");
+ builder.append(t.value);
+ builder.append(t.pattern?" ":"'");
+ }
+ where = builder.toString();
+ return where;
+ }
+
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, List<String> columns) {
String select = null;
if(columns==null || columns.isEmpty())
select = "* ";
@@ -338,85 +338,29 @@
}, (h,t)->h.append(t.toString())).toString();
select =select.substring(0,select.length()-1);
- String from = String.format(" FROM %s " , measurement);
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- where = builder.toString();
- } else
- where="";
+ String from = String.format(" FROM %s " , measurement);
+ String where = buildWhereClause(tags);
Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
QueryResult result = this.influxDB.query(query);
- return result.toString();
+ return result;
}
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
- * argument.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param column the String name of the column on which the specified aggregation function applies
- * @param function the String name of the aggregation function applying
- * @param timeWindow the time window of the specified aggregation function
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow) {
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, String column, String function, long timeWindow) {
String select = getSelectFunction(column, function);
if(select == null)
return null;
String from = String.format(" FROM %s " , measurement);
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- where = builder.toString();
- } else
- where="";
+ String where = buildWhereClause(tags);
String window = getTimeWindow(timeWindow);
Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
QueryResult result = this.influxDB.query(query);
- return result.toString();
+ return result;
}
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
- * argument and starting from the specified String formated start datetime.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param start the String formated date defining the chronological beginning of records in which to search
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, List<String> columns, Dictionary<String,String> tags, String start) {
- if(start == null || !isValidDate(start))
- return get(measurement, columns, tags);
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, List<String> columns, LocalDateTime start) {
+ if(start == null)
+ return getResult(measurement, tags, columns);
String select = null;
if(columns==null || columns.isEmpty())
select = "* ";
@@ -429,90 +373,59 @@
String from = String.format(" FROM %s " , measurement);
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s'", start));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s'", start);
+ String where = buildWhereClause(tags);
+
+ SimpleDateFormatProvider formatProvider = THREAD_LOCAL_FORMATS.get();
+ SimpleDateFormat df = formatProvider.iterator().next();
+
+ String startDate = df.format(new Date(start.toInstant(offset).toEpochMilli()));
+
+ THREAD_LOCAL_FORMATS.remove();
+ if(where.length() == 0)
+ where = String.format(" WHERE time > '%s'", startDate);
+ else {
+ StringBuilder builder = new StringBuilder();
+ builder.append(where);
+ builder.append(String.format(" AND time > '%s'", startDate));
+ where = builder.toString();
+ }
Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
QueryResult result = this.influxDB.query(query);
- return result.toString();
+ return result;
}
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
- * argument.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param column the String name of the column on which the specified aggregation function applies
- * @param function the String name of the aggregation function applying
- * @param timeWindow the time window of the specified aggregation function
- * @param start the String formated date defining the chronological beginning of records in which to search
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start) {
- if(start == null || !isValidDate(start))
- return get(measurement, tags, column, function, timeWindow);
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, String column, String function, long timeWindow, LocalDateTime start) {
+ if(start == null)
+ return getResult(measurement, tags, column, function, timeWindow);
String select = getSelectFunction(column, function);
if(select == null)
return null;
String from = String.format(" FROM %s " , measurement);
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s'", start));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s'", start);
+ String where = buildWhereClause(tags);
+ SimpleDateFormatProvider formatProvider = THREAD_LOCAL_FORMATS.get();
+
+ SimpleDateFormat df = formatProvider.iterator().next();
+ String startDate = df.format(new Date(start.toInstant(offset).toEpochMilli()));
+
+ THREAD_LOCAL_FORMATS.remove();
+ if(where.length() == 0)
+ where = String.format(" WHERE time > '%s'", startDate);
+ else {
+ StringBuilder builder = new StringBuilder();
+ builder.append(where);
+ builder.append(String.format(" AND time > '%s'", startDate));
+ where = builder.toString();
+ }
String window = getTimeWindow(timeWindow);
Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
QueryResult result = this.influxDB.query(query);
- return result.toString();
+ return result;
}
-
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, compliant with tags dictionary
- * argument, between both String formated start and end datetimes.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param start the String formated date defining the chronological beginning of records in which to search
- * @param end the String formated date defining the chronological ending of records in which to search
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, List<String> columns, Dictionary<String,String> tags, String start, String end) {
- if(end == null || !isValidDate(end))
- return get(measurement, columns, tags, start);
+
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, List<String> columns, LocalDateTime start, LocalDateTime end) {
+ if(end == null)
+ return this.getResult(measurement, tags, columns, start);
String select = null;
if(columns==null || columns.isEmpty())
select = "* ";
@@ -522,241 +435,61 @@
b.append(",");
}, (h,t)->h.append(t.toString())).toString();
select =select.substring(0,select.length()-1);
-
+
+ SimpleDateFormatProvider formatProvider = THREAD_LOCAL_FORMATS.get();
+ SimpleDateFormat df = formatProvider.iterator().next();
+
+ String startDate=null;
+ String endDate=null;
+
+ startDate = df.format(new Date(start.toInstant(offset).toEpochMilli()));
+ endDate = df.format(new Date(end.toInstant(offset).toEpochMilli()));
+
+ THREAD_LOCAL_FORMATS.remove();
+
String from = String.format(" FROM %s " , measurement);
- if(start == null || !isValidDate(start))
- start = "1970-01-01T00:00:00.001Z";
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s' AND time < '%s'", start, end));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s' AND time < '%s'", start, end);
+ String where = buildWhereClause(tags);
+ if(where.length() == 0)
+ where = String.format(" WHERE time > '%s' AND time < '%s'", startDate, endDate);
+ else {
+ StringBuilder builder = new StringBuilder();
+ builder.append(where);
+ builder.append(String.format(" AND time > '%s' AND time < '%s'", startDate, endDate));
+ where = builder.toString();
+ }
Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
- QueryResult result = this.influxDB.query(query);
- return result.toString();
+ QueryResult result = this.influxDB.query(query);
+ return result;
}
+ public QueryResult getResult(String measurement, List<InfluxDBTagDTO> tags, String column, String function, long timeWindow, LocalDateTime start, LocalDateTime end) {
+ if(end == null)
+ return getResult(measurement, tags, column, function, timeWindow, start);
- /**
- * Returns the String formated values for the columns List passed as parameter, of the records from
- * the InfluxDB measurement whose name is also passed as parameter, and compliant with tags dictionary
- * argument.
- *
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param column the String name of the column on which the specified aggregation function applies
- * @param function the String name of the aggregation function applying
- * @param timeWindow the time window of the specified aggregation function
- * @param start the String formated date defining the chronological beginning of records in which to search
- * @param end the String formated date defining the chronological ending of records in which to search
- *
- * @return the JSON formated String result of the research
- */
- public String get(String measurement, Dictionary<String,String> tags, String column, String function, long timeWindow, String start, String end) {
- if(end == null || !isValidDate(end))
- return get(measurement, tags, column, function, timeWindow, start);
String select = getSelectFunction(column, function);
if(select == null)
return null;
String from = String.format(" FROM %s " , measurement);
- if(start == null || !isValidDate(start))
- start = "1970-01-01T00:00:00.001Z";
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s' AND time < '%s'", start, end));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s' AND time < '%s'", start, end);
+ SimpleDateFormatProvider formatProvider = THREAD_LOCAL_FORMATS.get();
+ SimpleDateFormat df = formatProvider.iterator().next();
+
+ String startDate = df.format(new Date(start.toInstant(offset).toEpochMilli()));
+ String endDate = df.format(new Date(end.toInstant(offset).toEpochMilli()));
+
+ THREAD_LOCAL_FORMATS.remove();
+ String where = buildWhereClause(tags);
+ if(where.length() == 0)
+ where = String.format(" WHERE time > '%s' AND time < '%s'", startDate, endDate);
+ else {
+ StringBuilder builder = new StringBuilder();
+ builder.append(where);
+ builder.append(String.format(" AND time > '%s' AND time < '%s'", startDate, endDate));
+ where = builder.toString();
+ }
String window = getTimeWindow(timeWindow);
Query query = new Query(String.format("SELECT %s%s%s%s", select, from, where, window),database);
QueryResult result = this.influxDB.query(query);
- return result.toString();
- }
-
-
- /**
- * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
- * mapped to the specified result type, and compliant with tags dictionary also passed as parameter
- *
- * @param <T> result unit type
- *
- * @param resultType the type to which found points (records) are mapped
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- *
- * @return the List of resultType typed instances resulting of the search
- */
- public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags) {
- String select = null;
- if(columns.isEmpty())
- select = "* ";
- else
- select = columns.stream().collect(StringBuilder::new,(b,s)-> {
- b.append(s);
- b.append(",");
- }, (h,t)->h.append(t.toString())).toString();
- select =select.substring(0,select.length()-1);
-
- String from = String.format(" FROM %s " , measurement);
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- where = builder.toString();
- } else
- where="";
-
- Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
- QueryResult result = this.influxDB.query(query);
- InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
- return resultMapper.toPOJO(result, resultType);
- }
-
- /**
- * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
- * mapped to the specified result type, compliant with tags dictionary also passed as parameter,
- * and starting from the specified String formated start datetime..
- *
- * @param <T> result unit type
- *
- * @param resultType the type to which found points (records) are mapped
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param start the String formated date defining the chronological beginning of records in which to search
- *
- * @return the List of resultType typed instances resulting of the search
- */
- public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags, String start) {
- if(start == null || !isValidDate(start))
- return get(resultType, measurement, columns, tags);
- String select = null;
- if(columns.isEmpty())
- select = "* ";
- else
- select = columns.stream().collect(StringBuilder::new,(b,s)-> {
- b.append(s);
- b.append(",");
- }, (h,t)->h.append(t.toString())).toString();
- select =select.substring(0,select.length()-1);
-
- String from = String.format(" FROM %s " , measurement);
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s'", start));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s'", start);
- Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
- QueryResult result = this.influxDB.query(query);
- InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
- return resultMapper.toPOJO(result, resultType);
- }
-
-
- /**
- * Returns the List of records from the InfluxDB measurement whose name is passed as parameter,
- * mapped to the specified result type, compliant with tags dictionary also passed as parameter,
- * and between both String formated start and end datetimes.
- *
- * @param <T> result unit type
- *
- * @param resultType the type to which found points (records) are mapped
- * @param measurement the name of the InfluxDB measurement in which searching the points
- * @param columns the Strings List defining the fields to be provided
- * @param tags the dictionary of tags allowing to parameterize the research
- * @param start the String formated date defining the chronological beginning of records in which to search
- * @param end the String formated date defining the chronological ending of records in which to search
- *
- * @return the List of resultType typed instances resulting of the search
- */
- public <T> List<T> get(Class<T> resultType, String measurement, List<String> columns, Dictionary<String,String> tags, String start, String end) {
- if(end == null || !isValidDate(end))
- return get(resultType, measurement, columns, tags, start);
- String select = null;
- if(columns.isEmpty())
- select = "* ";
- else
- select = columns.stream().collect(StringBuilder::new,(b,s)-> {
- b.append(s);
- b.append(",");
- }, (h,t)->h.append(t.toString())).toString();
- select =select.substring(0,select.length()-1);
-
- String from = String.format(" FROM %s " , measurement);
- if(start == null || !isValidDate(start))
- start = "1970-01-01T00:00:00.001Z";
-
- String where = null;
- if(tags != null && !tags.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- builder.append(" WHERE ");
- for(Iterator<String> it = Collections.list(tags.keys()).iterator();it.hasNext();) {
- String key = it.next();
- builder.append(key);
- builder.append("='");
- builder.append(tags.get(key));
- builder.append("'");
- if(it.hasNext())
- builder.append(" AND ");
- }
- builder.append(String.format(" AND time > '%s' AND time < '%s'", start, end));
- where = builder.toString();
- } else
- where = String.format(" WHERE time > '%s' AND time < '%s'", start, end);
-
- Query query = new Query(String.format("SELECT %s%s%s", select, from, where),database);
- QueryResult result = this.influxDB.query(query);
- InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
- return resultMapper.toPOJO(result, resultType);
- }
+ return result;
+ }
}