public static ProducerBuilder createProducerBuilder()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java [70:114]


    /**
     * Builds a Pulsar {@link ProducerBuilder} for the sink from the given configuration.
     *
     * <p>The builder is obtained from {@code client.newProducer(schema)}. Producer options are
     * then handed to it through {@code configuration.useOption(...)}, the producer properties are
     * attached when the configured map is not empty, and finally a fixed set of defaults is
     * applied. These defaults are set after all configurable options.
     *
     * <p>The configured producer name is used as a {@link String#format} pattern and receives a
     * freshly generated random {@link UUID} as its single argument.
     *
     * @param client the Pulsar client used to create the producer builder
     * @param schema the schema passed to {@code client.newProducer}
     * @param configuration the sink configuration the producer options are read from
     * @param <T> the type bound to the schema and the resulting builder
     * @return the producer builder with options, properties and defaults applied
     */
    public static <T> ProducerBuilder<T> createProducerBuilder(
            PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
        ProducerBuilder<T> producerBuilder = client.newProducer(schema);

        // The producer name is a format pattern; a random UUID is supplied as its argument.
        configuration.useOption(
                PULSAR_PRODUCER_NAME,
                namePattern -> String.format(namePattern, UUID.randomUUID()),
                producerBuilder::producerName);

        // The timeout is converted with Math.toIntExact before being passed as milliseconds.
        configuration.useOption(
                PULSAR_SEND_TIMEOUT_MS,
                Math::toIntExact,
                timeoutMs -> producerBuilder.sendTimeout(timeoutMs, MILLISECONDS));

        // Batching related options.
        configuration.useOption(
                PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
                delayMicros -> producerBuilder.batchingMaxPublishDelay(delayMicros, MICROSECONDS));
        configuration.useOption(
                PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY,
                producerBuilder::roundRobinRouterBatchingPartitionSwitchFrequency);
        configuration.useOption(PULSAR_BATCHING_MAX_MESSAGES, producerBuilder::batchingMaxMessages);
        configuration.useOption(PULSAR_BATCHING_MAX_BYTES, producerBuilder::batchingMaxBytes);
        configuration.useOption(PULSAR_BATCHING_ENABLED, producerBuilder::enableBatching);

        // Chunking related options.
        configuration.useOption(PULSAR_CHUNKING_ENABLED, producerBuilder::enableChunking);
        configuration.useOption(
                PULSAR_CHUNK_MAX_MESSAGE_SIZE, producerBuilder::chunkMaxMessageSize);

        // Remaining producer options.
        configuration.useOption(PULSAR_COMPRESSION_TYPE, producerBuilder::compressionType);
        configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, producerBuilder::initialSequenceId);
        configuration.useOption(
                PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION, producerBuilder::cryptoFailureAction);

        // Attach the producer properties only when some are configured.
        Map<String, String> producerProperties =
                configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
        if (!producerProperties.isEmpty()) {
            producerBuilder.properties(producerProperties);
        }

        // Fixed defaults for the producer builder.
        // A non-partitioned producer is used by default; this is not expected to change.
        producerBuilder.blockIfQueueFull(true);
        producerBuilder.messageRoutingMode(SinglePartition);
        producerBuilder.enableMultiSchema(false);
        producerBuilder.autoUpdatePartitions(false);
        producerBuilder.accessMode(Shared);
        producerBuilder.enableLazyStartPartitionedProducers(false);

        return producerBuilder;
    }