in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [393:433]
/**
 * Tries to shed jobs from {@code adjustedWorker} onto workers that appear earlier in {@code
 * allWorkersInPartition} until {@code isWorkerUnderLoadLimit(adjustedWorker)} holds.
 *
 * <p>Jobs are visited in {@code RebalancingJob#compareTo} order (intended to be most loaded
 * first). For each job, the first worker with index below {@code toAdjustWorkerIdx} that stays
 * within both {@code placementWorkerScaleHardLimit} and the configured max job count after
 * receiving the job is chosen as the destination. A job with no such destination stays where it
 * is, so that other workers are not pushed over their limits.
 *
 * @param rebalancingWorkerTable table used to look up the destination worker by id
 * @param adjustedWorker the worker whose jobs are being moved away
 * @param toAdjustWorkerIdx exclusive upper bound of candidate destination indexes in {@code
 *     allWorkersInPartition}
 * @param allWorkersInPartition workers considered as destinations
 * @return {@code true} as soon as the adjusted worker is under its load limit, {@code false} if
 *     every job was visited without reaching that state
 */
private boolean adjustJobsOnWorker(
    RebalancingWorkerTable rebalancingWorkerTable,
    RebalancingWorkerWithSortedJobs adjustedWorker,
    int toAdjustWorkerIdx,
    List<RebalancingWorkerWithSortedJobs> allWorkersInPartition) {
  // Sentinel meaning "no destination worker was found for this job".
  final long noDestination = -1L;

  // Iterate over a sorted snapshot, since jobs are removed from the worker along the way.
  List<RebalancingJob> candidateJobs = new ArrayList<>(adjustedWorker.getAllJobs());
  candidateJobs.sort((left, right) -> left.compareTo(right));

  for (RebalancingJob candidateJob : candidateJobs) {
    long destinationWorkerId = noDestination;

    // First-fit search among the workers that precede the one being adjusted.
    for (int idx = 0; idx < toAdjustWorkerIdx; idx++) {
      RebalancingWorkerWithSortedJobs receiver = allWorkersInPartition.get(idx);

      boolean fitsLoad =
          receiver.getLoad() + candidateJob.getLoad() <= placementWorkerScaleHardLimit;
      if (!fitsLoad) {
        continue;
      }

      boolean fitsJobCount =
          receiver.getNumberOfJobs() + 1 <= rebalancerConfiguration.getMaxJobNumberPerWorker();
      if (!fitsJobCount) {
        continue;
      }

      destinationWorkerId = receiver.getWorkerId();
      break;
    }

    // No destination means every candidate worker would exceed either its load or its job-count
    // limit by taking this job; leave the job in place rather than overload another worker.
    if (destinationWorkerId != noDestination) {
      adjustedWorker.removeJob(candidateJob);
      rebalancingWorkerTable
          .getRebalancingWorkerWithSortedJobs(destinationWorkerId)
          .addJob(candidateJob);
      scope.subScope(COLOCATING_REBALANCER_SUB_SCOPE).counter(MetricNames.JOB_MOVEMENT).inc(1);
    }

    // Stop as soon as the worker is back under its limit, whether or not this job was moved.
    if (isWorkerUnderLoadLimit(adjustedWorker)) {
      return true;
    }
  }

  return false;
}