protected Set<TopicPartition> createTopicPartitions()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java [72:94]


    /**
     * Builds the set of {@link TopicPartition}s for the given topics.
     *
     * <p>Each topic is resolved through {@code queryTopicMetadata}. A topic whose lookup returns
     * {@code null} is skipped. Otherwise the range list produced by {@code generator} for that
     * topic is attached to every {@link TopicPartition} created for it: a single entry when the
     * metadata reports a non-partitioned topic, or one entry per partition index in {@code [0,
     * partitionSize)} when it reports a partitioned one.
     *
     * @param topics the topic names to resolve
     * @param generator produces the range list for a topic's metadata and the given parallelism
     * @param parallelism passed through unchanged to {@code generator}
     * @return a new mutable set holding every created partition; empty if nothing was resolved
     * @throws PulsarAdminException if it is raised while resolving a topic (nothing is caught
     *     here)
     */
    protected Set<TopicPartition> createTopicPartitions(
            Set<String> topics, RangeGenerator generator, int parallelism)
            throws PulsarAdminException {
        final Set<TopicPartition> found = new HashSet<>();

        for (String topicName : topics) {
            final TopicMetadata meta = queryTopicMetadata(topicName);
            if (meta == null) {
                // The lookup yielded nothing for this topic, so it contributes no partitions.
                continue;
            }

            // Computed once per topic and shared by all of that topic's partitions.
            final List<TopicRange> topicRanges = generator.range(meta, parallelism);

            if (meta.isPartitioned()) {
                // Partitioned topic: one entry for each partition index.
                int index = 0;
                while (index < meta.getPartitionSize()) {
                    found.add(new TopicPartition(meta.getName(), index, topicRanges));
                    index++;
                }
            } else {
                // Non-partitioned topic: a single entry created without a partition index.
                found.add(new TopicPartition(meta.getName(), topicRanges));
            }
        }

        return found;
    }