in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisConsumer.java [423:463]
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source; returning null.");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotting state ...");
}
sequenceNumsStateForCheckpoint.clear();
if (fetcher == null) {
if (sequenceNumsToRestore != null) {
for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
// sequenceNumsToRestore is the restored global union state;
// should only snapshot shards that actually belong to us
int hashCode = shardAssigner.assign(
KinesisDataFetcher.convertToStreamShardHandle(entry.getKey().getShardMetadata()),
getRuntimeContext().getNumberOfParallelSubtasks());
if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
hashCode,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask())) {
sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey().getShardMetadata(), entry.getValue()));
}
}
}
} else {
HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
lastStateSnapshot, context.getCheckpointId(), context.getCheckpointTimestamp());
}
for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
}
}
}
}