in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java [127:177]
/**
 * Commits the offsets that were recorded for the given checkpoint once that checkpoint has
 * completed.
 *
 * <p>Nothing is committed when {@code commitOffsetsOnCheckpoint} is disabled or when no offsets
 * are recorded for {@code checkpointId}. When the recorded offsets map is empty there is
 * nothing to send, so the bookkeeping for this and all earlier checkpoints is pruned right
 * away instead of relying on the commit callback.
 *
 * <p>A failed commit is only logged and counted in the metrics; it is not rethrown.
 *
 * @param checkpointId the id of the checkpoint that has completed
 * @throws Exception declared by the method signature; the visible body does not throw a checked
 *     exception itself
 */
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    LOG.debug("Committing offsets for checkpoint {}", checkpointId);
    if (!commitOffsetsOnCheckpoint) {
        return;
    }

    Map<TopicPartition, OffsetAndMetadata> committedPartitions =
            offsetsToCommit.get(checkpointId);
    if (committedPartitions == null) {
        LOG.debug(
                "Offsets for checkpoint {} either do not exist or have already been committed.",
                checkpointId);
        return;
    }

    if (committedPartitions.isEmpty()) {
        // NOTE(review): the pruning of offsetsToCommit otherwise happens only inside the commit
        // callback. If the fetcher manager skips the commit (and therefore the callback) for an
        // empty map -- confirm against KafkaSourceFetcherManager#commitOffsets -- the entries
        // would never be removed and the map would grow with every checkpoint. Pruning here
        // does not depend on that behavior.
        LOG.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
        removeOffsetsToCommitUpTo(checkpointId);
        return;
    }

    ((KafkaSourceFetcherManager) splitFetcherManager)
            .commitOffsets(
                    committedPartitions,
                    (ignored, e) -> {
                        // The offset commit here is needed by the external monitoring. It won't
                        // break Flink job's correctness if we fail to commit the offset here.
                        if (e != null) {
                            kafkaSourceReaderMetrics.recordFailedCommit();
                            LOG.warn(
                                    "Failed to commit consumer offsets for checkpoint {}",
                                    checkpointId,
                                    e);
                        } else {
                            LOG.debug(
                                    "Successfully committed offsets for checkpoint {}",
                                    checkpointId);
                            kafkaSourceReaderMetrics.recordSucceededCommit();
                            committedPartitions.forEach(
                                    (tp, offset) ->
                                            kafkaSourceReaderMetrics.recordCommittedOffset(
                                                    tp, offset.offset()));
                            // If the finished topic partition has been committed, we remove it
                            // from the offsets of the finished splits map.
                            offsetsOfFinishedSplits
                                    .entrySet()
                                    .removeIf(
                                            entry ->
                                                    committedPartitions.containsKey(
                                                            entry.getKey()));
                            removeOffsetsToCommitUpTo(checkpointId);
                        }
                    });
}

/** Drops every entry of {@code offsetsToCommit} whose checkpoint id is at most the given id. */
private void removeOffsetsToCommitUpTo(long checkpointId) {
    while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
        offsetsToCommit.remove(offsetsToCommit.firstKey());
    }
}