in flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java [139:159]
public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
    // Pending checkpointed buffers are committed in ascending checkpoint-id order,
    // so a checkpoint counts as committed exactly when its id does not exceed the
    // last id recorded for the given subtask.

    // Fast path: the last committed id for this subtask is already cached locally.
    Long cachedId = lastCommittedCheckpoints.get(subtaskIdx);
    if (cachedId != null) {
        return checkpointId <= cachedId;
    }

    // Cache miss: ask the backing table for the id stored for this sink/subtask pair.
    String query =
            String.format(
                    "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;",
                    keySpace, table, operatorId, subtaskIdx);
    Iterator<Row> rows = session.execute(query).iterator();
    if (!rows.hasNext()) {
        // No row stored for this subtask: report "not committed" and leave the
        // cache empty so the next call queries the table again.
        return false;
    }

    // Remember the stored id so later calls for this subtask skip the query.
    Long storedId = rows.next().getLong("checkpoint_id");
    lastCommittedCheckpoints.put(subtaskIdx, storedId);
    return checkpointId <= storedId;
}