@@ -6,10 +6,10 @@
package edu.ie3.datamodel.io.connectors;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
@@ -31,43 +31,72 @@ public class InfluxDbConnector implements DataConnector {
return maps;
};

private static final String INFLUXDB_URL = "http://localhost:8086/";
private static final String INFLUXDB_DATABASE_NAME = "ie3_in";
private final String databaseName;
private final String scenarioName;
private final String url;
private final InfluxDB session;

/**
* Initializes a new InfluxDbConnector with the given url, databaseName and scenario name.
*
* @param url the connection url for the influxDB database
* @param databaseName the name of the database to which the connection should be established
* @param scenarioName the name of the simulation scenario which will be used in influxDB
* measurement names
* @param databaseName the name of the database the session should be set to
* @param createDb true if the connector should create the database if it doesn't exist yet, false
* otherwise
* @param logLevel the {@link InfluxDB.LogLevel} the session should use for logging
* @param batchOptions the {@link BatchOptions} to apply for batched writes
*/
public InfluxDbConnector(String url, String databaseName, String scenarioName) {
this.url = url;
this.databaseName = databaseName;
public InfluxDbConnector(
String url,
String databaseName,
String scenarioName,
boolean createDb,
InfluxDB.LogLevel logLevel,
BatchOptions batchOptions) {
this(InfluxDBFactory.connect(url), scenarioName, databaseName, createDb);
this.session.setLogLevel(logLevel);
this.session.enableBatch(batchOptions);
}

/**
* Initializes a new InfluxDbConnector with the given influxDB session, database name and
* scenario name.
*
* @param session the influxdb session that should be managed by this connector
* @param scenarioName the name of the scenario
* @param databaseName the name of the database the session should be set to
* @param createDb true if the connector should create the database if it doesn't exist yet, false
* otherwise
*/
public InfluxDbConnector(
InfluxDB session, String scenarioName, String databaseName, boolean createDb) {
this.scenarioName = scenarioName;
this.session = session;

session.setDatabase(databaseName);
if (createDb) createDb(databaseName);
}
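
// Illustrative sketch (not part of this diff): handing an externally created session to the
// connector, e.g. one shared with other components or pointing at a test container. URL,
// database and scenario names are placeholders; the batch and log settings are assumptions.
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import edu.ie3.datamodel.io.connectors.InfluxDbConnector;

public class SessionInjectionSketch {
  public static void main(String[] args) {
    // a pre-configured session
    InfluxDB session = InfluxDBFactory.connect("http://localhost:8086/");
    session.setLogLevel(InfluxDB.LogLevel.BASIC);
    session.enableBatch(BatchOptions.DEFAULTS);

    // the connector takes over the session, selects the database and creates it if requested
    InfluxDbConnector connector = new InfluxDbConnector(session, "my_scenario", "ie3_in", true);

    // ... use the connector ...

    connector.shutdown(); // closes the injected session as well
  }
}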

/**
* Initializes a new InfluxDbConnector with the given url and databaseName and no scenario name.
* Consider using a scenario name if you plan to persist results using this connector.
* Initializes a new InfluxDbConnector with the given url, databaseName and scenario name.
*
* @param url the connection url for the influxDB database
* @param databaseName the name of the database to which the connection should be established
* @param scenarioName the name of the simulation scenario which will be used in influxDB
* measurement names
*/
public InfluxDbConnector(String url, String databaseName) {
this(url, databaseName, null);
public InfluxDbConnector(String url, String databaseName, String scenarioName) {
this(url, databaseName, scenarioName, true, InfluxDB.LogLevel.NONE, BatchOptions.DEFAULTS);
}

/**
* Initializes a new InfluxDbConnector with the default URL {@link #INFLUXDB_URL}, database name
* {@link #INFLUXDB_DATABASE_NAME} and no scenario name
* Creates the given database if it doesn't exist yet
*
* @param databaseName the name of the database that should be created
* @return the result of the create database query
*/
public InfluxDbConnector() {
this(INFLUXDB_URL, INFLUXDB_DATABASE_NAME);
public final QueryResult createDb(String databaseName) {
return session.query(new Query("CREATE DATABASE " + databaseName, databaseName));
}
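
// Illustrative sketch (not part of this diff): constructing the connector via URL. The
// convenience constructor falls back to LogLevel.NONE and BatchOptions.DEFAULTS and creates the
// database; the full constructor exposes these knobs. All names and batch values below are
// placeholders, not recommendations from this PR.
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import edu.ie3.datamodel.io.connectors.InfluxDbConnector;

public class ConnectorConstructionSketch {
  public static void main(String[] args) {
    // convenience constructor: LogLevel.NONE, BatchOptions.DEFAULTS, database is created if missing
    InfluxDbConnector defaults =
        new InfluxDbConnector("http://localhost:8086/", "ie3_in", "my_scenario");

    // full constructor: tune batching and logging explicitly, defer database creation
    InfluxDbConnector tuned =
        new InfluxDbConnector(
            "http://localhost:8086/",
            "ie3_in",
            "my_scenario",
            false, // do not issue CREATE DATABASE on construction
            InfluxDB.LogLevel.BASIC,
            BatchOptions.DEFAULTS.actions(10_000).flushDuration(5_000));

    // the database can still be created later on demand
    tuned.createDb("ie3_in");

    defaults.shutdown();
    tuned.shutdown();
  }
}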

/**
@@ -76,35 +105,23 @@ public InfluxDbConnector() {
* @return true, if the database returned the ping
*/
public Boolean isConnectionValid() {
InfluxDB session = getSession();
if (session == null) return false;
Pong response = session.ping();
session.close();
return !response.getVersion().equalsIgnoreCase("unknown");
}

@Override
public void shutdown() {
// no cleanup actions necessary
}

public String getDatabaseName() {
return databaseName;
session.close(); // releases async writing resources and flushes the batch if batching is enabled (blocking!)
}

/**
* Creates a session using the given connection parameters. If no database with the given name
* exists, one is created.
* Returns the session managed by this connector
*
* @return Autocloseable InfluxDB session
* @return the influxDB session managed by this connector
*/
public InfluxDB getSession() {
InfluxDB session;
session = InfluxDBFactory.connect(url);
session.setDatabase(databaseName);
session.query(new Query("CREATE DATABASE " + databaseName, databaseName));
session.setLogLevel(InfluxDB.LogLevel.NONE);
session.enableBatch(100000, 5, TimeUnit.SECONDS);
return session;
}
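
// Illustrative sketch (not part of this diff): since the connector now owns a single long-lived
// session, callers query through getSession() and leave closing to shutdown(). URL, database and
// scenario names are placeholders.
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import edu.ie3.datamodel.io.connectors.InfluxDbConnector;

public class ConnectorLifecycleSketch {
  public static void main(String[] args) {
    InfluxDbConnector connector =
        new InfluxDbConnector("http://localhost:8086/", "ie3_in", "my_scenario");
    try {
      if (!connector.isConnectionValid()) {
        throw new IllegalStateException("InfluxDB did not answer the ping");
      }
      // ad-hoc queries go through the shared session; do NOT close it after use
      QueryResult result = connector.getSession().query(new Query("SHOW MEASUREMENTS", "ie3_in"));
      System.out.println(result);
    } finally {
      connector.shutdown(); // closes the session exactly once
    }
  }
}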

17 changes: 10 additions & 7 deletions src/main/java/edu/ie3/datamodel/io/sink/InfluxDbSink.java
@@ -21,7 +21,6 @@
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

@@ -90,6 +89,14 @@ public <E extends TimeSeriesEntry<V>, V extends Value> void persistTimeSeries(
writeAll(points);
}

/**
* If batch writing is enabled, this call writes everything inside the batch to the database. This
* will block until all pending points are written.
*/
public void flush() {
if (connector.getSession().isBatchEnabled()) connector.getSession().flush();
}
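
// Illustrative sketch (not part of this diff): with batching enabled, written points may sit in
// the session's buffer until the batch is flushed, so a query issued right after persisting could
// miss them; flush() (delegating to the session) forces them out, which the integration test
// below relies on before querying. Measurement, field and database names are placeholders.
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import edu.ie3.datamodel.io.connectors.InfluxDbConnector;

public class FlushBehaviourSketch {
  public static void main(String[] args) {
    InfluxDbConnector connector =
        new InfluxDbConnector("http://localhost:8086/", "ie3_in", "my_scenario");
    InfluxDB session = connector.getSession();

    session.write(
        Point.measurement("line_result")
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
            .addField("i_a_mag", 1.0)
            .build());

    // without an explicit flush a subsequent query may not see the point yet
    session.flush();
    System.out.println(session.query(new Query("SELECT * FROM line_result", "ie3_in")));

    connector.shutdown();
  }
}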

/**
* Transforms a ResultEntity to an influxDB data point. <br>
* As the equivalent to a relational table, the influxDB measurement point will be named using the
@@ -254,9 +261,7 @@ private <C extends UniqueEntity> Set<Point> extractPoints(C entity) {
*/
private void write(Point point) {
if (point == null) return;
try (InfluxDB session = connector.getSession()) {
session.write(point);
}
connector.getSession().write(point);
}

/**
@@ -267,9 +272,7 @@ private void write(Point point) {
private void writeAll(Collection<Point> points) {
if (points.isEmpty()) return;
BatchPoints batchPoints = BatchPoints.builder().points(points).build();
try (InfluxDB session = connector.getSession()) {
session.write(batchPoints);
}
connector.getSession().write(batchPoints);
}

/**
15 changes: 14 additions & 1 deletion src/test/groovy/edu/ie3/datamodel/io/sink/InfluxDbSinkIT.groovy
@@ -37,7 +37,7 @@
InfluxDBContainer influxDbContainer = new InfluxDBContainer("latest")
.withAuthEnabled(false)
.withDatabase("test_out")
.withExposedPorts(8086)
.withExposedPorts(8086) as InfluxDBContainer

@Shared
InfluxDbConnector connector
@@ -72,6 +72,7 @@
null)
when:
sink.persist(lineResult1)
sink.flush()
def key = fileNamingStrategy.getFileName(LineResult).get().trim().replaceAll("\\W", "_")
def queryResult = connector.getSession().query(new Query("SELECT * FROM " + key))
def parsedResults = InfluxDbConnector.parseQueryResult(queryResult)
@@ -180,6 +181,8 @@
when:
sinkWithEmptyNamingStrategy.persist(lineResult1)
sinkWithEmptyNamingStrategy.persist(timeSeries)
sinkWithEmptyNamingStrategy.flush()

def key_lineresult = lineResult1.getClass().getSimpleName()
def key_timeseries = timeSeries.getEntries().iterator().next().getValue().getClass().getSimpleName()
def queryResult = connector.getSession().query(new Query("SELECT * FROM " + key_lineresult))
@@ -210,6 +213,16 @@
queryResult.getResults().get(0).getSeries() == null
}

def "An InfluxDbSink should terminate the corresponding session inside its connector correctly"() {
when:
sink.shutdown()

then:
// after shutdown the batch processor must be disabled and empty
!sink.connector.getSession().batchEnabled
sink.connector.getSession().batchProcessor.queue.isEmpty()
}


static def mapMatchesLineResultEntity(Map<String, String> fieldMap, LineResult lineResult) {
def timeUtil = new TimeUtil(ZoneId.of("UTC"), Locale.GERMANY, "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'")