in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisConsumer.java [284:355]
public void run(SourceContext<T> sourceContext) throws Exception {
// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
// can potentially have new shards to subscribe to later on
KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);
// initial discovery
List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
for (StreamShardHandle shard : allShards) {
StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));
if (sequenceNumsToRestore != null) {
if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
// if the shard was already seen and is contained in the state,
// just use the sequence number stored in the state
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, sequenceNumsToRestore.get(kinesisStreamShard)));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
" starting state set to the restored sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
}
} else {
// the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
" starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
}
}
} else {
// we're starting fresh; use the configured start position as initial state
SentinelSequenceNumber startingSeqNum =
InitialPosition.valueOf(configProps.getProperty(
ConsumerConfigConstants.STREAM_INITIAL_POSITION,
ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
fetcher.registerNewSubscribedShardState(
new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
}
}
}
// check that we are running before starting the fetcher
if (!running) {
return;
}
// expose the fetcher from this point, so that state
// snapshots can be taken from the fetcher's state holders
this.fetcher = fetcher;
// start the fetcher loop. The fetcher will stop running only when cancel() or
// close() is called, or an error is thrown by threads created by the fetcher
fetcher.runFetcher();
// check that the fetcher has terminated before fully closing
fetcher.awaitTermination();
sourceContext.close();
}