in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/limiter/LongFixedInflightLimiter.java [94:127]
/**
 * Tries to reserve {@code n} units of inflight capacity.
 *
 * <p>Outcomes, in the order they are checked:
 *
 * <ul>
 *   <li>{@code n < 0}, limiter closed, or {@code limit == 0}: returns {@link NoopPermit#INSTANCE}
 *       without touching the lock or any counter.
 *   <li>Capacity available ({@code 0 <= inflight <= limit - n}): adds {@code n} to {@code
 *       inflight} and returns a new {@code FixedPermit(n)}.
 *   <li>No capacity and mode is {@code DryRun}: returns {@link NoopPermit#INSTANCE}.
 *   <li>No capacity and mode is {@code NonBlocking}: returns {@code null}.
 *   <li>No capacity and any other mode: waits on {@code lock} and re-evaluates from the top of
 *       the loop once woken.
 * </ul>
 *
 * <p>While the calling thread is inside the synchronized section, {@code n} is counted in {@code
 * queueLength}; the count is removed again on every exit path, including interruption.
 *
 * @param acquireMode decides what happens when capacity is not immediately available
 * @param n number of units requested
 * @return a {@code FixedPermit}, {@link NoopPermit#INSTANCE}, or {@code null} as described above
 * @throws InterruptedException if the thread is interrupted while waiting on {@code lock}
 */
Permit doAcquire(AcquireMode acquireMode, int n) throws InterruptedException {
  // Fast path: nothing to limit, so never enter the monitor.
  if (n < 0 || isClosed.get() || limit == 0) {
    return NoopPermit.INSTANCE;
  }

  synchronized (lock) {
    // Register this request as queued for as long as it is being evaluated or is waiting.
    queueLength.addAndGet(n);
    try {
      while (true) {
        // Re-check on every pass: the limiter may have been closed or unlimited meanwhile.
        if (isClosed.get() || limit == 0) {
          break;
        }

        final long current = inflight.get();
        // A negative reading is treated as "no capacity" (the counter may have overflowed).
        final boolean hasCapacity = current >= 0 && current <= limit - n;
        if (hasCapacity) {
          inflight.addAndGet(n);
          return new FixedPermit(n);
        }

        if (acquireMode == AcquireMode.DryRun) {
          return NoopPermit.INSTANCE;
        }
        if (acquireMode == AcquireMode.NonBlocking) {
          // Caller asked not to be blocked: report the failure with null.
          return null;
        }

        // No permit yet: park until signalled on the lock, then try again.
        lock.wait();
      }
    } finally {
      // Undo the queue registration regardless of how the section is left.
      queueLength.addAndGet(-n);
    }
  }

  // Limiter was closed (or limit became zero) while queued: always permit.
  return NoopPermit.INSTANCE;
}