in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/limiter/AsyncInflightLimiterAdapter.java [92:125]
private void tryCompleteFuture() {
// try to complete any future permit if there is
if (futurePermits.isEmpty()) {
return;
}
Runnable runnable = null;
synchronized (AsyncInflightLimiterAdapter.this) {
if (futurePermits.isEmpty()) {
return;
}
Optional<InflightLimiter.Permit> permit = delegator.tryAcquire();
if (permit.isPresent()) {
CompletableFuture<InflightLimiter.Permit> future;
// purge completed futures and find one pending future on the head of queue
while ((future = futurePermits.poll()) != null) {
if (!future.isDone()) {
final CompletableFuture<InflightLimiter.Permit> finalFuture = future;
// reduce scope of critical zone by delay completion
runnable = () -> finalFuture.complete(new AsyncPermit(permit.get()));
break;
}
}
if (runnable == null) {
// complete the permit to avoid leaking
runnable = () -> permit.get().complete();
}
}
}
if (runnable != null) {
runnable.run();
}
}