in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [283:351]
/**
 * Rebalances jobs across the workers of each virtual partition so that no worker exceeds the
 * per-worker load limit.
 *
 * <p>For every partition the workers are sorted ascending by load, then traversed from the most
 * loaded to the least loaded in two passes:
 *
 * <ol>
 *   <li>Shed load from every overloaded worker via {@code adjustJobsOnWorker}, counting how many
 *       workers were overloaded and how many could not be brought under the limit. Both counts
 *       are emitted as per-partition gauges.
 *   <li>Re-traverse and move overloaded jobs (load &gt; 1.0) off still-overloaded workers to idle
 *       workers via {@code ensureOverloadedJobsMovedToIdleWorker}.
 * </ol>
 *
 * <p>Workers holding exactly one job are never touched in either pass: a single job cannot be
 * split, so relocating it would not reduce aggregate overload.
 *
 * @param rebalancingWorkerTable table mapping virtual partitions to their workers and the jobs
 *     assigned to each worker
 */
private void ensureWorkersLoadBalanced(RebalancingWorkerTable rebalancingWorkerTable) {
  for (long partitionIdx : rebalancingWorkerTable.getAllPartitions()) {
    List<RebalancingWorkerWithSortedJobs> allWorkersInPartition =
        rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx);
    // TODO: emit metric if the workers within partition is empty
    List<RebalancingWorkerWithSortedJobs> allWorkers = new ArrayList<>(allWorkersInPartition);
    allWorkers.sort(RebalancingWorkerWithSortedJobs::compareTo);
    int numberOfOverloadingWorker = 0;
    int numberOfUnadjustedWorker = 0;
    // First pass: starting from the most loaded worker, shed load from overloaded workers.
    for (int workerIdx = allWorkers.size() - 1; workerIdx >= 0; workerIdx--) {
      RebalancingWorkerWithSortedJobs worker = allWorkers.get(workerIdx);
      if (worker.getNumberOfJobs() == 1) {
        // we don't move if the worker only has one job
        continue;
      }
      if (!isWorkerUnderLoadLimit(worker)) {
        numberOfOverloadingWorker += 1;
        boolean adjustedLoad =
            adjustJobsOnWorker(rebalancingWorkerTable, worker, workerIdx, allWorkers);
        if (!adjustedLoad) {
          numberOfUnadjustedWorker += 1;
          logger.warn(
              "Worker is overloaded after adjusting workload.",
              StructuredLogging.workerId(worker.getWorkerId()),
              StructuredLogging.count(worker.getNumberOfJobs()),
              StructuredLogging.workloadScale(worker.getLoad()),
              StructuredLogging.virtualPartition(partitionIdx));
        }
      }
    }
    emitPartitionGauge(
        partitionIdx, MetricNames.OVERLOAD_WORKER_NUMBER, numberOfOverloadingWorker);
    emitPartitionGauge(
        partitionIdx, MetricNames.UNADJUSTED_WORKLOAD_WORKER, numberOfUnadjustedWorker);
    // Second pass: make sure we don't leave an overloaded job (>1.0) on a shared worker.
    for (int workerIdx = allWorkers.size() - 1; workerIdx >= 0; workerIdx--) {
      RebalancingWorkerWithSortedJobs worker = allWorkers.get(workerIdx);
      if (worker.getNumberOfJobs() == 1) {
        // we don't move if the worker only has one job
        continue;
      }
      if (!isWorkerUnderLoadLimit(worker)) {
        boolean adjustedLoad =
            ensureOverloadedJobsMovedToIdleWorker(
                rebalancingWorkerTable, worker, workerIdx, allWorkers);
        if (!adjustedLoad) {
          logger.warn(
              "Worker is still overloaded after adjusting large workload.",
              StructuredLogging.workerId(worker.getWorkerId()),
              StructuredLogging.count(worker.getNumberOfJobs()),
              StructuredLogging.workloadScale(worker.getLoad()),
              StructuredLogging.virtualPartition(partitionIdx));
        }
      }
    }
  }
}

/** Emits a gauge tagged with the virtual partition under the colocating-rebalancer sub-scope. */
private void emitPartitionGauge(long partitionIdx, String gaugeName, int value) {
  scope
      .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
      .tagged(ImmutableMap.of(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx)))
      .gauge(gaugeName)
      .update(value);
}