public static ConsumerBuilder getConsumerBuilder()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java [40:92]


    /**
     * Creates a consumer builder from the given client and applies every Pulsar-specific
     * consumer setting that is present in {@code properties}.
     *
     * <p>Only keys that are present in {@code properties} are applied; for all other settings the
     * builder returned by {@code client.newConsumer()} is left untouched.
     *
     * @param client     the client used to create the consumer builder
     * @param properties the configuration to read consumer settings from; it is also handed to the
     *                   crypto key reader factory when one is configured
     * @return the configured consumer builder
     * @throws NumberFormatException    if a numeric setting cannot be parsed by
     *                                  {@link Integer#parseInt(String)} / {@link Long#parseLong(String)}
     * @throws IllegalArgumentException if the subscription topics mode is not a valid
     *                                  {@code RegexSubscriptionMode} name, or if the crypto key
     *                                  reader cannot be created from the configured factory class
     */
    public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();

        if (properties.containsKey(CONSUMER_NAME)) {
            consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
        }

        if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
            consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
        }

        if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
                    Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
        }

        if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
            consumerBuilder.acknowledgmentGroupTime(
                    Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
        }

        if (properties.containsKey(REPLICATE_SUBSCRIPTION_STATE)) {
            consumerBuilder.replicateSubscriptionState(
                    Boolean.parseBoolean(properties.getProperty(REPLICATE_SUBSCRIPTION_STATE)));
        }

        if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
            RegexSubscriptionMode mode;
            try {
                mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
            } catch (IllegalArgumentException e) {
                // Re-throw with the list of valid values, keeping the original failure as the cause.
                throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
                    + Arrays.asList(RegexSubscriptionMode.values()), e);
            }
            consumerBuilder.subscriptionTopicsMode(mode);
        }

        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
            consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
        }

        if (properties.containsKey(CRYPTO_READER_FACTORY_CLASS_NAME)) {
            try {
                // Class.newInstance() is deprecated (it rethrows constructor exceptions, including
                // checked ones, unwrapped); go through the no-arg constructor explicitly instead.
                CryptoKeyReaderFactory cryptoReaderFactory = (CryptoKeyReaderFactory) Class
                        .forName(properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME))
                        .getDeclaredConstructor()
                        .newInstance();
                consumerBuilder.cryptoKeyReader(cryptoReaderFactory.create(properties));
            } catch (Exception e) {
                // Any failure here (class lookup, instantiation, cast, or the factory itself) is
                // reported as a configuration error with the underlying cause attached.
                throw new IllegalArgumentException("Failed to create crypto reader using factory "
                        + properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME), e);
            }
        }
        return consumerBuilder;
    }