in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [482:531]
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Rebuilds sequenceNumsStateForCheckpoint for the checkpoint described by `context`.
    //
    // Three situations are handled:
    //   1. the source is no longer running     -> nothing is touched;
    //   2. a fetcher exists                     -> its snapshot is copied into the state;
    //   3. no fetcher exists yet                -> entries of sequenceNumsToRestore (if any)
    //                                              that are assigned to this subtask are
    //                                              written back into the state.
    if (!running) {
        // NOTE(review): the message mentions "returning null" although this method is void;
        // the text is intentionally left unchanged here.
        LOG.debug("snapshotState() called on closed source; returning null.");
        return;
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Snapshotting state ...");
    }

    // Start from an empty state; it is repopulated below.
    sequenceNumsStateForCheckpoint.clear();

    if (fetcher != null) {
        // A fetcher is available: take its view of the last processed sequence numbers.
        HashMap<StreamShardMetadata, SequenceNumber> fetcherSnapshot = fetcher.snapshotState();

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
                    fetcherSnapshot,
                    context.getCheckpointId(),
                    context.getCheckpointTimestamp());
        }

        for (Map.Entry<StreamShardMetadata, SequenceNumber> shardAndSeqNum :
                fetcherSnapshot.entrySet()) {
            sequenceNumsStateForCheckpoint.add(
                    Tuple2.of(shardAndSeqNum.getKey(), shardAndSeqNum.getValue()));
        }
        return;
    }

    // No fetcher: fall back to the restored sequence numbers, if there are any.
    if (sequenceNumsToRestore == null) {
        return;
    }

    for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> restored :
            sequenceNumsToRestore.entrySet()) {
        // sequenceNumsToRestore is the restored global union state, so only the shards
        // that are assigned to this subtask are written into the checkpoint state.
        boolean ownedByThisSubtask =
                KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
                        shardAssigner.assign(
                                KinesisDataFetcher.convertToStreamShardHandle(
                                        restored.getKey().getShardMetadata()),
                                getRuntimeContext().getNumberOfParallelSubtasks()),
                        getRuntimeContext().getNumberOfParallelSubtasks(),
                        getRuntimeContext().getIndexOfThisSubtask());

        if (!ownedByThisSubtask) {
            continue;
        }

        sequenceNumsStateForCheckpoint.add(
                Tuple2.of(restored.getKey().getShardMetadata(), restored.getValue()));
    }
}