in pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java [279:319]
void requestMore() {
// spread requests evenly across active subscriptions (or expected number of
// subscriptions)
int maxInflightForSubscription = Math
.max(InflightLimiter.this.maxInflight / Math.max(InflightLimiter.this.activeSubscriptions.get(),
InflightLimiter.this.expectedSubscriptionsInflight), 1);
if (this.requestedDemand.get() > 0 && (this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED
|| (this.inflightForSubscription.get() < maxInflightForSubscription
&& InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight))) {
if (this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL,
InflightLimiterSubscriberState.SUBSCRIBING)) {
// consume one slot for the subscription, since the first element
// might already be in flight
// when a CompletableFuture is mapped to a Mono
InflightLimiter.this.inflight.incrementAndGet();
this.inflightForSubscription.incrementAndGet();
this.source.subscribe(InflightLimiterSubscriber.this);
}
else if (this.state.get() == InflightLimiterSubscriberState.REQUESTING
|| this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED) {
// subscribing changed the values, so adjust back the values on first
// call
if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBED,
InflightLimiterSubscriberState.REQUESTING)) {
// reverse the slot reservation made when transitioning from
// INITIAL to SUBSCRIBING
InflightLimiter.this.inflight.decrementAndGet();
this.inflightForSubscription.decrementAndGet();
}
long maxRequest = Math.max(Math.min(this.requestedDemand.get(),
maxInflightForSubscription - this.inflightForSubscription.get()), 1);
InflightLimiter.this.inflight.addAndGet((int) maxRequest);
this.requestedDemand.addAndGet(-maxRequest);
this.inflightForSubscription.addAndGet((int) maxRequest);
request(maxRequest);
}
}
else {
maybeAddToPending();
}
}