void requestMore()

in pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InflightLimiter.java [279:319]


		/**
		 * Tries to make progress for this subscription within the limiter's inflight
		 * budget.
		 * <p>
		 * A per-subscription cap is computed first as
		 * {@code max(maxInflight / max(activeSubscriptions, expectedSubscriptionsInflight), 1)}.
		 * Then, provided there is outstanding {@code requestedDemand}, one of the
		 * following happens:
		 * <ul>
		 * <li>state is {@code INITIAL} (and both the per-subscription and the global
		 * counters are below their limits): the state moves to {@code SUBSCRIBING}, one
		 * slot is reserved in both counters and the source is subscribed;</li>
		 * <li>state is {@code SUBSCRIBED} or {@code REQUESTING}: on the first pass
		 * ({@code SUBSCRIBED -> REQUESTING}) the slot reserved while subscribing is given
		 * back, then a request amount is computed, accounted for in both inflight
		 * counters, removed from {@code requestedDemand} and passed to
		 * {@code request(...)};</li>
		 * <li>any other state (e.g. {@code SUBSCRIBING}): nothing is done in this
		 * call.</li>
		 * </ul>
		 * If there is no outstanding demand, or the limits are reached while the state is
		 * not {@code SUBSCRIBED}, {@code maybeAddToPending()} is called instead.
		 * <p>
		 * NOTE(review): the limit checks and the counter updates are separate atomic
		 * operations rather than one atomic step; presumably a brief overshoot of the
		 * limits under concurrent calls is acceptable - confirm against the callers.
		 */
		void requestMore() {
			// spread requests evenly across active subscriptions (or expected number of
			// subscriptions); the outer max(..., 1) keeps the cap at one or more even when
			// the integer division yields 0
			// NOTE(review): assumes activeSubscriptions and expectedSubscriptionsInflight
			// are never both 0 here, otherwise the division would throw - TODO confirm
			int maxInflightForSubscription = Math
					.max(InflightLimiter.this.maxInflight / Math.max(InflightLimiter.this.activeSubscriptions.get(),
							InflightLimiter.this.expectedSubscriptionsInflight), 1);
			// proceed only when there is demand; a subscriber in SUBSCRIBED state bypasses
			// the two limit checks (it still holds the slot reserved while subscribing,
			// which is released further below)
			if (this.requestedDemand.get() > 0 && (this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED
					|| (this.inflightForSubscription.get() < maxInflightForSubscription
							&& InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight))) {
				// the CAS guarantees that only one caller performs the subscribe step
				if (this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL,
						InflightLimiterSubscriberState.SUBSCRIBING)) {
					// consume one slot for the subscription, since the first element
					// might already be in flight
					// when a CompletableFuture is mapped to a Mono
					InflightLimiter.this.inflight.incrementAndGet();
					this.inflightForSubscription.incrementAndGet();
					this.source.subscribe(InflightLimiterSubscriber.this);
				}
				// the state is re-read here because the CAS above failed; when it is neither
				// REQUESTING nor SUBSCRIBED (e.g. still SUBSCRIBING) this call does nothing
				else if (this.state.get() == InflightLimiterSubscriberState.REQUESTING
						|| this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED) {
					// subscribing changed the values, so adjust back the values on first
					// call; the CAS makes sure the adjustment happens only once
					if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBED,
							InflightLimiterSubscriberState.REQUESTING)) {
						// reverse the slot reservation made when transitioning from
						// INITIAL to SUBSCRIBING
						InflightLimiter.this.inflight.decrementAndGet();
						this.inflightForSubscription.decrementAndGet();
					}
					// request as much as is demanded, capped by the remaining room of this
					// subscription; the outer max(..., 1) means one element is requested
					// even when the remaining room is zero or negative
					long maxRequest = Math.max(Math.min(this.requestedDemand.get(),
							maxInflightForSubscription - this.inflightForSubscription.get()), 1);
					// account for the requested elements in the global counter, the remaining
					// demand and the per-subscription counter before issuing the request
					InflightLimiter.this.inflight.addAndGet((int) maxRequest);
					this.requestedDemand.addAndGet(-maxRequest);
					this.inflightForSubscription.addAndGet((int) maxRequest);
					request(maxRequest);
				}
			}
			else {
				// no demand, or no capacity available right now: hand over to
				// maybeAddToPending() (presumably queues this subscriber for a later
				// retry - see that method)
				maybeAddToPending();
			}
		}