flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java [108:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            throws Exception {
        final AtomicInteger updatesCount = new AtomicInteger(0);
        final AtomicInteger updatesConfirmed = new AtomicInteger(0);

        final AtomicReference<Throwable> exception = new AtomicReference<>();

        FutureCallback<ResultSet> callback =
                new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet resultSet) {
                        updatesConfirmed.incrementAndGet();
                        if (updatesCount.get() > 0) { // only set if all updates have been sent
                            if (updatesCount.get() == updatesConfirmed.get()) {
                                synchronized (updatesConfirmed) {
                                    updatesConfirmed.notifyAll();
                                }
                            }
                        }
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        if (exception.compareAndSet(null, throwable)) {
                            LOG.error("Error while sending value.", throwable);
                            synchronized (updatesConfirmed) {
                                updatesConfirmed.notifyAll();
                            }
                        }
                    }
                };

        // set values for prepared statement
        int updatesSent = 0;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java [108:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            throws Exception {
        final AtomicInteger updatesCount = new AtomicInteger(0);
        final AtomicInteger updatesConfirmed = new AtomicInteger(0);

        final AtomicReference<Throwable> exception = new AtomicReference<>();

        FutureCallback<ResultSet> callback =
                new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet resultSet) {
                        updatesConfirmed.incrementAndGet();
                        if (updatesCount.get() > 0) { // only set if all updates have been sent
                            if (updatesCount.get() == updatesConfirmed.get()) {
                                synchronized (updatesConfirmed) {
                                    updatesConfirmed.notifyAll();
                                }
                            }
                        }
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        if (exception.compareAndSet(null, throwable)) {
                            LOG.error("Error while sending value.", throwable);
                            synchronized (updatesConfirmed) {
                                updatesConfirmed.notifyAll();
                            }
                        }
                    }
                };

        // set values for prepared statement
        int updatesSent = 0;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



