protected boolean sendValues()

in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java [107:169]


    /**
     * Sends every row in {@code values} to Cassandra asynchronously through the prepared
     * statement, then blocks until each tracked write has been confirmed or one has failed.
     *
     * @param values rows to write; the first {@code arity} fields of each row are bound to the
     *     prepared statement
     * @param checkpointId not read by this implementation
     * @param timestamp applied as the default timestamp of every bound statement
     * @return {@code true} if every tracked write was confirmed, {@code false} if at least one
     *     write reported a failure
     * @throws Exception if binding or submitting a statement throws, or if the calling thread is
     *     interrupted while waiting for confirmations
     */
    protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp)
            throws Exception {
        // Stays at 0 until all statements have been submitted, then holds the number of tracked
        // updates; callbacks use it to tell whether the last confirmation has arrived.
        final AtomicInteger updatesCount = new AtomicInteger(0);
        // Number of successful callbacks so far; also serves as the wait/notify monitor.
        final AtomicInteger updatesConfirmed = new AtomicInteger(0);

        // First failure reported by any callback; later failures are ignored.
        final AtomicReference<Throwable> exception = new AtomicReference<>();

        FutureCallback<ResultSet> callback =
                new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet resultSet) {
                        updatesConfirmed.incrementAndGet();
                        if (updatesCount.get() > 0) { // only set if all updates have been sent
                            if (updatesCount.get() == updatesConfirmed.get()) {
                                synchronized (updatesConfirmed) {
                                    updatesConfirmed.notifyAll();
                                }
                            }
                        }
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        // Only the first failure is recorded and wakes the waiting sender.
                        if (exception.compareAndSet(null, throwable)) {
                            LOG.error("Error while sending value.", throwable);
                            synchronized (updatesConfirmed) {
                                updatesConfirmed.notifyAll();
                            }
                        }
                    }
                };

        // set values for prepared statement
        int updatesSent = 0;
        for (Row value : values) {
            for (int x = 0; x < arity; x++) {
                fields[x] = value.getField(x);
            }
            // insert values and send to cassandra
            BoundStatement s = preparedStatement.bind(fields);
            s.setDefaultTimestamp(timestamp);
            ResultSetFuture result = session.executeAsync(s);
            if (result != null) {
                // Count only updates that have a callback attached. An update without a
                // callback can never be confirmed, so counting it would make the wait loop
                // below block forever.
                updatesSent++;
                // add callback to detect errors
                Futures.addCallback(result, callback);
            }
        }
        updatesCount.set(updatesSent);

        // The condition is re-checked under the monitor, so a confirmation that arrived before
        // this point is seen without needing a notification.
        synchronized (updatesConfirmed) {
            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
                updatesConfirmed.wait();
            }
        }

        final Throwable failure = exception.get();
        if (failure != null) {
            LOG.warn("Sending a value failed.", failure);
            return false;
        }
        return true;
    }