in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java [47:92]
/**
 * Creates a {@link ProducerBuilder} from the given client and applies the Pulsar-specific
 * producer settings found in {@code properties}.
 *
 * <p>Optional settings are applied only when the corresponding key is present directly in
 * {@code properties}. Batching is the exception: it is always set, and defaults to
 * {@code "true"} when the key is absent. Values stored as non-{@code String} objects
 * (for example an {@code Integer} added with {@code put}) are converted with
 * {@code toString()} before being parsed, instead of being read as {@code null}.
 *
 * <p>When a crypto reader factory class name is configured, the class is loaded and
 * instantiated through its no-argument constructor, and is then used to supply the crypto
 * key reader and the encryption keys for the builder.
 *
 * @param client     the client used to create the producer builder
 * @param properties the configuration to read producer settings from
 * @return the configured producer builder
 * @throws NumberFormatException    if a numeric setting cannot be parsed
 * @throws IllegalArgumentException if the crypto reader factory cannot be loaded,
 *                                  instantiated, or used to configure the builder
 */
public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
    ProducerBuilder<byte[]> producerBuilder = client.newProducer();

    String producerName = propertyAsString(properties, PRODUCER_NAME);
    if (producerName != null) {
        producerBuilder.producerName(producerName);
    }

    String initialSequenceId = propertyAsString(properties, INITIAL_SEQUENCE_ID);
    if (initialSequenceId != null) {
        producerBuilder.initialSequenceId(Long.parseLong(initialSequenceId));
    }

    String maxPendingMessages = propertyAsString(properties, MAX_PENDING_MESSAGES);
    if (maxPendingMessages != null) {
        producerBuilder.maxPendingMessages(Integer.parseInt(maxPendingMessages));
    }

    String maxPendingAcrossPartitions = propertyAsString(properties, MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
    if (maxPendingAcrossPartitions != null) {
        producerBuilder.maxPendingMessagesAcrossPartitions(Integer.parseInt(maxPendingAcrossPartitions));
    }

    // Batching is always configured. A value stored directly under the key wins (whatever its
    // type); otherwise fall back to getProperty so the defaults chain and "true" still apply.
    String batchingEnabled = propertyAsString(properties, BATCHING_ENABLED);
    if (batchingEnabled == null) {
        batchingEnabled = properties.getProperty(BATCHING_ENABLED, "true");
    }
    producerBuilder.enableBatching(Boolean.parseBoolean(batchingEnabled));

    String batchingMaxMessages = propertyAsString(properties, BATCHING_MAX_MESSAGES);
    if (batchingMaxMessages != null) {
        producerBuilder.batchingMaxMessages(Integer.parseInt(batchingMaxMessages));
    }

    String autoUpdatePartitions = propertyAsString(properties, AUTO_UPDATE_PARTITIONS);
    if (autoUpdatePartitions != null) {
        producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(autoUpdatePartitions));
    }

    String cryptoFactoryClassName = propertyAsString(properties, CRYPTO_READER_FACTORY_CLASS_NAME);
    if (cryptoFactoryClassName != null) {
        try {
            // Class#newInstance is deprecated; go through the no-arg constructor explicitly.
            CryptoKeyReaderFactory cryptoReaderFactory = Class.forName(cryptoFactoryClassName)
                    .asSubclass(CryptoKeyReaderFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
            producerBuilder.cryptoKeyReader(cryptoReaderFactory.create(properties));
            Set<String> keys = cryptoReaderFactory.getEncryptionKey(properties);
            if (keys != null) {
                keys.forEach(producerBuilder::addEncryptionKey);
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to create crypto reader using factory "
                    + cryptoFactoryClassName, e);
        }
    }

    return producerBuilder;
}

/**
 * Returns the value stored directly under {@code key} as a string, or {@code null} when the
 * key is not present. Non-{@code String} values are converted with {@code toString()}.
 */
private static String propertyAsString(Properties properties, String key) {
    Object value = properties.get(key);
    return value == null ? null : value.toString();
}