in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java [72:94]
protected Set<TopicPartition> createTopicPartitions(
Set<String> topics, RangeGenerator generator, int parallelism)
throws PulsarAdminException {
Set<TopicPartition> results = new HashSet<>();
for (String topic : topics) {
TopicMetadata metadata = queryTopicMetadata(topic);
if (metadata != null) {
List<TopicRange> ranges = generator.range(metadata, parallelism);
if (!metadata.isPartitioned()) {
// For non-partitioned topic.
results.add(new TopicPartition(metadata.getName(), ranges));
} else {
// For partitioned topic.
for (int i = 0; i < metadata.getPartitionSize(); i++) {
results.add(new TopicPartition(metadata.getName(), i, ranges));
}
}
}
}
return results;
}