in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/HibernatingJobRebalancer.java [84:163]
/**
 * Reassigns job groups to workers so that per-worker load follows {@code placementPlan}.
 *
 * <p>Candidate workers are every worker id that currently owns a job group in {@code
 * jobGroupToWorker} plus every id in {@code workerIds}, excluding {@link
 * WorkerUtils#UNSET_WORKER_ID}. They are sorted by current number of job groups, ascending. The
 * {@code candidates - placementPlan.size()} least-loaded candidates are treated as spare: their
 * job groups, together with every job group currently mapped to {@code UNSET_WORKER_ID}, go into
 * a free pool. The remaining (retained) workers are paired by index with {@code placementPlan}:
 * the i-th retained worker, in ascending-load order, gets a target of {@code
 * placementPlan.get(i)} job groups. An overloaded worker keeps its first job groups up to the
 * target and releases the excess into the free pool. Under-loaded workers are then topped up
 * from the free pool, in order.
 *
 * <p>NOTE(review): worker ids that appear in {@code jobGroupToWorker} but not in {@code
 * workerIds} are still treated as candidates and may keep their job groups — confirm the caller
 * filters out stale workers beforehand.
 *
 * @param jobGroupToWorker current assignment of job group to worker id
 * @param workerIds worker ids available for placement; ids that currently own no job group are
 *     still considered
 * @param placementPlan target number of job groups per retained worker, matched by index against
 *     retained workers sorted by ascending load (presumably the plan itself is sorted ascending —
 *     TODO confirm with caller)
 * @return the new assignment of job group to worker id. Only job groups placed on a retained
 *     worker appear. If the plan's total capacity is smaller than the number of job groups, the
 *     leftover free job groups are absent from the result; if it is larger, some workers end up
 *     below their target.
 * @throws IllegalArgumentException if {@code placementPlan} has more entries than there are
 *     candidate workers, or contains a negative target
 */
private Map<RebalancingJobGroup, Long> balanceJobGroupWorker(
    Map<RebalancingJobGroup, Long> jobGroupToWorker,
    Set<Long> workerIds,
    List<Integer> placementPlan) {
  // Group job groups by their current worker. Mutable lists are created up front so they can be
  // trimmed and filled in place below. A plain loop avoids re-copying a worker's list on every
  // merge, which made the previous toMap/Stream.concat approach quadratic per worker.
  Map<Long, List<RebalancingJobGroup>> workerJobGroupMap = new HashMap<>();
  for (Map.Entry<RebalancingJobGroup, Long> entry : jobGroupToWorker.entrySet()) {
    workerJobGroupMap
        .computeIfAbsent(entry.getValue(), workerId -> new ArrayList<>())
        .add(entry.getKey());
  }
  // Workers that currently own nothing are still candidates for placement.
  for (Long workerId : workerIds) {
    workerJobGroupMap.computeIfAbsent(workerId, id -> new ArrayList<>());
  }

  // Candidate workers (the UNSET sentinel excluded), sorted by number of job groups ascending.
  List<Map.Entry<Long, List<RebalancingJobGroup>>> workerJobGroupList =
      workerJobGroupMap
          .entrySet()
          .stream()
          .filter(entry -> entry.getKey() != WorkerUtils.UNSET_WORKER_ID)
          .sorted(Comparator.comparingInt(entry -> entry.getValue().size()))
          .collect(Collectors.toList());

  int nSpareWorkers = workerJobGroupList.size() - placementPlan.size();
  if (nSpareWorkers < 0) {
    throw new IllegalArgumentException(
        String.format(
            "placement plan needs %d workers but only %d are available",
            placementPlan.size(), workerJobGroupList.size()));
  }

  // Release the least-loaded (spare) workers, then pick up every unassigned job group.
  List<RebalancingJobGroup> freeJobGroups = new ArrayList<>();
  for (Map.Entry<Long, List<RebalancingJobGroup>> spare :
      workerJobGroupList.subList(0, nSpareWorkers)) {
    freeJobGroups.addAll(spare.getValue());
  }
  freeJobGroups.addAll(
      workerJobGroupMap.getOrDefault(WorkerUtils.UNSET_WORKER_ID, Collections.emptyList()));

  List<Map.Entry<Long, List<RebalancingJobGroup>>> retainedWorkers =
      workerJobGroupList.subList(nSpareWorkers, workerJobGroupList.size());

  // Trim overloaded workers: keep the first `expectedSize` job groups and free the rest.
  for (int i = 0; i < placementPlan.size(); ++i) {
    List<RebalancingJobGroup> jobGroups = retainedWorkers.get(i).getValue();
    int expectedSize = placementPlan.get(i);
    if (expectedSize < 0) {
      throw new IllegalArgumentException(
          String.format("placement plan entry %d is negative: %d", i, expectedSize));
    }
    if (jobGroups.size() > expectedSize) {
      List<RebalancingJobGroup> excess = jobGroups.subList(expectedSize, jobGroups.size());
      freeJobGroups.addAll(excess);
      // Clearing the subList view removes the excess from the worker's own list.
      excess.clear();
    }
  }

  // Top up under-loaded workers from the free pool, in order. Stop once the pool is exhausted
  // instead of inserting null placeholders (previously LinkedList.poll() returned null here and
  // a null job group ended up as a key in the result).
  int nextFree = 0;
  for (int i = 0; i < placementPlan.size() && nextFree < freeJobGroups.size(); ++i) {
    List<RebalancingJobGroup> jobGroups = retainedWorkers.get(i).getValue();
    int expectedSize = placementPlan.get(i);
    while (jobGroups.size() < expectedSize && nextFree < freeJobGroups.size()) {
      jobGroups.add(freeJobGroups.get(nextFree++));
    }
  }

  // Flatten the retained workers' lists into job group -> worker id.
  Map<RebalancingJobGroup, Long> result = new HashMap<>();
  for (Map.Entry<Long, List<RebalancingJobGroup>> worker : retainedWorkers) {
    for (RebalancingJobGroup jobGroup : worker.getValue()) {
      result.put(jobGroup, worker.getKey());
    }
  }
  return result;
}