in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [282:338]
/**
 * For every virtual partition, tries to relieve workers that are not under the load limit and
 * then reports three per-partition gauges.
 *
 * <p>Workers are sorted with {@code RebalancingWorkerWithSortedJobs::compareTo} and visited from
 * the end of the sorted list (most loaded) towards the start. A worker holding exactly one job is
 * never touched.
 *
 * <p>The three gauges are:
 * <ul>
 *   <li>the number of workers found over the load limit;
 *   <li>how many of those {@code adjustJobsOnWorker} reported it could not adjust;
 *   <li>how many of the unadjusted workers have a load above
 *       {@code placementWorkerScaleHardLimit}.
 * </ul>
 *
 * @param rebalancingWorkerTable table providing the partitions and the workers in each partition
 * @param pod value for the pod tag, attached only to the hard-limit gauge
 */
private void ensureWorkersLoadBalanced(
    RebalancingWorkerTable rebalancingWorkerTable, String pod) {
  for (long partitionIdx : rebalancingWorkerTable.getAllPartitions()) {
    // TODO: emit metric if the workers within partition is empty
    // Sort a private copy so the table's own list is left untouched.
    List<RebalancingWorkerWithSortedJobs> sortedWorkers =
        new ArrayList<>(rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx));
    sortedWorkers.sort(RebalancingWorkerWithSortedJobs::compareTo);

    int overloadedCount = 0;
    int hardLimitCount = 0;
    int unadjustedCount = 0;

    // Walk from the tail of the sorted list, i.e. start with the most loaded worker.
    for (int idx = sortedWorkers.size() - 1; idx >= 0; idx--) {
      RebalancingWorkerWithSortedJobs worker = sortedWorkers.get(idx);

      // Single-job workers are never moved; workers within the limit need no adjustment.
      if (worker.getNumberOfJobs() == 1 || isWorkerUnderLoadLimit(worker)) {
        continue;
      }
      overloadedCount += 1;

      if (adjustJobsOnWorker(rebalancingWorkerTable, worker, idx, sortedWorkers)) {
        continue;
      }

      // The adjustment attempt did not succeed, so the worker remains overloaded.
      unadjustedCount += 1;
      logger.warn(
          "Worker is overloaded after adjusting workload.",
          StructuredLogging.workerId(worker.getWorkerId()),
          StructuredLogging.count(worker.getNumberOfJobs()),
          StructuredLogging.workloadScale(worker.getLoad()),
          StructuredLogging.virtualPartition(partitionIdx));
      if (worker.getLoad() > placementWorkerScaleHardLimit) {
        hardLimitCount += 1;
      }
    }

    String partitionTag = Long.toString(partitionIdx);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(ImmutableMap.of(VIRTUAL_PARTITION_TAG, partitionTag))
        .gauge(MetricNames.OVERLOAD_WORKER_NUMBER)
        .update(overloadedCount);
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(ImmutableMap.of(VIRTUAL_PARTITION_TAG, partitionTag))
        .gauge(MetricNames.UNADJUSTED_WORKLOAD_WORKER)
        .update(unadjustedCount);
    // Unlike the two gauges above, the hard-limit gauge is also tagged with the pod.
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(ImmutableMap.of(VIRTUAL_PARTITION_TAG, partitionTag, POD_TAG, pod))
        .gauge(MetricNames.HIT_WORKLOAD_HARDLIMIT_WORKER)
        .update(hardLimitCount);
  }
}