public void notifyCheckpointComplete()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java [127:177]


    /**
     * Commits the offsets recorded for a completed checkpoint through the fetcher manager.
     *
     * <p>Returns without committing when {@code commitOffsetsOnCheckpoint} is false or when
     * {@code offsetsToCommit} holds no entry for {@code checkpointId}. Otherwise the stored
     * offsets are passed to {@code commitOffsets} together with a callback that records the
     * outcome in the reader metrics and, on success, drops the bookkeeping entries for this and
     * all earlier checkpoints.
     *
     * @param checkpointId ID of the checkpoint that has completed
     * @throws Exception permitted by the signature; this body contains no explicit throw
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing offsets for checkpoint {}", checkpointId);
        if (!commitOffsetsOnCheckpoint) {
            return;
        }

        final Map<TopicPartition, OffsetAndMetadata> checkpointOffsets =
                offsetsToCommit.get(checkpointId);
        if (checkpointOffsets == null) {
            LOG.debug(
                    "Offsets for checkpoint {} either do not exist or have already been committed.",
                    checkpointId);
            return;
        }

        final KafkaSourceFetcherManager fetcherManager =
                (KafkaSourceFetcherManager) splitFetcherManager;
        fetcherManager.commitOffsets(
                checkpointOffsets,
                (unused, failure) -> {
                    // Committed offsets only serve external monitoring; a failed commit does
                    // not affect the correctness of the Flink job, so it is merely recorded
                    // and logged.
                    if (failure != null) {
                        kafkaSourceReaderMetrics.recordFailedCommit();
                        LOG.warn(
                                "Failed to commit consumer offsets for checkpoint {}",
                                checkpointId,
                                failure);
                        return;
                    }

                    LOG.debug("Successfully committed offsets for checkpoint {}", checkpointId);
                    kafkaSourceReaderMetrics.recordSucceededCommit();

                    // Publish the committed offset of every partition in this checkpoint.
                    checkpointOffsets.forEach(
                            (partition, committed) ->
                                    kafkaSourceReaderMetrics.recordCommittedOffset(
                                            partition, committed.offset()));

                    // Finished splits whose partition was part of this commit no longer need
                    // to be tracked.
                    offsetsOfFinishedSplits
                            .entrySet()
                            .removeIf(
                                    finished -> checkpointOffsets.containsKey(finished.getKey()));

                    // Discard the pending offsets of this checkpoint and of every older one.
                    while (!offsetsToCommit.isEmpty()
                            && offsetsToCommit.firstKey() <= checkpointId) {
                        offsetsToCommit.remove(offsetsToCommit.firstKey());
                    }
                });
    }