in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/limiter/AbstractInflightLimiter.java [106:132]
public boolean complete(Result result) {
boolean succeed = nestedPermit.complete(result);
if (!succeed) {
return false;
}
// try to complete any future permit if there is
if (!futurePermits.isEmpty()) {
Runnable runnable = null;
synchronized (AbstractInflightLimiter.this) {
if (!futurePermits.isEmpty()) {
Optional<Permit> permit = tryAcquire();
if (permit.isPresent()) {
// complete the queued request on the head of queue
Iterator<CompletableFuture<Permit>> iter = futurePermits.iterator();
CompletableFuture<Permit> future = iter.next();
iter.remove();
// reduce scope of critical zone by delay completion
runnable = () -> future.complete(new AsyncPermit(permit.get()));
}
}
}
if (runnable != null) {
runnable.run();
}
}
return true;
}