in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [921:948]
public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
List<StreamShardHandle> newShardsToSubscribe = new LinkedList<>();
GetShardListResult shardListResult =
kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
if (shardListResult.hasRetrievedShards()) {
Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
for (String stream : streamsWithNewShards) {
List<StreamShardHandle> newShardsOfStream =
shardListResult.getRetrievedShardListOfStream(stream);
for (StreamShardHandle newShard : newShardsOfStream) {
int hashCode = shardAssigner.assign(newShard, totalNumberOfConsumerSubtasks);
if (isThisSubtaskShouldSubscribeTo(
hashCode, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
newShardsToSubscribe.add(newShard);
}
}
advanceLastDiscoveredShardOfStream(
stream,
shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
}
}
return newShardsToSubscribe;
}