in concurrency-loadbalancer-core/src/main/java/com/uber/concurrency/loadbalancer/ArrayConcurrencyLoadBalancer.java [92:115]
/**
 * Picks the next task to run.
 *
 * <p>A task group is first obtained from {@code weightedSelector}. Every entry of that
 * group has its state synced and is then compared against the lowest entry seen so far,
 * starting from {@code LEAST_TASK_CONCURRENCY}. Entries that tie for the lowest position
 * are fed to a {@link ReservoirSampler}, which chooses one of them; entries that compare
 * greater than the current lowest are skipped.
 *
 * @return a new task wrapping the chosen entry, stamped with {@code ticker.read()}, or
 *         {@code null} when no group was selected or the sampler produced no entry
 */
public CompletableTask<T> next() {
    TaskGroup<T> group = weightedSelector.select();
    if (group == null) {
        return null;
    }

    ReservoirSampler<TaskConcurrency<T>> tieBreaker = new ReservoirSampler<>();
    TaskConcurrency<T> lowest = LEAST_TASK_CONCURRENCY;

    for (TaskConcurrency<T> candidate : group) {
        candidate.syncState();
        int order = candidate.compareTo(lowest);

        // Worse than the current lowest: not eligible for sampling.
        if (order > 0) {
            continue;
        }

        // Strictly better: earlier samples are obsolete, start over from this candidate.
        if (order < 0) {
            tieBreaker.reset();
            lowest = candidate;
        }

        // Reached for both "strictly better" and "tied with the lowest".
        tieBreaker.sample(candidate);
    }

    TaskConcurrency<T> chosen = tieBreaker.getSample();
    if (chosen != null) {
        return new ConcurrentTaskImpl(ticker.read(), chosen);
    }
    // no tasks or all tasks reached concurrency limits
    return null;
}