in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [353:391]
/**
 * Moves jobs whose load exceeds 1.0 off {@code adjustedWorker} onto workers that currently hold
 * no jobs.
 *
 * <p>Jobs are visited in the order produced by {@code RebalancingJob::compareTo} (presumably
 * most-loaded first; confirm against that comparator). Only workers located before {@code
 * toAdjustWorkerIdx} in {@code allWorkersInPartition} are considered as destinations. Each
 * performed move increments the job-movement counter.
 *
 * @param rebalancingWorkerTable table used to look up the destination worker by id
 * @param adjustedWorker the worker whose jobs are being examined
 * @param toAdjustWorkerIdx exclusive upper bound of the worker indexes probed for an idle worker
 * @param allWorkersInPartition the workers of the partition, addressed by index
 * @return {@code true} if, right after handling some job with load above 1.0, {@code
 *     adjustedWorker} holds exactly one job; {@code false} otherwise
 */
private boolean ensureOverloadedJobsMovedToIdleWorker(
    RebalancingWorkerTable rebalancingWorkerTable,
    RebalancingWorkerWithSortedJobs adjustedWorker,
    int toAdjustWorkerIdx,
    List<RebalancingWorkerWithSortedJobs> allWorkersInPartition) {
  // Sentinel meaning "no idle worker was found".
  final long noIdleWorker = -1L;

  // Work on a sorted snapshot so jobs can be removed from the worker while iterating.
  List<RebalancingJob> candidates = new ArrayList<>(adjustedWorker.getAllJobs());
  candidates.sort(RebalancingJob::compareTo);

  for (RebalancingJob candidate : candidates) {
    if (candidate.getLoad() <= 1.0) {
      // Jobs with load <= 1.0 are expected to be handled by #adjustJobsOnWorker, so the
      // adjustment stops at the first such job.
      return false;
    }

    // Probe workers in front of the adjusted one for the first worker without any job.
    long idleWorkerId = noIdleWorker;
    boolean idleWorkerFound = false;
    int probeIdx = 0;
    while (!idleWorkerFound && probeIdx < toAdjustWorkerIdx) {
      RebalancingWorkerWithSortedJobs probedWorker = allWorkersInPartition.get(probeIdx);
      if (probedWorker.getNumberOfJobs() == 0) {
        idleWorkerId = probedWorker.getWorkerId();
        idleWorkerFound = true;
      }
      probeIdx++;
    }

    if (idleWorkerId != noIdleWorker) {
      adjustedWorker.removeJob(candidate);
      rebalancingWorkerTable.getRebalancingWorkerWithSortedJobs(idleWorkerId).addJob(candidate);
      scope.subScope(COLOCATING_REBALANCER_SUB_SCOPE).counter(MetricNames.JOB_MOVEMENT).inc(1);
    }

    // Checked after every candidate, whether or not it was moved.
    if (adjustedWorker.getNumberOfJobs() == 1) {
      return true;
    }
  }
  return false;
}