in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [111:139]
public void start() {
subscriber.open(pulsarClient, pulsarAdmin);
rangeGenerator.open(sourceConfiguration);
// Expose the split assignment metrics if Flink has supported.
if (metricGroup != null) {
metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);
}
// Check the pulsar topic information and convert it into source split.
if (sourceConfiguration.isEnablePartitionDiscovery()) {
LOG.info(
"Starting the PulsarSourceEnumerator for subscription {} "
+ "with partition discovery interval of {} ms.",
sourceConfiguration.getSubscriptionDesc(),
sourceConfiguration.getPartitionDiscoveryIntervalMs());
context.callAsync(
this::getSubscribedTopicPartitions,
this::checkPartitionChanges,
0,
sourceConfiguration.getPartitionDiscoveryIntervalMs());
} else {
LOG.info(
"Starting the PulsarSourceEnumerator for subscription {} "
+ "without periodic partition discovery.",
sourceConfiguration.getSubscriptionDesc());
context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
}
}