private Set<String> queryTopicsByInternalProtocols()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java [71:108]


    /**
     * Lists the topics under {@code namespace} through the client's internal lookup service and
     * keeps only the non-internal ones that satisfy the topic pattern.
     *
     * <p>The shortened pattern is handed to the broker only when it ends with {@code ".*"};
     * otherwise no pattern is sent. Whenever the returned result is not flagged as filtered, the
     * pattern is applied here on every topic name.
     *
     * @return the set of matching topic names
     * @throws PulsarClientException if the lookup fails, or if the calling thread is interrupted
     *     while waiting for the lookup result (the interrupt flag is restored first)
     */
    private Set<String> queryTopicsByInternalProtocols() throws PulsarClientException {
        checkNotNull(client, "This subscriber doesn't initialize properly.");

        LookupService lookup = ((PulsarClientImpl) client).getLookup();
        NamespaceName targetNamespace = NamespaceName.get(namespace);

        try {
            // Pulsar 2.11.0 is able to evaluate the regular expression on the broker side, but
            // due to a bug this is only reliable for wildcard patterns. Send nothing otherwise.
            String shortened = shortenedPattern.toString();
            String brokerSidePattern = shortened.endsWith(".*") ? shortened : null;

            GetTopicsResult lookupResult =
                    lookup.getTopicsUnderNamespace(
                                    targetNamespace, subscriptionMode, brokerSidePattern, null)
                            .get();

            List<String> candidates = lookupResult.getTopics();
            Set<String> matched = new HashSet<>(candidates.size());

            for (String candidate : candidates) {
                if (isInternal(candidate)) {
                    continue;
                }

                // The broker may not have the regular expression filter enabled, so fall back
                // to matching locally when the result is not marked as filtered.
                if (lookupResult.isFiltered() || matchesTopicPattern(candidate)) {
                    matched.add(candidate);
                }
            }

            return matched;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(interrupted);
        } catch (ExecutionException failed) {
            throw PulsarClientException.unwrap(failed);
        }
    }