public void start()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [111:139]


    public void start() {
        // Starts the enumerator: opens the subscriber and the range generator, registers the
        // unassigned-split gauge when a metric group is present, and schedules topic partition
        // lookup (either once or repeatedly, depending on the source configuration).

        // Hand the Pulsar client/admin to the subscriber and the configuration to the range
        // generator before any partition lookup is scheduled.
        subscriber.open(pulsarClient, pulsarAdmin);
        rangeGenerator.open(sourceConfiguration);

        // The metric group may be null; only register the gauge when it is available.
        if (metricGroup != null) {
            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);
        }

        // Discovery disabled: schedule a single asynchronous partition lookup and finish.
        if (!sourceConfiguration.isEnablePartitionDiscovery()) {
            LOG.info(
                    "Starting the PulsarSourceEnumerator for subscription {} "
                            + "without periodic partition discovery.",
                    sourceConfiguration.getSubscriptionDesc());
            context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
            return;
        }

        // Discovery enabled: schedule the lookup with the configured interval, passing 0 as the
        // first timing argument so the initial lookup is not delayed.
        LOG.info(
                "Starting the PulsarSourceEnumerator for subscription {} "
                        + "with partition discovery interval of {} ms.",
                sourceConfiguration.getSubscriptionDesc(),
                sourceConfiguration.getPartitionDiscoveryIntervalMs());
        context.callAsync(
                this::getSubscribedTopicPartitions,
                this::checkPartitionChanges,
                0,
                sourceConfiguration.getPartitionDiscoveryIntervalMs());
    }