in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [214:276]
static void adjustWorkerCountForPartition(
    RpcJobColocatingRebalancer.RebalancingCache rebalancingCache,
    List<Integer> workersNeededPerPartition,
    final Map<Long, StoredWorker> workerMap,
    int numberOfPartition,
    RebalancerConfiguration rebalancerConfiguration) {
  // Reconciles the rebalancing cache with the current worker set and with the requested
  // per-partition worker counts, in three phases:
  //   1. evict cached workers that are absent from workerMap;
  //   2. shrink partitions that hold a surplus of workers, returning the released workers to
  //      the free pool;
  //   3. total the remaining per-partition shortfall and hand off to roundRobinAssignWorkers.
  // Note: workersNeededPerPartition is updated in place for every partition shrunk in phase 2.

  // Phase 1: drop cached workers that no longer exist in workerMap.
  for (Long cachedId : rebalancingCache.getAllWorkerIds()) {
    if (workerMap.containsKey(cachedId)) {
      continue;
    }
    rebalancingCache.removeWorker(cachedId);
  }

  // Workers present in workerMap but not tracked by the cache seed the free pool.
  Set<Long> untrackedIds = new HashSet<>(workerMap.keySet());
  untrackedIds.removeAll(rebalancingCache.getAllWorkerIds());
  List<Long> freeWorkers = new ArrayList<>(untrackedIds);

  // Phase 2: gradually shrink partitions that have more workers than requested.
  for (int partition = 0; partition < numberOfPartition; partition++) {
    int assignedCount = rebalancingCache.getAllWorkersForPartition(partition).size();
    Integer requestedCount = workersNeededPerPartition.get(partition);
    int surplus = assignedCount - requestedCount;
    if (surplus < MINIMUM_WORKER_THRESHOLD) {
      // surplus too small to act on; leave this partition untouched
      continue;
    }

    // Release only the configured fraction of the surplus in this pass, but never fewer than
    // MINIMUM_WORKER_THRESHOLD workers.
    int workersToRelease =
        Math.max(
            MINIMUM_WORKER_THRESHOLD,
            (int) Math.floor(surplus * rebalancerConfiguration.getWorkerToReduceRatio()));
    logger.info(
        "Need to remove {} workers for partition.",
        workersToRelease,
        StructuredLogging.virtualPartition(partition),
        StructuredLogging.workloadBasedWorkerCount(requestedCount));

    // NOTE(review): judging by its name, the helper picks the least-loaded workers of the
    // partition and strips their jobs; confirm against its implementation.
    List<Long> releasedIds =
        removeJobsFromLeastLoadedWorkers(rebalancingCache, partition, workersToRelease);
    releasedIds.forEach(workerId -> rebalancingCache.removeWorker(workerId));
    // released workers become available for reassignment to other partitions
    freeWorkers.addAll(releasedIds);

    // Pin the requested count to the post-removal size so that phase 3 sees no shortfall for
    // this partition.
    int remainingCount = rebalancingCache.getAllWorkerIdsForPartition(partition).size();
    workersNeededPerPartition.set(partition, remainingCount);
  }

  // Phase 3: sum up how many additional workers the under-provisioned partitions still need.
  int shortfall = 0;
  for (int partition = 0; partition < numberOfPartition; partition++) {
    int requestedCount = workersNeededPerPartition.get(partition);
    int assignedCount = rebalancingCache.getAllWorkerIdsForPartition(partition).size();
    if (requestedCount > assignedCount) {
      shortfall += requestedCount - assignedCount;
    }
  }

  roundRobinAssignWorkers(
      shortfall, workersNeededPerPartition, rebalancingCache, freeWorkers, numberOfPartition);
}