in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java [102:142]
/**
 * Opens this listener: creates the Pulsar admin client, builds the topic partition cache,
 * performs the initial topic metadata update and, when topics have been provided, registers
 * the next topic metadata update timer.
 *
 * <p>If the method does not complete (the initial metadata update fails, or any later step
 * throws), the admin client created here is closed before the exception propagates, so a
 * failed {@code open} does not leave the client behind.
 *
 * @param sinkConfiguration the sink configuration used to create the admin client and to
 *     read the topic metadata refresh interval
 * @param timeService the processing time service stored on this listener (presumably used by
 *     the update timer registration; that code is not part of this method)
 * @throws PulsarClientException declared by the signature; presumably raised while creating
 *     the admin client
 * @throws FlinkRuntimeException if the initial topic metadata update fails with a
 *     {@link PulsarAdminException}
 */
public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService)
        throws PulsarClientException {
    // Initialize listener properties.
    this.pulsarAdmin = createAdmin(sinkConfiguration);
    this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
    this.timeService = timeService;

    // Partition lookup cache keyed by topic name. Entries expire one refresh interval
    // (in milliseconds) after they were written, so they are reloaded through the admin
    // client on the next access after that.
    this.topicPartitionCache =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(topicMetadataRefreshInterval, TimeUnit.MILLISECONDS)
                    .build(
                            new CacheLoader<String, Optional<Integer>>() {
                                @Override
                                @ParametersAreNonnullByDefault
                                public Optional<Integer> load(String topic)
                                        throws PulsarAdminException {
                                    try {
                                        PartitionedTopicMetadata metadata =
                                                pulsarAdmin
                                                        .topics()
                                                        .getPartitionedTopicMetadata(topic);
                                        return Optional.of(metadata.partitions);
                                    } catch (NotFoundException e) {
                                        // The topic was reported as not found: yield an empty
                                        // value instead of failing the load.
                                        return Optional.empty();
                                    }
                                }
                            });

    // Tracks whether every step below finished, so the finally block can tell a normal
    // return apart from an abort caused by any kind of exception.
    boolean opened = false;
    try {
        // Initialize the topic metadata. Quit if fail to connect to Pulsar.
        try {
            updateTopicMetadata();
        } catch (PulsarAdminException e) {
            throw new FlinkRuntimeException(e);
        }

        // Register time service for update the topic metadata.
        if (topics.isEmpty()) {
            LOG.info("No topics have been provided, skip metadata update timer.");
        } else {
            registerNextTopicMetadataUpdateTimer();
        }
        opened = true;
    } finally {
        if (!opened) {
            // Release the admin client created above; without this, a failed open would
            // exit without ever closing it. A failure while closing is only logged so that
            // the original exception is the one that reaches the caller.
            try {
                pulsarAdmin.close();
            } catch (Exception closeFailure) {
                LOG.warn(
                        "Failed to close the Pulsar admin client after an unsuccessful open.",
                        closeFailure);
            }
        }
    }
}