private Producer getOrCreateProducer()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [212:252]


    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema)
            throws PulsarClientException {
        Map<SchemaHash, Producer<?>> set = producers.computeIfAbsent(topic, t -> new HashMap<>());
        SchemaHash hash = PulsarSchemaUtils.hash(schema);
        if (set.containsKey(hash)) {
            return (Producer<T>) set.get(hash);
        }

        try {
            // Use this method for auto creating the non-exist topics. Otherwise, it will throw an
            // exception.
            TopicName topicName = TopicName.get(topic);
            ((PulsarClientImpl) pulsarClient)
                    .getLookup()
                    .getPartitionedTopicMetadata(topicName)
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
        }

        ProducerBuilder<T> builder = createProducerBuilder(pulsarClient, schema, sinkConfiguration);

        // Enable end-to-end encryption if provided.
        configPulsarCrypto(builder);

        // Set the required topic name.
        builder.topic(topic);
        // Set the sending counter for metrics.
        builder.intercept(new ProducerMetricsInterceptor(metricGroup));

        Producer<T> producer = builder.create();

        // Expose the stats for calculating and monitoring.
        exposeProducerMetrics(producer);
        set.put(hash, producer);

        return producer;
    }