in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java [62:82]
/**
 * Registers the partitions from {@code fetchedPartitions} that are not yet tracked in
 * {@code appendedPartitions}.
 *
 * <p>For every such partition this method records it in {@code appendedPartitions}, builds a
 * {@link PulsarPartitionSplit} with the configured {@code stopCursor}, and queues that split
 * via {@code addSplitToPendingList} under the reader id returned by {@code partitionOwner}.
 * Partitions that are already tracked are skipped. After the loop the {@code initialized}
 * flag is guaranteed to be {@code true}, even when nothing new was found.
 *
 * @param fetchedPartitions the partitions to check against the already registered ones
 * @return the partitions that were newly registered by this call, in the iteration order of
 *     {@code fetchedPartitions}; empty when every partition was already known
 */
public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
    List<TopicPartition> freshlyRegistered = new ArrayList<>();

    for (TopicPartition candidate : fetchedPartitions) {
        // Already tracked: nothing to record and no split to create.
        if (appendedPartitions.contains(candidate)) {
            continue;
        }
        appendedPartitions.add(candidate);

        // The owning reader id comes from partitionOwner (per the original note, it is
        // calculated from the current parallelism); the split is queued for that reader.
        addSplitToPendingList(
                partitionOwner(candidate), new PulsarPartitionSplit(candidate, stopCursor));

        freshlyRegistered.add(candidate);
    }

    // Flip the flag on the first invocation only; later calls leave it untouched.
    if (!initialized) {
        initialized = true;
    }

    return freshlyRegistered;
}