private Consumer createPulsarConsumer()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [287:323]


    /**
     * Builds and subscribes a Pulsar {@link Consumer} for the given topic partition.
     *
     * <p>The builder is obtained from {@code createConsumerBuilder} and pointed at the partition's
     * full topic name. If {@code pulsarCrypto} supplies a {@link CryptoKeyReader}, it is registered
     * on the builder, along with the {@link MessageCrypto} when one is supplied as well. A sticky
     * hash range {@link KeySharedPolicy} is attached only when the partition does not cover the
     * full topic ranges. Once subscribed, the consumer is handed to {@code exposeConsumerMetrics}
     * and then returned.
     *
     * @param partition the topic partition to consume from
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscription could not be created
     */
    private Consumer<byte[]> createPulsarConsumer(TopicPartition partition)
            throws PulsarClientException {
        final ConsumerBuilder<byte[]> builder =
                createConsumerBuilder(pulsarClient, schema, sourceConfiguration);
        builder.topic(partition.getFullTopicName());

        // End-to-end encryption is wired in only when a CryptoKeyReader is available.
        final CryptoKeyReader keyReader = pulsarCrypto.cryptoKeyReader();
        final boolean encryptionConfigured = keyReader != null;
        if (encryptionConfigured) {
            builder.cryptoKeyReader(keyReader);

            // A custom MessageCrypto is optional and only looked up alongside a key reader.
            final MessageCrypto<MessageMetadata, MessageMetadata> crypto =
                    pulsarCrypto.messageCrypto();
            if (crypto != null) {
                builder.messageCrypto(crypto);
            }
        }

        // A partition restricted to a subset of the key hash ranges needs a sticky
        // Key_Shared policy built from those ranges.
        final boolean coversFullRanges = isFullTopicRanges(partition.getRanges());
        if (!coversFullRanges) {
            final KeySharedPolicy stickyPolicy =
                    stickyHashRange().ranges(partition.getPulsarRanges());
            // Out-of-order delivery is taken from the source configuration.
            final boolean allowOutOfOrder =
                    sourceConfiguration.isAllowKeySharedOutOfOrderDelivery();
            stickyPolicy.setAllowOutOfOrderDelivery(allowOutOfOrder);
            builder.keySharedPolicy(stickyPolicy);
        }

        // Subscribe with the assembled builder, then register the consumer's metrics.
        final Consumer<byte[]> subscribed = builder.subscribe();
        exposeConsumerMetrics(subscribed);
        return subscribed;
    }