blob: c0831c7e0a896b71da0aa5f5e600282de51ca920 [file] [log] [blame]
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.eclipse.sensinact.gateway.agent.storage.influxdb.internal;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.agent.storage.generic.StorageConnection;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Pong;
import org.json.JSONObject;
/**
*
* @author kleisar
*/
public class InfluxStorageConnection extends StorageConnection {
private InfluxDB influxDB;
private BatchPoints batchPoints;
protected String databaseURL;
public InfluxStorageConnection(Mediator mediator, String databaseURL, String userName, String password) throws IOException {
super(mediator, userName, password);
this.databaseURL = databaseURL;
this.connect();
}
private void insert(String provider, String service, String resource, Object value) {
System.out.println("insert " + provider + ", " + service + ", " + resource + " = " + value);
Point point = null;
Builder builder = Point.measurement("sensiNact")
.tag("provider", provider)
.tag("service", service)
.tag("resource", resource);
if (value instanceof String) {
point = builder.addField("value", (String) value).build();
} else if (value instanceof Double) {
point = builder.addField("value", (Double) value).build();
} else if (value instanceof Float) {
point = builder.addField("value", (Float) value).build();
} else if (value instanceof Integer) {
point = builder.addField("value", (Integer) value).build();
}
influxDB.write(point);
//batchPoints.point(point);
}
@Override
public void sendRequest(JSONObject object) {
try {
String device = object.getString("device");
String service = object.getString("service");
String resource = object.getString("resource");
Object value = object.get("value");
this.insert(device, service, resource, value);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Creates a connection and returns true if it has been done properly.
* Returns false otherwise
*
* @return <ul>
* <li>true if the connection has been properly done</li>
* <li>false otherwise</li>
* </ul>
*/
protected boolean connect() {
influxDB = InfluxDBFactory.connect(this.databaseURL);
influxDB.setDatabase("sensiNact");
influxDB.setRetentionPolicy("autogen");
Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
System.out.println("Error pinging server.");
return false;
} else {
// this.influxDB.query(new Query("CREATE DATABASE sensiNact", ""));
// batchPoints = BatchPoints
// .database("sensiNact")
// .retentionPolicy("defaultPolicy")
// .build();
}
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
influxDB.enableBatch(100, 5000, TimeUnit.MILLISECONDS); //wait 5s or 100Points, whichever occurs first before sending the points to influxdb
return true;
}
}