in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java [312:409]
public void run(SourceContext<T> sourceContext) throws Exception {
    // Seeds a fetcher with the initially discovered shards, publishes it via this.fetcher and
    // runs it until it terminates, then closes the source context.
    //
    // All subtasks run a fetcher, regardless of whether or not the subtask initially has shards
    // to subscribe to: fetchers continuously poll for changes in the shard list, so every
    // subtask can potentially be assigned new shards later on.
    KinesisDataFetcher<T> fetcher =
            createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

    // Until this.fetcher is assigned below, this local variable is the only reference this
    // method hands out to the fetcher (NOTE(review): assumes cancel()/close() only act on
    // this.fetcher). So if we bail out before publishing it, either because we are no longer
    // running or because discovery/seeding threw, the fetcher must be shut down here, otherwise
    // whatever it acquired at construction is leaked.
    boolean fetcherPublished = false;
    try {
        // initial discovery
        List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
        seedFetcherWithInitialShards(fetcher, allShards);

        // check that we are running before starting the fetcher
        if (!running) {
            return;
        }

        // expose the fetcher from this point, so that state snapshots can be taken from the
        // fetcher's state holders
        this.fetcher = fetcher;
        fetcherPublished = true;
    } finally {
        if (!fetcherPublished) {
            shutdownUnpublishedFetcher(fetcher);
        }
    }

    // start the fetcher loop. The fetcher will stop running only when cancel() or
    // close() is called, or an error is thrown by threads created by the fetcher
    fetcher.runFetcher();

    // check that the fetcher has terminated before fully closing
    fetcher.awaitTermination();
    sourceContext.close();
}

/**
 * Registers an initial {@link KinesisStreamShardState} with {@code fetcher} for every shard in
 * {@code shards}.
 *
 * <p>If {@code sequenceNumsToRestore} is non-null (restoring from state), a shard contained in
 * it resumes from its restored sequence number. A shard not contained in it was not seen in the
 * previous run and starts from {@code SENTINEL_EARLIEST_SEQUENCE_NUM}. Otherwise (fresh start),
 * every shard starts from the position configured under {@code STREAM_INITIAL_POSITION}, or
 * {@code DEFAULT_STREAM_INITIAL_POSITION} if that property is unset.
 *
 * @param fetcher the fetcher to seed
 * @param shards the shards returned by the initial discovery
 */
private void seedFetcherWithInitialShards(
        KinesisDataFetcher<T> fetcher, List<StreamShardHandle> shards) {
    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

    // The configured start position does not depend on the shard, so it is resolved once
    // instead of per shard. It is only resolved when it is actually used (fresh start with at
    // least one shard), so an invalid setting is not reported in cases that never read it.
    SentinelSequenceNumber startingSeqNum = null;
    if (sequenceNumsToRestore == null && !shards.isEmpty()) {
        startingSeqNum =
                InitialPosition.valueOf(
                                configProps.getProperty(
                                        ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                                        ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))
                        .toSentinelSequenceNumber();
    }

    for (StreamShardHandle shard : shards) {
        StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
                new StreamShardMetadata.EquivalenceWrapper(
                        KinesisDataFetcher.convertToStreamShardMetadata(shard));

        if (sequenceNumsToRestore == null) {
            // we're starting fresh; use the configured start position as initial state
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(
                            kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                        subtaskIndex,
                        shard.toString(),
                        startingSeqNum.get());
            }
        } else if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
            // the shard was already seen and is contained in the state; resume from the
            // sequence number stored in the state
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(
                            kinesisStreamShard.getShardMetadata(),
                            shard,
                            sequenceNumsToRestore.get(kinesisStreamShard)));
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Subtask {} is seeding the fetcher with restored shard {},"
                                + " starting state set to the restored sequence number {}",
                        subtaskIndex,
                        shard.toString(),
                        sequenceNumsToRestore.get(kinesisStreamShard));
            }
        } else {
            // the shard wasn't discovered in the previous run, therefore should be consumed
            // from the beginning
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(
                            kinesisStreamShard.getShardMetadata(),
                            shard,
                            SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Subtask {} is seeding the fetcher with new discovered shard {},"
                                + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                        subtaskIndex,
                        shard.toString());
            }
        }
    }
}

/**
 * Shuts down a fetcher that {@code run} created but never published to {@code this.fetcher}
 * and never started.
 *
 * <p>Any failure is logged instead of rethrown, so that it cannot mask an exception that may
 * already be propagating out of {@code run}.
 *
 * @param fetcher the unpublished, unstarted fetcher
 */
private void shutdownUnpublishedFetcher(KinesisDataFetcher<T> fetcher) {
    try {
        fetcher.shutdownFetcher();
    } catch (Exception e) {
        LOG.warn("Error while shutting down Kinesis data fetcher that was never started", e);
    }
}