blob: 91699a22abf8110497e51f32458601f9d102d4fc [file] [log] [blame]
/*
* Copyright (c) 2021 Kentyou.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Kentyou - initial API and implementation
*/
package org.eclipse.sensinact.gateway.agent.storage.influxdb;
import java.util.Hashtable;
import org.eclipse.sensinact.gateway.agent.storage.influxdb.write.InfluxDBStorageConnection;
import org.eclipse.sensinact.gateway.common.bundle.Mediator;
import org.eclipse.sensinact.gateway.core.message.AgentRelay;
import org.eclipse.sensinact.gateway.historic.storage.agent.generic.StorageAgent;
import org.eclipse.sensinact.gateway.historic.storage.reader.api.HistoricProvider;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnector;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbConnectorConfiguration;
import org.eclipse.sensinact.gateway.tools.connector.influxdb.InfluxDbDatabase;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@Component(immediate=true, service = {AgentRelay.class})
public class InfluxDBStorageAgent extends StorageAgent {
private static final String INFLUX_AGENT_SCHEME_PROPS = "org.eclipse.sensinact.gateway.history.influx.scheme";
private static final String INFLUX_AGENT_HOST_PROPS = "org.eclipse.sensinact.gateway.history.influx.host";
private static final String INFLUX_AGENT_PORT_PROPS = "org.eclipse.sensinact.gateway.history.influx.port";
private static final String INFLUX_AGENT_PATH_PROPS = "org.eclipse.sensinact.gateway.history.influx.path";
private static final String INFLUX_AGENT_LOGIN_PROPS = "org.eclipse.sensinact.gateway.history.influx.login";
private static final String INFLUX_AGENT_PASSWORD_PROPS = "org.eclipse.sensinact.gateway.history.influx.password";
private static final String INFLUX_AGENT_DB_PROPS = "org.eclipse.sensinact.gateway.history.influx.database";
private static final String INFLUX_AGENT_MEASUREMENT_PROPS = "org.eclipse.sensinact.gateway.history.influx.measurement";
private static final String INFLUX_AGENT_DEFAULT_MEASUREMENT = "test";
private static final String DEFAULT_DATABASE = "sensinact";
private Mediator mediator;
private InfluxDbConnector connector;
private String measurement;
private InfluxDbDatabase database;
@Activate
public void activate(ComponentContext context) {
BundleContext bc = context.getBundleContext();
this.mediator = new Mediator(bc);
String scheme = (String) mediator.getProperty(INFLUX_AGENT_SCHEME_PROPS);
if(scheme == null)
scheme = InfluxDbConnectorConfiguration.DEFAULT_SCHEME;
String host = (String) mediator.getProperty(INFLUX_AGENT_HOST_PROPS);
if(host == null)
host = InfluxDbConnectorConfiguration.DEFAULT_HOST;
int port = -1;
String portStr = (String) mediator.getProperty(INFLUX_AGENT_PORT_PROPS);
if(portStr == null)
port = InfluxDbConnectorConfiguration.DEFAULT_PORT;
else
port = Integer.parseInt(portStr);
String path = (String) mediator.getProperty(INFLUX_AGENT_PATH_PROPS);
if(path == null)
path = InfluxDbConnectorConfiguration.DEFAULT_PATH;
String db = (String) mediator.getProperty(INFLUX_AGENT_DB_PROPS);
if(db == null)
db = DEFAULT_DATABASE;
String username = (String) mediator.getProperty(INFLUX_AGENT_LOGIN_PROPS);
String password = (String) mediator.getProperty(INFLUX_AGENT_PASSWORD_PROPS);
InfluxDbConnectorConfiguration configuration = new InfluxDbConnectorConfiguration.Builder(
).withScheme(scheme
).withHost(host
).withPort(port
).withPath(path
).withUsername(username
).withPassword(password
).build();
try {
this.connector = new InfluxDbConnector(configuration);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
this.measurement = (String) mediator.getProperty(INFLUX_AGENT_MEASUREMENT_PROPS);
if(this.measurement == null)
this.measurement = INFLUX_AGENT_DEFAULT_MEASUREMENT;
this.database = this.connector.createIfNotExists(db);
InfluxDBHistoricProvider provider = new InfluxDBHistoricProvider(connector, db, this.measurement);
this.mediator.register(new Hashtable(), provider,new Class[] { HistoricProvider.class});
super.setStorageKeys((String) mediator.getProperty(STORAGE_AGENT_KEYS_PROPS));
super.setStorageConnection(new InfluxDBStorageConnection(mediator, database, this.measurement));
}
@Deactivate
public void deactivate() {
super.stop();
}
}