in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java [50:70]
/**
 * Looks up the partition information of the given topic through the Pulsar admin client.
 *
 * <p>Topics already recorded in {@code NON_PARTITIONED_TOPICS} are answered directly without
 * calling the admin client. A topic whose queried partition count equals {@code
 * NON_PARTITIONED} is recorded in that set so later lookups can skip the admin call.
 *
 * @param topic the topic name to query.
 * @return the metadata of the topic, or {@code null} if the admin call failed with status code
 *     404, which lets callers skip this topic.
 * @throws PulsarAdminException if the admin call failed with any status code other than 404.
 */
protected TopicMetadata queryTopicMetadata(String topic) throws PulsarAdminException {
    // Known non-partitioned topics need no round trip to the admin endpoint.
    if (NON_PARTITIONED_TOPICS.contains(topic)) {
        return new TopicMetadata(topic, NON_PARTITIONED);
    }

    final PartitionedTopicMetadata partitionedMetadata;
    try {
        partitionedMetadata = admin.topics().getPartitionedTopicMetadata(topic);
    } catch (PulsarAdminException e) {
        if (e.getStatusCode() != 404) {
            // Any other failure is propagated and would fail the subscriber.
            throw e;
        }
        // A 404 yields null so that the caller skips this topic's metadata.
        return null;
    }

    final int partitionCount = partitionedMetadata.partitions;
    if (partitionCount == NON_PARTITIONED) {
        // Remember the topic so the next lookup takes the shortcut above.
        NON_PARTITIONED_TOPICS.add(topic);
    }
    return new TopicMetadata(topic, partitionCount);
}