public boolean isCheckpointCommitted()

in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java [139:159]


    /**
     * Tells whether the given checkpoint has already been committed by the given subtask.
     *
     * <p>Pending checkpointed buffers are committed in ascending order of their checkpoint id,
     * so it is enough to know the last checkpoint id committed by the subtask: every checkpoint
     * with an id less than or equal to it counts as committed.
     *
     * <p>The last committed id is looked up in {@code lastCommittedCheckpoints} first. Only when
     * no entry exists for the subtask is the storage system queried; a value found there is
     * stored in {@code lastCommittedCheckpoints} before returning.
     *
     * @param subtaskIdx index of the subtask whose commit state is checked
     * @param checkpointId id of the checkpoint in question
     * @return {@code true} if a last committed checkpoint id is known for the subtask and
     *     {@code checkpointId} does not exceed it; {@code false} otherwise, including when the
     *     query returns no row
     */
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        // Fast path: the last committed id for this subtask is already known locally.
        Long knownCheckpoint = lastCommittedCheckpoints.get(subtaskIdx);
        if (knownCheckpoint != null) {
            return checkpointId <= knownCheckpoint;
        }

        // Nothing known locally: ask the third-party storage system for the last
        // checkpoint id committed by the specified subtask.
        String query =
                String.format(
                        "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;",
                        keySpace, table, operatorId, subtaskIdx);
        Iterator<Row> rows = session.execute(query).iterator();

        // No row returned means no last committed id is known for this subtask.
        if (!rows.hasNext()) {
            return false;
        }

        // Remember the fetched id so later calls for this subtask skip the query.
        long fetchedCheckpoint = rows.next().getLong("checkpoint_id");
        lastCommittedCheckpoints.put(subtaskIdx, fetchedCheckpoint);
        return checkpointId <= fetchedCheckpoint;
    }