public static ConsumerBuilder createConsumerBuilder()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java [93:158]


    /**
     * Builds a {@link ConsumerBuilder} for the given client and schema and forwards the consumer
     * related options of the source configuration to it.
     *
     * <p>Independent of the configuration, the returned builder always ends up with {@link
     * SubscriptionType#Exclusive} and {@code DISABLED_BATCH_RECEIVE_POLICY}, because both are set
     * after all configurable options have been forwarded.
     *
     * <p>NOTE(review): every option is forwarded through {@code SourceConfiguration#useOption};
     * presumably the setter is only invoked when the option is present in the configuration, but
     * that helper is not visible here - confirm against its implementation.
     *
     * @param client the Pulsar client handed to the underlying {@code PulsarConsumerBuilder}.
     * @param schema the schema handed to the underlying {@code PulsarConsumerBuilder}.
     * @param configuration the source configuration the consumer options are read from.
     * @param <T> the message type shared by the schema and the resulting builder.
     * @return the consumer builder with all options forwarded.
     */
    public static <T> ConsumerBuilder<T> createConsumerBuilder(
            PulsarClient client, Schema<T> schema, SourceConfiguration configuration) {
        ConsumerBuilder<T> consumerBuilder = new PulsarConsumerBuilder<>(client, schema);

        // Subscription name and acknowledgement timing.
        configuration.useOption(PULSAR_SUBSCRIPTION_NAME, consumerBuilder::subscriptionName);
        configuration.useOption(
                PULSAR_ACK_TIMEOUT_MILLIS,
                millis -> consumerBuilder.ackTimeout(millis, MILLISECONDS));
        configuration.useOption(PULSAR_ACK_RECEIPT_ENABLED, consumerBuilder::isAckReceiptEnabled);
        configuration.useOption(
                PULSAR_TICK_DURATION_MILLIS,
                millis -> consumerBuilder.ackTimeoutTickTime(millis, MILLISECONDS));
        configuration.useOption(
                PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS,
                micros -> consumerBuilder.negativeAckRedeliveryDelay(micros, MICROSECONDS));

        // Subscription behaviour, decryption failure handling and receiver queue sizing.
        configuration.useOption(PULSAR_SUBSCRIPTION_MODE, consumerBuilder::subscriptionMode);
        configuration.useOption(
                PULSAR_SUBSCRIPTION_PROPERTIES, consumerBuilder::subscriptionProperties);
        configuration.useOption(
                PULSAR_CRYPTO_FAILURE_ACTION, consumerBuilder::cryptoFailureAction);
        configuration.useOption(PULSAR_RECEIVER_QUEUE_SIZE, consumerBuilder::receiverQueueSize);
        configuration.useOption(
                PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS,
                micros -> consumerBuilder.acknowledgmentGroupTime(micros, MICROSECONDS));
        configuration.useOption(
                PULSAR_REPLICATE_SUBSCRIPTION_STATE, consumerBuilder::replicateSubscriptionState);
        configuration.useOption(
                PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
                consumerBuilder::maxTotalReceiverQueueSizeAcrossPartitions);

        // The configured consumer name is used as a format pattern and gets a fresh random UUID
        // as its single format argument before it is passed to the builder.
        configuration.useOption(
                PULSAR_CONSUMER_NAME,
                namePattern -> String.format(namePattern, UUID.randomUUID()),
                consumerBuilder::consumerName);
        configuration.useOption(PULSAR_READ_COMPACTED, consumerBuilder::readCompacted);
        configuration.useOption(PULSAR_PRIORITY_LEVEL, consumerBuilder::priorityLevel);

        // Dead letter policy (only if one could be created from the configuration) and retry.
        createDeadLetterPolicy(configuration).ifPresent(consumerBuilder::deadLetterPolicy);
        configuration.useOption(PULSAR_RETRY_ENABLE, consumerBuilder::enableRetry);

        // Chunked message handling and message pooling.
        configuration.useOption(
                PULSAR_MAX_PENDING_CHUNKED_MESSAGE, consumerBuilder::maxPendingChunkedMessage);
        configuration.useOption(
                PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL,
                consumerBuilder::autoAckOldestChunkedMessageOnQueueFull);
        configuration.useOption(
                PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS,
                millis ->
                        consumerBuilder.expireTimeOfIncompleteChunkedMessage(
                                millis, MILLISECONDS));
        configuration.useOption(PULSAR_POOL_MESSAGES, consumerBuilder::poolMessages);

        if (!configuration.contains(PULSAR_MEMORY_LIMIT_BYTES)) {
            // No memory limit configured: the auto scaling flag is taken from its own option.
            configuration.useOption(
                    PULSAR_AUTO_SCALED_RECEIVER_QUEUE_SIZE_ENABLED,
                    consumerBuilder::autoScaledReceiverQueueSizeEnabled);
        } else {
            // A configured memory limit always forces the receiver queue size to be auto scaled.
            consumerBuilder.autoScaledReceiverQueueSizeEnabled(true);
        }

        // Extra consumer properties are only handed over when at least one entry exists.
        Map<String, String> consumerProperties =
                configuration.getProperties(PULSAR_CONSUMER_PROPERTIES);
        if (!consumerProperties.isEmpty()) {
            consumerBuilder.properties(consumerProperties);
        }

        // The connector works with exclusive subscriptions only.
        consumerBuilder.subscriptionType(SubscriptionType.Exclusive);

        // Batch receiving is of no use to the Flink connector,
        // so the batch-receive timer of the Consumer instance is switched off.
        consumerBuilder.batchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);

        return consumerBuilder;
    }