in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java [71:108]
/**
 * Lists the topics under {@code namespace} through the client's lookup service and keeps the
 * ones that are not internal and that satisfy the topic pattern.
 *
 * <p>The pattern is forwarded to the lookup call only when it ends with {@code ".*"}; otherwise
 * {@code null} is passed and every returned topic is matched locally. Local matching is also
 * applied whenever the result reports that it was not filtered.
 *
 * @return the set of matching, non-internal topic names.
 * @throws PulsarClientException if the calling thread is interrupted while waiting for the
 *     lookup result (the interrupt flag is restored first), or if the lookup itself fails.
 */
private Set<String> queryTopicsByInternalProtocols() throws PulsarClientException {
    checkNotNull(client, "This subscriber doesn't initialize properly.");
    PulsarClientImpl clientImpl = (PulsarClientImpl) client;
    LookupService lookup = clientImpl.getLookup();
    NamespaceName targetNamespace = NamespaceName.get(namespace);

    try {
        // Broker-side regular expression filtering exists since Pulsar 2.11.0, but a bug there
        // makes it usable for wildcard patterns only. Any other pattern is withheld from the
        // broker by passing null instead.
        String patternText = shortenedPattern.toString();
        String brokerPattern = patternText.endsWith(".*") ? patternText : null;

        GetTopicsResult lookupResult =
                lookup.getTopicsUnderNamespace(
                                targetNamespace, subscriptionMode, brokerPattern, null)
                        .get();

        List<String> candidates = lookupResult.getTopics();
        Set<String> matched = new HashSet<>(candidates.size());
        for (String candidate : candidates) {
            // Internal topics are never exposed to the caller.
            if (isInternal(candidate)) {
                continue;
            }
            // The broker may not have applied the regular expression filter, so fall back to
            // matching on the client side when the result is not marked as filtered.
            if (lookupResult.isFiltered() || matchesTopicPattern(candidate)) {
                matched.add(candidate);
            }
        }

        return matched;
    } catch (InterruptedException interrupted) {
        // Keep the interruption visible to the callers further up the stack.
        Thread.currentThread().interrupt();
        throw new PulsarClientException(interrupted);
    } catch (ExecutionException failed) {
        throw PulsarClientException.unwrap(failed);
    }
}