in pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java [138:159]
/**
 * Schedules a pass over the pending subscriptions on {@code triggerNextWorker}.
 * <p>
 * Nothing is scheduled when the worker is disposed, when the in-flight count has
 * already reached {@code maxInflight}, or when no subscriptions are pending. The
 * {@code triggerNextTriggered} flag is flipped with a compare-and-set, so only the
 * caller that wins the flip schedules the task. The task clears the flag before it
 * starts polling, which means a call made while the task is running can schedule
 * another pass.
 */
void maybeTriggerNext() {
    if (this.triggerNextWorker.isDisposed()) {
        return;
    }
    if (this.inflight.get() >= this.maxInflight || this.pendingSubscriptions.isEmpty()) {
        return;
    }
    // Only the caller that flips the flag from false to true schedules the task.
    if (!this.triggerNextTriggered.compareAndSet(false, true)) {
        return;
    }
    this.triggerNextWorker.schedule(() -> {
        // Reset first so later calls are able to schedule a follow-up pass.
        this.triggerNextTriggered.set(false);
        // The pass polls at most as many entries as were queued when it started
        // (presumably so entries queued again during the pass wait for the next one
        // -- TODO confirm against requestMore()).
        for (int budget = this.pendingSubscriptions.size();
                this.inflight.get() < this.maxInflight && budget > 0;
                budget--) {
            InflightLimiterSubscriber<?> next = this.pendingSubscriptions.poll();
            if (next == null) {
                // Queue emptied in the meantime; nothing left to poll.
                break;
            }
            if (!next.isDisposed()) {
                next.requestMore();
            }
        }
    });
}