public void snapshotState()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [491:540]


    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (closed) {
            LOG.debug("snapshotState() called on closed source; returning null.");
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Snapshotting state ...");
            }

            sequenceNumsStateForCheckpoint.clear();

            if (fetcher == null) {
                if (sequenceNumsToRestore != null) {
                    for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry :
                            sequenceNumsToRestore.entrySet()) {
                        // sequenceNumsToRestore is the restored global union state;
                        // should only snapshot shards that actually belong to us
                        int hashCode =
                                shardAssigner.assign(
                                        KinesisDataFetcher.convertToStreamShardHandle(
                                                entry.getKey().getShardMetadata()),
                                        getRuntimeContext().getNumberOfParallelSubtasks());
                        if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
                                hashCode,
                                getRuntimeContext().getNumberOfParallelSubtasks(),
                                getRuntimeContext().getIndexOfThisSubtask())) {

                            sequenceNumsStateForCheckpoint.add(
                                    Tuple2.of(entry.getKey().getShardMetadata(), entry.getValue()));
                        }
                    }
                }
            } else {
                HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot =
                        fetcher.snapshotState();

                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
                            lastStateSnapshot,
                            context.getCheckpointId(),
                            context.getCheckpointTimestamp());
                }

                for (Map.Entry<StreamShardMetadata, SequenceNumber> entry :
                        lastStateSnapshot.entrySet()) {
                    sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
                }
            }
        }
    }