in pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java [100:114]
/**
 * Applies the optional in-flight limiting and retry behavior to the given pipeline.
 * <p>
 * When {@code maxInflight} is greater than zero, the pipeline is wrapped in
 * {@code Mono.using} so that an {@link InflightLimiter} is created as the resource,
 * exposed to the pipeline through the Reactor context under
 * {@code INFLIGHT_LIMITER_CONTEXT_KEY}, and released with
 * {@link InflightLimiter#dispose()} as the cleanup step.
 * <p>
 * When {@code pipelineRetrySpec} is non-null, it is applied with {@code retryWhen}
 * on the outside of that wrapper, so the retry operator sits downstream of the
 * limiter resource scope.
 * @param pipeline the pipeline to decorate
 * @return the decorated pipeline, or {@code pipeline} itself when neither option is
 * configured
 */
private Mono<Void> decoratePipeline(Mono<Void> pipeline) {
    Mono<Void> decorated = pipeline;
    if (this.maxInflight > 0) {
        // The parameter is never reassigned, so the lambda can capture it directly.
        decorated = Mono.using(() -> new InflightLimiter(this.maxInflight),
                (limiter) -> pipeline.contextWrite(Context.of(INFLIGHT_LIMITER_CONTEXT_KEY, limiter)),
                InflightLimiter::dispose);
    }
    return (this.pipelineRetrySpec != null) ? decorated.retryWhen(this.pipelineRetrySpec) : decorated;
}