public final void snapshotState()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java [1010:1061]


    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }