in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java [126:177]
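/**
 * Invoked when a checkpoint has been completed. Commits the offsets staged for the given
 * checkpoint back to Kafka. The commit is best-effort: it only feeds external monitoring,
 * and a failed commit does not affect the correctness of the job.
 */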
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    LOG.debug("Committing offsets for checkpoint {}", checkpointId);
    if (!commitOffsetsOnCheckpoint) {
        return;
    }
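
    // Offsets to commit are staged per checkpoint id when the checkpoint snapshot is
    // taken; look up the batch that belongs to the checkpoint that just completed.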
Map<TopicPartition, OffsetAndMetadata> committedPartitions =
offsetsToCommit.get(checkpointId);
if (committedPartitions == null) {
LOG.debug("Offsets for checkpoint {} have already been committed.", checkpointId);
return;
}
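    // Even an empty batch must release everything staged up to and including this
    // checkpoint, so the offsets-to-commit map does not grow without bound.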
    if (committedPartitions.isEmpty()) {
        LOG.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
        removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
        return;
    }
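
    // Delegate the actual commit to the fetcher manager, which runs it on the fetcher
    // thread that owns the KafkaConsumer (a KafkaConsumer is not thread-safe); the
    // result is reported asynchronously through the callback below.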
    ((KafkaSourceFetcherManager) splitFetcherManager)
            .commitOffsets(
                    committedPartitions,
                    (ignored, e) -> {
                        // The offset commit is only needed for external monitoring; failing
                        // to commit offsets here does not break the correctness of the
                        // Flink job.
                        if (e != null) {
                            kafkaSourceReaderMetrics.recordFailedCommit();
                            LOG.warn(
                                    "Failed to commit consumer offsets for checkpoint {}",
                                    checkpointId,
                                    e);
                        } else {
                            LOG.debug(
                                    "Successfully committed offsets for checkpoint {}",
                                    checkpointId);
                            kafkaSourceReaderMetrics.recordSucceededCommit();
                            // Record the committed offset of every partition so it is
                            // reflected in the reader metrics.
                            committedPartitions.forEach(
                                    (tp, offset) ->
                                            kafkaSourceReaderMetrics.recordCommittedOffset(
                                                    tp, offset.offset()));
                            // Now that the offsets of the finished splits have been
                            // committed, remove them from the finished-splits map.
                            offsetsOfFinishedSplits
                                    .entrySet()
                                    .removeIf(
                                            entry ->
                                                    committedPartitions.containsKey(
                                                            entry.getKey()));
                            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
                        }
                    });
}