in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java [109:213]
/**
 * Copies the settings held by {@code this.consumerSpec} onto the given builder.
 * <p>
 * A setting is applied only when its getter returns a non-null value; for the topic
 * names and the two property maps the value must additionally be non-empty. Settings
 * that are skipped result in no call on the builder at all, so whatever value the
 * builder already carries for them is left untouched.
 * <p>
 * {@link java.time.Duration}-style values are handed to the builder either in
 * milliseconds or, where the builder method takes an {@code int} interval, in whole
 * seconds (see {@link #toSaturatedIntSeconds(long)}).
 * @param consumerBuilder the builder to configure
 */
private void configureConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
	// --- Topic selection ---
	if (this.consumerSpec.getTopicNames() != null && !this.consumerSpec.getTopicNames().isEmpty()) {
		consumerBuilder.topics(this.consumerSpec.getTopicNames());
	}
	if (this.consumerSpec.getTopicsPattern() != null) {
		consumerBuilder.topicsPattern(this.consumerSpec.getTopicsPattern());
	}
	if (this.consumerSpec.getTopicsPatternSubscriptionMode() != null) {
		consumerBuilder.subscriptionTopicsMode(this.consumerSpec.getTopicsPatternSubscriptionMode());
	}
	if (this.consumerSpec.getTopicsPatternAutoDiscoveryPeriod() != null) {
		// The builder takes an int interval: pass whole seconds, saturated so that
		// very long periods cannot wrap around into a negative or tiny value.
		consumerBuilder.patternAutoDiscoveryPeriod(
				toSaturatedIntSeconds(this.consumerSpec.getTopicsPatternAutoDiscoveryPeriod().toMillis()),
				TimeUnit.SECONDS);
	}

	// --- Subscription ---
	if (this.consumerSpec.getSubscriptionName() != null) {
		consumerBuilder.subscriptionName(this.consumerSpec.getSubscriptionName());
	}
	if (this.consumerSpec.getSubscriptionMode() != null) {
		consumerBuilder.subscriptionMode(this.consumerSpec.getSubscriptionMode());
	}
	if (this.consumerSpec.getSubscriptionType() != null) {
		consumerBuilder.subscriptionType(this.consumerSpec.getSubscriptionType());
	}
	if (this.consumerSpec.getSubscriptionInitialPosition() != null) {
		consumerBuilder.subscriptionInitialPosition(this.consumerSpec.getSubscriptionInitialPosition());
	}
	if (this.consumerSpec.getKeySharedPolicy() != null) {
		consumerBuilder.keySharedPolicy(this.consumerSpec.getKeySharedPolicy());
	}
	if (this.consumerSpec.getReplicateSubscriptionState() != null) {
		consumerBuilder.replicateSubscriptionState(this.consumerSpec.getReplicateSubscriptionState());
	}
	if (this.consumerSpec.getSubscriptionProperties() != null
			&& !this.consumerSpec.getSubscriptionProperties().isEmpty()) {
		consumerBuilder.subscriptionProperties(this.consumerSpec.getSubscriptionProperties());
	}

	// --- Consumer identity and read behaviour ---
	if (this.consumerSpec.getConsumerName() != null) {
		consumerBuilder.consumerName(this.consumerSpec.getConsumerName());
	}
	if (this.consumerSpec.getProperties() != null && !this.consumerSpec.getProperties().isEmpty()) {
		consumerBuilder.properties(this.consumerSpec.getProperties());
	}
	if (this.consumerSpec.getPriorityLevel() != null) {
		consumerBuilder.priorityLevel(this.consumerSpec.getPriorityLevel());
	}
	if (this.consumerSpec.getReadCompacted() != null) {
		consumerBuilder.readCompacted(this.consumerSpec.getReadCompacted());
	}

	// --- Acknowledgement and redelivery ---
	if (this.consumerSpec.getBatchIndexAckEnabled() != null) {
		consumerBuilder.enableBatchIndexAcknowledgment(this.consumerSpec.getBatchIndexAckEnabled());
	}
	if (this.consumerSpec.getAckTimeout() != null) {
		consumerBuilder.ackTimeout(this.consumerSpec.getAckTimeout().toMillis(), TimeUnit.MILLISECONDS);
	}
	if (this.consumerSpec.getAckTimeoutTickTime() != null) {
		consumerBuilder.ackTimeoutTickTime(this.consumerSpec.getAckTimeoutTickTime().toMillis(),
				TimeUnit.MILLISECONDS);
	}
	if (this.consumerSpec.getAcknowledgementsGroupTime() != null) {
		consumerBuilder.acknowledgmentGroupTime(this.consumerSpec.getAcknowledgementsGroupTime().toMillis(),
				TimeUnit.MILLISECONDS);
	}
	if (this.consumerSpec.getNegativeAckRedeliveryDelay() != null) {
		consumerBuilder.negativeAckRedeliveryDelay(this.consumerSpec.getNegativeAckRedeliveryDelay().toMillis(),
				TimeUnit.MILLISECONDS);
	}
	if (this.consumerSpec.getDeadLetterPolicy() != null) {
		consumerBuilder.deadLetterPolicy(this.consumerSpec.getDeadLetterPolicy());
	}
	if (this.consumerSpec.getRetryLetterTopicEnable() != null) {
		consumerBuilder.enableRetry(this.consumerSpec.getRetryLetterTopicEnable());
	}

	// --- Receiver queue and partitions ---
	if (this.consumerSpec.getReceiverQueueSize() != null) {
		consumerBuilder.receiverQueueSize(this.consumerSpec.getReceiverQueueSize());
	}
	if (this.consumerSpec.getMaxTotalReceiverQueueSizeAcrossPartitions() != null) {
		consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
				this.consumerSpec.getMaxTotalReceiverQueueSizeAcrossPartitions());
	}
	if (this.consumerSpec.getAutoUpdatePartitions() != null) {
		consumerBuilder.autoUpdatePartitions(this.consumerSpec.getAutoUpdatePartitions());
	}
	if (this.consumerSpec.getAutoUpdatePartitionsInterval() != null) {
		// Same int-seconds conversion as for the pattern auto-discovery period.
		consumerBuilder.autoUpdatePartitionsInterval(
				toSaturatedIntSeconds(this.consumerSpec.getAutoUpdatePartitionsInterval().toMillis()),
				TimeUnit.SECONDS);
	}

	// --- Encryption ---
	if (this.consumerSpec.getCryptoKeyReader() != null) {
		consumerBuilder.cryptoKeyReader(this.consumerSpec.getCryptoKeyReader());
	}
	if (this.consumerSpec.getCryptoFailureAction() != null) {
		consumerBuilder.cryptoFailureAction(this.consumerSpec.getCryptoFailureAction());
	}

	// --- Chunked messages ---
	if (this.consumerSpec.getMaxPendingChunkedMessage() != null) {
		consumerBuilder.maxPendingChunkedMessage(this.consumerSpec.getMaxPendingChunkedMessage());
	}
	if (this.consumerSpec.getAutoAckOldestChunkedMessageOnQueueFull() != null) {
		consumerBuilder
			.autoAckOldestChunkedMessageOnQueueFull(this.consumerSpec.getAutoAckOldestChunkedMessageOnQueueFull());
	}
	if (this.consumerSpec.getExpireTimeOfIncompleteChunkedMessage() != null) {
		consumerBuilder.expireTimeOfIncompleteChunkedMessage(
				this.consumerSpec.getExpireTimeOfIncompleteChunkedMessage().toMillis(), TimeUnit.MILLISECONDS);
	}
}

/**
 * Converts a millisecond count to whole seconds that fit in an {@code int}.
 * <p>
 * The sub-second remainder is dropped (integer division, truncating toward zero).
 * Results outside the {@code int} range are clamped to {@link Integer#MIN_VALUE} /
 * {@link Integer#MAX_VALUE} instead of being narrowed with a plain cast, which would
 * silently wrap around and could turn a very long interval into a negative or
 * arbitrarily short one.
 * @param millis the duration in milliseconds
 * @return the duration in whole seconds, saturated to the {@code int} range
 */
private static int toSaturatedIntSeconds(long millis) {
	long seconds = millis / 1000L;
	return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, seconds));
}