public List discoverNewShardsToSubscribe()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [921:948]


    public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {

        List<StreamShardHandle> newShardsToSubscribe = new LinkedList<>();

        GetShardListResult shardListResult =
                kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
        if (shardListResult.hasRetrievedShards()) {
            Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();

            for (String stream : streamsWithNewShards) {
                List<StreamShardHandle> newShardsOfStream =
                        shardListResult.getRetrievedShardListOfStream(stream);
                for (StreamShardHandle newShard : newShardsOfStream) {
                    int hashCode = shardAssigner.assign(newShard, totalNumberOfConsumerSubtasks);
                    if (isThisSubtaskShouldSubscribeTo(
                            hashCode, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
                        newShardsToSubscribe.add(newShard);
                    }
                }

                advanceLastDiscoveredShardOfStream(
                        stream,
                        shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
            }
        }

        return newShardsToSubscribe;
    }