in flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java [52:76]
public void open(Configuration parameters) throws Exception {
    // Initializes the sink: delegates to the superclass, connects the InfluxDB client,
    // makes sure the configured database is available, selects it, and then applies
    // the optional batching and gzip settings taken from influxDBConfig.
    super.open(parameters);

    influxDBClient = InfluxDBFactory.connect(
            influxDBConfig.getUrl(),
            influxDBConfig.getUsername(),
            influxDBConfig.getPassword());

    final String database = influxDBConfig.getDatabase();
    final boolean databaseMissing = !influxDBClient.databaseExists(database);

    // Fail fast when the database is absent and the configuration forbids creating it.
    if (databaseMissing && !influxDBConfig.isCreateDatabase()) {
        throw new RuntimeException("This " + database + " database does not exist!");
    }
    if (databaseMissing) {
        influxDBClient.createDatabase(database);
    }

    influxDBClient.setDatabase(database);

    // Batching is only switched on for a positive batch size.
    final int batchActions = influxDBConfig.getBatchActions();
    if (batchActions > 0) {
        influxDBClient.enableBatch(
                batchActions,
                influxDBConfig.getFlushDuration(),
                influxDBConfig.getFlushDurationTimeUnit());
    }

    if (influxDBConfig.isEnableGzip()) {
        influxDBClient.enableGzip();
    }
}