public void open()

in flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java [52:76]


    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword());

        if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) {
            if(influxDBConfig.isCreateDatabase()) {
                influxDBClient.createDatabase(influxDBConfig.getDatabase());
            }
            else {
                throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!");
            }
        }

        influxDBClient.setDatabase(influxDBConfig.getDatabase());

        if (influxDBConfig.getBatchActions() > 0) {
            influxDBClient.enableBatch(influxDBConfig.getBatchActions(), influxDBConfig.getFlushDuration(), influxDBConfig.getFlushDurationTimeUnit());
        }

        if (influxDBConfig.isEnableGzip()) {

            influxDBClient.enableGzip();
        }
    }