in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java [104:130]
/**
 * Transfers the settings held by {@code readerSpec} onto the given reader builder.
 * <p>
 * The topic names are always passed to the builder. Every other setting is optional:
 * its builder method is invoked only when the spec returns a non-null value, and the
 * key hash ranges are additionally required to be non-empty.
 *
 * @param readerBuilder the builder that receives the reader settings
 */
private void configureReaderBuilder(ReaderBuilder<T> readerBuilder) {
    // Topics are the only setting applied without a null check.
    readerBuilder.topics(this.readerSpec.getTopicNames());

    String readerName = this.readerSpec.getReaderName();
    if (readerName != null) {
        readerBuilder.readerName(readerName);
    }

    String subscriptionName = this.readerSpec.getSubscriptionName();
    if (subscriptionName != null) {
        readerBuilder.subscriptionName(subscriptionName);
    }

    // The spec's "generated subscription name prefix" is what the builder calls the subscription role prefix.
    String subscriptionRolePrefix = this.readerSpec.getGeneratedSubscriptionNamePrefix();
    if (subscriptionRolePrefix != null) {
        readerBuilder.subscriptionRolePrefix(subscriptionRolePrefix);
    }

    Integer receiverQueueSize = this.readerSpec.getReceiverQueueSize();
    if (receiverQueueSize != null) {
        readerBuilder.receiverQueueSize(receiverQueueSize);
    }

    Boolean readCompacted = this.readerSpec.getReadCompacted();
    if (readCompacted != null) {
        readerBuilder.readCompacted(readCompacted);
    }

    // Skip the call when the ranges are missing or empty; otherwise hand them over as an array.
    if (!(this.readerSpec.getKeyHashRanges() == null || this.readerSpec.getKeyHashRanges().isEmpty())) {
        readerBuilder.keyHashRange(this.readerSpec.getKeyHashRanges().toArray(new Range[0]));
    }

    if (this.readerSpec.getCryptoKeyReader() != null) {
        readerBuilder.cryptoKeyReader(this.readerSpec.getCryptoKeyReader());
    }

    if (this.readerSpec.getCryptoFailureAction() != null) {
        readerBuilder.cryptoFailureAction(this.readerSpec.getCryptoFailureAction());
    }
}