in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java [49:69]
// Builds the metadata entries for every topic name held in `topics`.
//
// For each topic the partitioned-topic metadata is looked up through `admin`:
//   - if it reports N > 0 partitions, N entries are added, one per partition
//     name derived from the topic name;
//   - otherwise a single entry is added under the topic's own name.
//
// Returns a newly created list; entries appear in the iteration order of
// `topics`, and within a topic in ascending partition index.
//
// Throws RuntimeException (with the PulsarAdminException as its cause) as soon
// as the lookup fails for any topic; no partial list is returned in that case.
public List<TopicMetadata> topicsMetadata() {
    List<TopicMetadata> metadataList = Lists.newArrayList();
    // NOTE(review): assumes `topics` is an Iterable<String> (or String[]) — its
    // declaration is outside this block; confirm.
    for (String topic : topics) {
        try {
            int partitions = admin.topics().getPartitionedTopicMetadata(topic).partitions;
            if (partitions > 0) {
                // Resolve the topic name once; only the partition index varies below.
                TopicName baseName = TopicName.get(topic);
                for (int partition = 0; partition < partitions; partition++) {
                    String partitionName = baseName.getPartition(partition).toString();
                    metadataList.add(new PulsarTopicMetadata(hostUrl, port, partitionName));
                }
            } else {
                metadataList.add(new PulsarTopicMetadata(hostUrl, port, topic));
            }
        } catch (PulsarAdminException e) {
            log.error("Failed to get partitioned metadata for {}", topic, e);
            // Carry the topic name in the exception too, so callers that only see
            // the exception (not this log line) can tell which topic failed.
            throw new RuntimeException("Failed to get partitioned-metadata for " + topic, e);
        }
    }
    return metadataList;
}