public void open()

in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java [89:109]


    public void open(Configuration configuration) {
        // Sets up the sink's runtime state: stores the completion callback, obtains the
        // cluster from the builder, creates the session, and assigns a fresh error holder
        // and a semaphore sized from config.getMaxConcurrentRequests().
        // The configuration argument is not read in this method.
        final FutureCallback<V> completionHandler =
                new FutureCallback<V>() {
                    @Override
                    public void onSuccess(V result) {
                        // The result itself is not used; only the permit is released.
                        semaphore.release();
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        // compareAndSet(null, ...) stores the failure only when none has been
                        // recorded yet; every failure is still logged.
                        throwable.compareAndSet(null, cause);
                        log.error("Error while sending value.", cause);
                        semaphore.release();
                    }
                };
        callback = completionHandler;

        cluster = builder.getCluster();
        session = createSession();

        // The callback dereferences these fields only when it is invoked, so assigning
        // them after the callback has been constructed is fine.
        throwable = new AtomicReference<>();
        semaphore = new Semaphore(config.getMaxConcurrentRequests());
    }