in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java [70:114]
public static <T> ProducerBuilder<T> createProducerBuilder(
PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
ProducerBuilder<T> builder = client.newProducer(schema);
configuration.useOption(
PULSAR_PRODUCER_NAME,
producerName -> String.format(producerName, UUID.randomUUID()),
builder::producerName);
configuration.useOption(
PULSAR_SEND_TIMEOUT_MS,
Math::toIntExact,
ms -> builder.sendTimeout(ms, MILLISECONDS));
configuration.useOption(
PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
configuration.useOption(
PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY,
builder::roundRobinRouterBatchingPartitionSwitchFrequency);
configuration.useOption(PULSAR_BATCHING_MAX_MESSAGES, builder::batchingMaxMessages);
configuration.useOption(PULSAR_BATCHING_MAX_BYTES, builder::batchingMaxBytes);
configuration.useOption(PULSAR_BATCHING_ENABLED, builder::enableBatching);
configuration.useOption(PULSAR_CHUNKING_ENABLED, builder::enableChunking);
configuration.useOption(PULSAR_CHUNK_MAX_MESSAGE_SIZE, builder::chunkMaxMessageSize);
configuration.useOption(PULSAR_COMPRESSION_TYPE, builder::compressionType);
configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, builder::initialSequenceId);
configuration.useOption(
PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION, builder::cryptoFailureAction);
// Set producer properties
Map<String, String> properties = configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
if (!properties.isEmpty()) {
builder.properties(properties);
}
// Set the default value for current producer builder.
// We use non-partitioned producer by default. This wouldn't be changed in the future.
builder.blockIfQueueFull(true)
.messageRoutingMode(SinglePartition)
.enableMultiSchema(false)
.autoUpdatePartitions(false)
.accessMode(Shared)
.enableLazyStartPartitionedProducers(false);
return builder;
}