in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [203:264]
/**
 * Consolidates jobs onto fewer workers, one rpc uri at a time.
 *
 * <p>For each rpc uri the load of its jobs is summed per worker. Workers are then visited from
 * least to most loaded, and each one is merged as a whole into the most loaded worker that can
 * absorb it while staying strictly below {@code CAPACITY_PER_WORKER}. Merging stops for a uri as
 * soon as the number of workers still holding its jobs reaches {@code
 * config.getNumWorkersPerUri()}, so consolidation never goes below that floor.
 *
 * <p>The current implementation only merges whole workers. It could be improved to split the jobs
 * of a single worker over multiple other workers.
 *
 * @param table job assignments keyed by (job id, worker id); updated in place, and the worker id
 *     of every moved job is updated to match its new column
 */
private void minimizeWorkers(HashBasedTable<Long, Long, RebalancingJob> table) {
  // Sum the load of all jobs per worker, grouped by the rpc uri of the job.
  Map<String, Map<Long, Double>> uriToLoadPerWorker = new HashMap<>();
  for (RebalancingJob job : table.values()) {
    String uri = job.getRpcUri();
    uriToLoadPerWorker
        .computeIfAbsent(uri, o -> new HashMap<>())
        .compute(
            job.getWorkerId(),
            (aLong, aDouble) -> aDouble == null ? job.getLoad() : aDouble + job.getLoad());
  }
  // Live view of the table: worker id -> (job id -> job). It reflects the moves made below.
  Map<Long, Map<Long, RebalancingJob>> jobsPerWorkerId = table.columnMap();
  for (Map<Long, Double> loadPerWorker : uriToLoadPerWorker.values()) {
    // Number of workers that still hold jobs for this uri; decremented on every merge.
    int activeWorkers = loadPerWorker.size();
    if (activeWorkers <= config.getNumWorkersPerUri()) {
      // do not minimize workers below numWorkersPerUri config.
      continue;
    }
    // The ordering is computed once from the initial loads and is not refreshed after merges.
    ImmutableList<Long> workerIdsIncreasingLoad =
        ImmutableList.copyOf(
            ImmutableMap.<Long, Double>builder()
                .orderEntriesByValue(
                    Double::compare) // WARNING: orderEntriesByValue is marked @Beta
                .putAll(loadPerWorker)
                .build()
                .keySet());
    ImmutableList<Long> workerIdsDecreasingLoad = workerIdsIncreasingLoad.reverse();
    for (long thisWorkerId : workerIdsIncreasingLoad) {
      if (activeWorkers <= config.getNumWorkersPerUri()) {
        // Earlier merges already brought this uri down to the configured floor.
        break;
      }
      Double thisLoad = loadPerWorker.get(thisWorkerId);
      Preconditions.checkNotNull(
          thisLoad, "load should exist for worker_id since we built iterator from keyset");
      // O(n) lookup from max loaded worker to find something to merge into.
      for (long otherWorkerId : workerIdsDecreasingLoad) {
        if (otherWorkerId == thisWorkerId) {
          // Workers past this point in the decreasing order were visited earlier by the outer
          // loop and have already been shrunk if possible.
          break;
        }
        Double otherLoad = loadPerWorker.get(otherWorkerId);
        Preconditions.checkNotNull(
            otherLoad, "load should exist for worker_id since we built iterator from keyset");
        if (otherLoad + thisLoad < CAPACITY_PER_WORKER) {
          // NOTE(review): this moves every job in the worker's column, whatever its rpc uri,
          // while the load accounting is per uri. Confirm a worker only serves a single uri.
          Map<Long, RebalancingJob> jobsOnThisWorker = jobsPerWorkerId.get(thisWorkerId);
          if (jobsOnThisWorker != null) {
            // Iterate over a snapshot: the loop body mutates the table backing the column view.
            for (RebalancingJob job : ImmutableList.copyOf(jobsOnThisWorker.values())) {
              job.setWorkerId(otherWorkerId);
              table.put(job.getJobId(), otherWorkerId, job);
              table.remove(job.getJobId(), thisWorkerId);
            }
          }
          loadPerWorker.put(otherWorkerId, otherLoad + thisLoad);
          loadPerWorker.put(thisWorkerId, 0.0);
          activeWorkers--;
          break;
        }
      }
    }
  }
}