in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [102:172]
private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
if (keySchema == null) {
Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaKeySerializer.configure(producerConfig.originals(), true);
this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
} else {
this.keySchema = keySchema;
producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (valueSchema == null) {
Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaValueSerializer.configure(producerConfig.originals(), false);
this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
} else {
this.valueSchema = valueSchema;
producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
partitioner.configure(producerConfig.originals());
this.properties = new Properties();
producerConfig.originals().forEach(properties::put);
long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
try {
// Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
// If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
client = PulsarClientKafkaConfig.getClientBuilder(properties)
.serviceUrl(serviceUrl)
.keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
.build();
} catch (ArithmeticException e) {
String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. " +
"Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
logger.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
// To mimic the same batching mode as Kafka, we need to wait a very little amount of
// time to batch if the client is trying to send messages fast enough
long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
if ("gzip".equals(compressionType)) {
pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
} else if ("lz4".equals(compressionType)) {
pulsarProducerBuilder.compressionType(CompressionType.LZ4);
}
pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
// Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
}