flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java [143:167]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                fields[x] = value.getField(x);
            }
            // insert values and send to cassandra
            BoundStatement s = preparedStatement.bind(fields);
            s.setDefaultTimestamp(timestamp);
            ResultSetFuture result = session.executeAsync(s);
            updatesSent++;
            if (result != null) {
                // add callback to detect errors
                Futures.addCallback(result, callback);
            }
        }
        updatesCount.set(updatesSent);

        synchronized (updatesConfirmed) {
            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
                updatesConfirmed.wait();
            }
        }

        if (exception.get() != null) {
            LOG.warn("Sending a value failed.", exception.get());
            return false;
        } else {
            return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java [143:167]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                fields[x] = value.getField(x);
            }
            // insert values and send to cassandra
            BoundStatement s = preparedStatement.bind(fields);
            s.setDefaultTimestamp(timestamp);
            ResultSetFuture result = session.executeAsync(s);
            updatesSent++;
            if (result != null) {
                // add callback to detect errors
                Futures.addCallback(result, callback);
            }
        }
        updatesCount.set(updatesSent);

        synchronized (updatesConfirmed) {
            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
                updatesConfirmed.wait();
            }
        }

        if (exception.get() != null) {
            LOG.warn("Sending a value failed.", exception.get());
            return false;
        } else {
            return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



