public static ProducerBuilder getProducerBuilder()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java [47:92]


    /**
     * Creates a {@link ProducerBuilder} from {@code client} and applies the producer settings found in
     * {@code properties}.
     *
     * <p>Producer name, initial sequence id, max pending messages (per producer and across partitions),
     * batching max messages and partition auto-update are applied only when their key is present in
     * {@code properties}. Batching is always configured and is enabled unless {@code BATCHING_ENABLED}
     * is set to something other than {@code "true"}.
     *
     * <p>When {@code CRYPTO_READER_FACTORY_CLASS_NAME} is present, the named class is loaded, instantiated
     * through its no-argument constructor and used to supply the crypto key reader and the encryption keys.
     *
     * @param client     the client used to create the builder
     * @param properties the configuration to read the producer settings from
     * @return the configured builder
     * @throws NumberFormatException    if a numeric setting is present but its value cannot be parsed
     * @throws IllegalArgumentException if the crypto reader factory cannot be loaded, instantiated or used
     */
    public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
        ProducerBuilder<byte[]> producerBuilder = client.newProducer();

        // NOTE(review): Properties.getProperty returns null when the stored value is not a String, so a
        // key that was put with e.g. an Integer or Boolean value will not be read as intended below.
        if (properties.containsKey(PRODUCER_NAME)) {
            producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
        }

        if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
            producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
        }

        if (properties.containsKey(MAX_PENDING_MESSAGES)) {
            producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
        }

        if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
            producerBuilder.maxPendingMessagesAcrossPartitions(
                    Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
        }

        // Unlike the other settings, batching is always set explicitly; it defaults to enabled.
        producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));

        if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
            producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
        }

        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
            producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
        }

        if (properties.containsKey(CRYPTO_READER_FACTORY_CLASS_NAME)) {
            String factoryClassName = properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME);
            try {
                // asSubclass verifies the type before anything is instantiated, and
                // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
                // which could rethrow undeclared checked exceptions from the constructor.
                CryptoKeyReaderFactory cryptoReaderFactory = Class.forName(factoryClassName)
                        .asSubclass(CryptoKeyReaderFactory.class)
                        .getDeclaredConstructor()
                        .newInstance();
                producerBuilder.cryptoKeyReader(cryptoReaderFactory.create(properties));
                Set<String> keys = cryptoReaderFactory.getEncryptionKey(properties);
                if (keys != null) {
                    keys.forEach(producerBuilder::addEncryptionKey);
                }
            } catch (Exception e) {
                // Any failure (class not found, wrong type, constructor or factory error) is reported
                // uniformly as an invalid configuration, keeping the original exception as the cause.
                throw new IllegalArgumentException("Failed to create crypto reader using factory "
                        + factoryClassName, e);
            }
        }
        return producerBuilder;
    }