in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java [89:109]
public void open(Configuration configuration) {
    // Build the completion handler first. It reads the semaphore and throwable fields only
    // when one of its methods is invoked, so they may be assigned further down in this method.
    // NOTE(review): presumably this callback is attached to asynchronous write futures
    // elsewhere in the class - confirm against the send path.
    final FutureCallback<V> permitReleasingCallback =
            new FutureCallback<V>() {
                @Override
                public void onSuccess(V ignored) {
                    // The result value is not inspected; completion just frees one permit.
                    semaphore.release();
                }

                @Override
                public void onFailure(Throwable t) {
                    // Record the error only if none is stored yet, so the first failure wins.
                    throwable.compareAndSet(null, t);
                    log.error("Error while sending value.", t);
                    // A permit is released on failure as well as on success.
                    semaphore.release();
                }
            };
    this.callback = permitReleasingCallback;

    // Obtain the cluster from the builder, then create the session.
    this.cluster = builder.getCluster();
    this.session = createSession();

    // Start with no recorded error and as many permits as the configured
    // maximum number of concurrent requests.
    this.throwable = new AtomicReference<>();
    final int maxConcurrentRequests = config.getMaxConcurrentRequests();
    this.semaphore = new Semaphore(maxConcurrentRequests);
}