public void open()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java [102:142]


    /**
     * Prepares this listener for use.
     *
     * <p>The method performs three steps in order:
     *
     * <ol>
     *   <li>Stores the admin client, the metadata refresh interval and the time service, and
     *       builds the topic-partition cache. Cache entries expire after the refresh interval
     *       (interpreted as milliseconds) has elapsed since they were written.
     *   <li>Runs one metadata update immediately; a {@link PulsarAdminException} from that update
     *       is rethrown wrapped in a {@link FlinkRuntimeException}.
     *   <li>Schedules the next periodic metadata update, unless {@code topics} is empty.
     * </ol>
     *
     * @param sinkConfiguration configuration used to create the admin client and to read the
     *     topic metadata refresh interval.
     * @param timeService time service kept for later use by this listener.
     * @throws PulsarClientException declared by this method; presumably raised while creating the
     *     admin client (confirm against {@code createAdmin}).
     */
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService)
            throws PulsarClientException {
        // Capture everything the listener needs from its collaborators.
        this.pulsarAdmin = createAdmin(sinkConfiguration);
        this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
        this.timeService = timeService;

        // Loader that resolves the "partitions" value of a topic on a cache miss. A topic that the
        // admin API reports as not found is cached as an empty Optional instead of failing.
        CacheLoader<String, Optional<Integer>> partitionLoader =
                new CacheLoader<String, Optional<Integer>>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public Optional<Integer> load(String topic) throws PulsarAdminException {
                        try {
                            PartitionedTopicMetadata topicMetadata =
                                    pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
                            return Optional.of(topicMetadata.partitions);
                        } catch (NotFoundException e) {
                            return Optional.empty();
                        }
                    }
                };
        this.topicPartitionCache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(topicMetadataRefreshInterval, TimeUnit.MILLISECONDS)
                        .build(partitionLoader);

        // Fetch the metadata once up front so that a failing admin call aborts the open.
        try {
            updateTopicMetadata();
        } catch (PulsarAdminException e) {
            throw new FlinkRuntimeException(e);
        }

        // Without any topics there is nothing to refresh, so no timer is registered.
        if (topics.isEmpty()) {
            LOG.info("No topics have been provided, skip metadata update timer.");
            return;
        }
        registerNextTopicMetadataUpdateTimer();
    }