in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [186:244]
protected static void insertWorkersIntoRebalancingTable(
    RpcJobColocatingRebalancer.RebalancingWorkerTable rebalancingWorkerTable,
    List<Integer> workersNeededPerPartition,
    PodAwareRebalanceGroup podAwareRebalanceGroup,
    RebalancerConfiguration rebalancerConfiguration,
    Map<String, Integer> jobGroupToPartitionMap) {
  // Fills rebalancingWorkerTable in four phases:
  //   1. seed it from the workers currently referenced by each job group's jobs,
  //   2. collect the pod's workers that ended up in no partition,
  //   3. delegate to freeExtraWorkers (intended to release surplus workers),
  //   4. delegate to roundRobinAssignWorkers to cover the remaining shortfall.
  // Throws IllegalArgumentException-style precondition failure when a job group in
  // jobGroupToPartitionMap is unknown to podAwareRebalanceGroup.
  final Map<Long, StoredWorker> liveWorkers = podAwareRebalanceGroup.getWorkers();

  // Phase 1: every job whose worker still belongs to this pod pins that worker to the
  // partition of the job's group.
  for (Map.Entry<String, Integer> groupAndPartition : jobGroupToPartitionMap.entrySet()) {
    final String groupId = groupAndPartition.getKey();
    final int targetPartition = groupAndPartition.getValue();
    Preconditions.checkArgument(
        podAwareRebalanceGroup.getGroupIdToJobs().containsKey(groupId),
        String.format("Job group id '%s' is missing, should never happen.", groupId));

    for (StoredJob storedJob : podAwareRebalanceGroup.getGroupIdToJobs().get(groupId)) {
      final long ownerId = storedJob.getWorkerId();
      // Jobs sitting on a worker that is no longer part of the pod are skipped here; they
      // are expected to be dealt with by the stale job step.
      if (liveWorkers.containsKey(ownerId)) {
        // putIfAbsent: the first partition seen for a worker wins, so a worker can never be
        // registered under two different partitions.
        rebalancingWorkerTable.putIfAbsent(ownerId, targetPartition);
      }
    }
  }

  // Phase 2: whatever the table did not claim is the free pool.
  final Set<Long> unclaimedWorkerIds = new HashSet<>(liveWorkers.keySet());
  unclaimedWorkerIds.removeAll(rebalancingWorkerTable.getAllWorkerIds());
  final List<Long> availableWorkers = new ArrayList<>(unclaimedWorkerIds);
  final int partitionCount = podAwareRebalanceGroup.getNumberOfVirtualPartitions();

  // Phase 3: trim partitions holding more workers than they need; the same availableWorkers
  // list is handed to the helper and reused below.
  freeExtraWorkers(
      rebalancingWorkerTable,
      workersNeededPerPartition,
      partitionCount,
      rebalancerConfiguration,
      availableWorkers);

  // Phase 4: sum the per-partition deficit (partitions at or above their target contribute
  // nothing) and hand out workers from the free pool.
  int workerShortfall = 0;
  for (int partition = 0; partition < partitionCount; partition++) {
    final int required = workersNeededPerPartition.get(partition);
    final int present = rebalancingWorkerTable.getAllWorkerIdsForPartition(partition).size();
    if (required > present) {
      workerShortfall += required - present;
    }
  }
  roundRobinAssignWorkers(
      workerShortfall,
      workersNeededPerPartition,
      rebalancingWorkerTable,
      availableWorkers,
      partitionCount);
}