in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [210:244]
/**
 * Handles the outcome of a topic partition listing: registers the fetched partitions with the
 * split assigner, prepares the subscription cursor on every partition the assigner reports
 * back, and then hands pending splits to the currently registered readers.
 *
 * @param fetchedPartitions the topic partitions returned by the listing
 * @param throwable the failure raised while listing, or {@code null} if the listing succeeded
 * @throws FlinkRuntimeException if the listing failed, or if the Pulsar admin call that
 *     prepares the cursor of a partition fails
 */
private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable throwable) {
    if (throwable != null) {
        throw new FlinkRuntimeException(
                "Failed to list subscribed topic partitions due to: " + throwable.getMessage(),
                throwable);
    }

    // Register the fetched partitions with the split assigner. The returned list is treated
    // below as the newly discovered partitions.
    // NOTE(review): the previous comment mentioned the listing running in another thread;
    // confirm the threading contract against the caller.
    List<TopicPartition> newPartitions =
            splitAssigner.registerTopicPartitions(fetchedPartitions);

    // These settings are identical for every partition, so read them once.
    String subscriptionName = sourceConfiguration.getSubscriptionName();
    boolean resetCursor = sourceConfiguration.isResetSubscriptionCursor();

    // Prepare the subscription cursor on each newly discovered topic partition: seek it when
    // a cursor reset is configured, otherwise create the initial position.
    for (TopicPartition partition : newPartitions) {
        String topic = partition.getFullTopicName();
        CursorPosition position =
                startCursor.position(partition.getTopic(), partition.getPartitionId());
        try {
            if (resetCursor) {
                position.seekPosition(pulsarAdmin, topic, subscriptionName);
            } else {
                position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
            }
        } catch (PulsarAdminException e) {
            // Name the failing topic and subscription; the cause alone does not identify
            // which partition of the batch could not be initialized.
            throw new FlinkRuntimeException(
                    "Failed to initialize the cursor of subscription "
                            + subscriptionName
                            + " on topic "
                            + topic,
                    e);
        }
    }

    // Assign the pending partition splits to all currently registered readers.
    List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
    assignPendingPartitionSplits(registeredReaders);
}