public void snapshotState()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisConsumer.java [423:463]


	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// Rebuilds sequenceNumsStateForCheckpoint for the checkpoint described by `context`.
		//  - source not running: nothing is written and the existing list state is left as-is;
		//  - fetcher available: the fetcher's own snapshot is copied into the list state;
		//  - no fetcher: only the restored entries assigned to this subtask are re-emitted.
		// Any exception thrown by the fetcher, the shard assigner or the list state propagates
		// to the caller.
		if (!running) {
			// The method is void, so there is nothing to "return"; the snapshot is simply skipped.
			LOG.debug("snapshotState() called on closed source; skipping snapshot.");
			return;
		}

		// A constant message needs no isDebugEnabled() guard: no arguments are built for it.
		LOG.debug("Snapshotting state ...");

		sequenceNumsStateForCheckpoint.clear();

		if (fetcher != null) {
			HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();

			// Guard kept here so the checkpoint id/timestamp lookups are skipped when debug is off.
			if (LOG.isDebugEnabled()) {
				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
					lastStateSnapshot, context.getCheckpointId(), context.getCheckpointTimestamp());
			}

			for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
			}
			return;
		}

		// No fetcher (presumably it has not been created yet -- confirm against run()):
		// fall back to whatever state was restored, if any.
		if (sequenceNumsToRestore == null || sequenceNumsToRestore.isEmpty()) {
			return;
		}

		// Loop-invariant subtask coordinates, looked up once instead of once per restored shard.
		final int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
		final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();

		for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
			// sequenceNumsToRestore is the restored global union state;
			// should only snapshot shards that actually belong to us
			StreamShardMetadata shardMetadata = entry.getKey().getShardMetadata();
			int hashCode = shardAssigner.assign(
				KinesisDataFetcher.convertToStreamShardHandle(shardMetadata),
				numberOfParallelSubtasks);

			if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(hashCode, numberOfParallelSubtasks, indexOfThisSubtask)) {
				sequenceNumsStateForCheckpoint.add(Tuple2.of(shardMetadata, entry.getValue()));
			}
		}
	}