in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [212:252]
private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema)
throws PulsarClientException {
Map<SchemaHash, Producer<?>> set = producers.computeIfAbsent(topic, t -> new HashMap<>());
SchemaHash hash = PulsarSchemaUtils.hash(schema);
if (set.containsKey(hash)) {
return (Producer<T>) set.get(hash);
}
try {
// Use this method for auto creating the non-exist topics. Otherwise, it will throw an
// exception.
TopicName topicName = TopicName.get(topic);
((PulsarClientImpl) pulsarClient)
.getLookup()
.getPartitionedTopicMetadata(topicName)
.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
} catch (ExecutionException e) {
throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
}
ProducerBuilder<T> builder = createProducerBuilder(pulsarClient, schema, sinkConfiguration);
// Enable end-to-end encryption if provided.
configPulsarCrypto(builder);
// Set the required topic name.
builder.topic(topic);
// Set the sending counter for metrics.
builder.intercept(new ProducerMetricsInterceptor(metricGroup));
Producer<T> producer = builder.create();
// Expose the stats for calculating and monitoring.
exposeProducerMetrics(producer);
set.put(hash, producer);
return producer;
}