public static List<String> distinctTopics(List<String> topics)

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java [68:97]


    /**
     * Removes duplicate and redundant entries from a list of topic names.
     *
     * <p>Each entry is parsed with {@code TopicName.get} and keyed by the value of {@code
     * getPartitionedTopicName()}. An entry that is not a single partition is kept once as a whole
     * topic and discards every partition collected so far under the same key; later partition
     * entries for that key are ignored. Partition entries of the remaining topics are kept once
     * per distinct partition index.
     *
     * <p>The result lists the whole topics first, followed by the surviving partitions rendered
     * through {@code topicNameWithPartition}. Both groups come out of hash-based collections, so
     * the order within each group does not follow the input order.
     *
     * @param topics the topic names to de-duplicate; may mix whole topics and single partitions
     * @return the list built by {@code ImmutableList.Builder} holding the distinct topic names
     */
    public static List<String> distinctTopics(List<String> topics) {
        Set<String> wholeTopics = new HashSet<>();
        Map<String, Set<Integer>> partitionsByTopic = new HashMap<>();

        for (String rawTopic : topics) {
            TopicName parsed = TopicName.get(rawTopic);
            String baseName = parsed.getPartitionedTopicName();

            if (parsed.isPartitioned()) {
                // The whole topic is already requested, a single partition adds nothing.
                if (wholeTopics.contains(baseName)) {
                    continue;
                }
                partitionsByTopic
                        .computeIfAbsent(baseName, key -> new HashSet<>())
                        .add(parsed.getPartitionIndex());
            } else {
                // A whole topic supersedes any partitions gathered for it earlier.
                wholeTopics.add(baseName);
                partitionsByTopic.remove(baseName);
            }
        }

        ImmutableList.Builder<String> distinct = ImmutableList.builder();
        distinct.addAll(wholeTopics);
        partitionsByTopic.forEach(
                (topic, partitionIds) ->
                        partitionIds.forEach(
                                id -> distinct.add(topicNameWithPartition(topic, id))));

        return distinct.build();
    }