in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderBuilder.java [111:126]
public ReactiveMessageSender<T> build() {
	// Creates an AdaptedReactiveMessageSender from the builder's current settings.
	// When maxInflight is positive, a producer cache is mandatory and the sender is
	// given a transformer supplier that creates InflightLimiter instances, together
	// with a key object built from the same two limits. Otherwise the key is null.
	// Throws NullPointerException if maxInflight is positive and no cache was set.

	// Snapshot the limits once: the supplier below is invoked later, and reading the
	// builder fields at that point would let a reconfigured builder change the limiter
	// of an already-built sender while its key still described the old limits.
	final int inflightLimit = this.maxInflight;
	final int subscriptionLimit = this.maxConcurrentSenderSubscriptions;

	Object producerActionTransformerKey;
	if (inflightLimit > 0) {
		Objects.requireNonNull(this.producerCache, "cache must be provided when maxInflight is set.");
		// Second argument is half of the in-flight limit, but never less than 1.
		this.producerActionTransformer = () -> new InflightLimiter(inflightLimit, Math.max(inflightLimit / 2, 1),
				Schedulers.single(), subscriptionLimit);
		producerActionTransformerKey = new ProducerActionTransformerKey(inflightLimit, subscriptionLimit);
	}
	else {
		// NOTE(review): the transformer field is left untouched on this path, so a
		// limiter supplier assigned by an earlier build() call would still be passed
		// on (with a null key) - confirm whether that is intended.
		producerActionTransformerKey = null;
	}
	return new AdaptedReactiveMessageSender<>(this.schema, this.senderSpec, resolveMaxConcurrency(),
			this.reactiveProducerAdapterFactory, (ProducerCache) this.producerCache, this.producerActionTransformer,
			producerActionTransformerKey, this.stopOnError);
}