in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [287:323]
private Consumer<byte[]> createPulsarConsumer(TopicPartition partition)
throws PulsarClientException {
ConsumerBuilder<byte[]> consumerBuilder =
createConsumerBuilder(pulsarClient, schema, sourceConfiguration);
consumerBuilder.topic(partition.getFullTopicName());
// Add CryptoKeyReader if it exists for supporting end-to-end encryption.
CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
if (cryptoKeyReader != null) {
consumerBuilder.cryptoKeyReader(cryptoKeyReader);
// Add MessageCrypto if provided.
MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
pulsarCrypto.messageCrypto();
if (messageCrypto != null) {
consumerBuilder.messageCrypto(messageCrypto);
}
}
// Add KeySharedPolicy for partial keys subscription.
if (!isFullTopicRanges(partition.getRanges())) {
KeySharedPolicy policy = stickyHashRange().ranges(partition.getPulsarRanges());
// We may enable out of order delivery for speeding up. It was turned off by default.
policy.setAllowOutOfOrderDelivery(
sourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
consumerBuilder.keySharedPolicy(policy);
}
// Create the consumer configuration by using common utils.
Consumer<byte[]> consumer = consumerBuilder.subscribe();
// Exposing the consumer metrics.
exposeConsumerMetrics(consumer);
return consumer;
}