in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java [208:235]
private void updateTopicMetadata() throws PulsarAdminException {
ImmutableList.Builder<TopicPartition> parititonsBuilder = ImmutableList.builder();
for (String topic : topics) {
Optional<TopicMetadata> optionalMetadata = queryTopicMetadata(topic);
if (optionalMetadata.isPresent()) {
TopicMetadata metadata = optionalMetadata.get();
int partitionSize = metadata.getPartitionSize();
if (metadata.isPartitioned()) {
for (int i = 0; i < partitionSize; i++) {
parititonsBuilder.add(new TopicPartition(topic, i));
}
} else {
parititonsBuilder.add(new TopicPartition(topic));
}
}
}
for (String partition : partitions) {
TopicName topicName = TopicName.get(partition);
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();
parititonsBuilder.add(new TopicPartition(name, index));
}
this.availablePartitions = parititonsBuilder.build();
}