in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java [68:97]
/**
 * Collapses a list of topic names into a duplicate-free list.
 *
 * <p>Every entry is parsed with {@link TopicName#get(String)}. A name that is not a single
 * partition is recorded once under its partitioned topic name, and it supersedes any individual
 * partitions of the same topic regardless of whether they appear before or after it in the input.
 * Individual partitions are kept (once per partition index) only for topics that were never
 * listed as a whole.
 *
 * <p>The result is assembled from hash-based collections, so its order is not tied to the order
 * of the input.
 *
 * @param topics the topic names to de-duplicate; may mix whole topics and single partitions
 * @return an immutable list holding the whole topics first, followed by the remaining partitions
 */
public static List<String> distinctTopics(List<String> topics) {
    Set<String> wholeTopics = new HashSet<>();
    Map<String, Set<Integer>> partitionsByTopic = new HashMap<>();

    for (String rawTopic : topics) {
        TopicName parsed = TopicName.get(rawTopic);
        String baseName = parsed.getPartitionedTopicName();

        if (parsed.isPartitioned()) {
            // The whole topic is already selected, a single partition adds nothing.
            if (wholeTopics.contains(baseName)) {
                continue;
            }
            partitionsByTopic
                    .computeIfAbsent(baseName, key -> new HashSet<>())
                    .add(parsed.getPartitionIndex());
        } else {
            wholeTopics.add(baseName);
            // Drop partitions collected earlier; the whole topic replaces them.
            partitionsByTopic.remove(baseName);
        }
    }

    ImmutableList.Builder<String> result = ImmutableList.builder();
    result.addAll(wholeTopics);
    partitionsByTopic.forEach(
            (baseName, partitionIds) ->
                    partitionIds.forEach(
                            partitionId ->
                                    result.add(topicNameWithPartition(baseName, partitionId))));
    return result.build();
}