in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [77:139]
public PulsarKafkaProducer(ProducerConfig config) {
    super((kafka.producer.Producer) null);
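
    // Pick the partitioner: honor a user-configured partitioner.class, otherwise fall back to the
    // default partitioner.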
    partitioner = config.partitionerClass() != null
            ? newInstance(config.partitionerClass(), Partitioner.class, config.props())
            : new DefaultPartitioner(config.props());

    // ProducerConfig supplies a default serializer if the client doesn't configure one
    checkNotNull(config.keySerializerClass(), "key-serializer class can't be null");
    checkNotNull(config.serializerClass(), "value-serializer class can't be null");
    keySerializer = newInstance(config.keySerializerClass(), Encoder.class, config.props());
    valueSerializer = newInstance(config.serializerClass(), Encoder.class, config.props());

    Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
            : new Properties();
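
    // In the Kafka 0.8 API "metadata.broker.list" carries the connection string; here it is expected
    // to hold the Pulsar service URL. PulsarClientKafkaConfig translates any Pulsar-specific client
    // properties (such as authentication settings) onto the ClientBuilder.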
    String serviceUrl = config.brokerList();
    try {
        client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
    } catch (PulsarClientException e) {
        throw new IllegalArgumentException(
                "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
    }

    pulsarProducerBuilder = client.newProducer();
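
    // The rest of the constructor translates the Kafka 0.8 producer settings onto the Pulsar
    // producer builder.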
    // doc: https://kafka.apache.org/08/documentation.html#producerapi
    // api-doc:
    // https://github.com/apache/kafka/blob/0.8.2.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
    //
    // queue.enqueue.timeout.ms: The amount of time to block before dropping messages when running in async mode and
    // the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or
    // dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block
    // indefinitely and never willingly drop a send.
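    // Pulsar's producer exposes no bounded enqueue wait, so only -1 (block indefinitely) maps to
    // blockIfQueueFull=true; 0 or a positive timeout falls back to failing the send immediately.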
    boolean blockIfQueueFull = config.queueEnqueueTimeoutMs() == -1;

    // This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values
    // are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we
    // allow batching together of requests (which is great for throughput) but open the possibility of a failure of
    // the client machine dropping unsent data.
    isSendAsync = "async".equalsIgnoreCase(config.producerType());
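
    // Pulsar has no gzip codec, so gzip is mapped to ZLIB (the closest deflate-based option); snappy
    // maps to SNAPPY and anything else falls back to NONE.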
    // Valid values are "none", "gzip" and "snappy".
    CompressionType compressionType = CompressionType.NONE;
    if ("gzip".equals(config.compressionCodec().name())) {
        compressionType = CompressionType.ZLIB;
    } else if ("snappy".equals(config.compressionCodec().name())) {
        compressionType = CompressionType.SNAPPY;
    }
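
    // Forward queueing/batching settings to the Pulsar builder only when the corresponding Kafka
    // property was set explicitly, so that the Pulsar defaults apply otherwise.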
    long batchDelayMs = config.queueBufferingMaxMs();
    if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES)) {
        pulsarProducerBuilder.maxPendingMessages(config.queueBufferingMaxMessages());
    }
    if (properties.containsKey(KAFKA_KEY_MAX_BATCH_MESSAGES)) {
        pulsarProducerBuilder.batchingMaxMessages(config.batchNumMessages());
    }
    if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS)) {
        pulsarProducerBuilder.batchingMaxPublishDelay(batchDelayMs, TimeUnit.MILLISECONDS);
    }
    if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
        pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
    }
    if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
        pulsarProducerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
    }

    pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
}
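
// Usage sketch (illustrative only, not part of the original file). The wrapper is driven by the
// stock Kafka 0.8 producer properties, with "metadata.broker.list" expected to carry the Pulsar
// service URL; the URL, topic and serializer below are assumptions for the example.
//
//     Properties props = new Properties();
//     props.put("metadata.broker.list", "pulsar://localhost:6650");    // assumed local Pulsar broker
//     props.put("serializer.class", "kafka.serializer.StringEncoder");
//     props.put("producer.type", "async");                             // enables batching
//     props.put("compression.codec", "gzip");                          // mapped to Pulsar ZLIB above
//
//     PulsarKafkaProducer<String, String> producer = new PulsarKafkaProducer<>(new ProducerConfig(props));
//     producer.send(new KeyedMessage<>("my-topic", "my-key", "hello"));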