in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [353:391]
  /**
   * Relocates jobs whose load is above 1.0 from {@code adjustedWorker} onto workers that currently
   * hold no jobs.
   *
   * <p>Jobs are visited in the order produced by {@code RebalancingJob::compareTo}. Processing
   * stops at the first job whose load is {@code <= 1.0}; this presumes that ordering puts the most
   * loaded jobs first (NOTE(review): confirm against {@code RebalancingJob#compareTo}).
   *
   * <p>For every visited job, the first worker in {@code allWorkersInPartition} at an index
   * strictly below {@code toAdjustWorkerIdx} that reports zero jobs is chosen as destination. The
   * destination is resolved through {@code rebalancingWorkerTable} by worker id, and each
   * relocation increments the job-movement counter. When no such worker exists the job stays where
   * it is.
   *
   * @param rebalancingWorkerTable table used to resolve the destination worker by id
   * @param adjustedWorker worker whose overloaded jobs should be moved away
   * @param toAdjustWorkerIdx exclusive upper bound of the indices scanned for an idle worker
   * @param allWorkersInPartition workers that are scanned for an idle destination
   * @return {@code true} if, right after handling a job with load above 1.0, {@code adjustedWorker}
   *     holds exactly one job (this can be the case even when nothing was moved); {@code false}
   *     otherwise
   */
  private boolean ensureOverloadedJobsMovedToIdleWorker(
      RebalancingWorkerTable rebalancingWorkerTable,
      RebalancingWorkerWithSortedJobs adjustedWorker,
      int toAdjustWorkerIdx,
      List<RebalancingWorkerWithSortedJobs> allWorkersInPartition) {
    // Marker value meaning "no idle worker was found for the current job".
    final long noIdleWorker = -1L;

    // Iterate over a sorted snapshot so that removing jobs from the worker cannot disturb the loop.
    List<RebalancingJob> sortedJobs = new ArrayList<>(adjustedWorker.getAllJobs());
    sortedJobs.sort(RebalancingJob::compareTo);

    for (RebalancingJob candidate : sortedJobs) {
      if (candidate.getLoad() <= 1.0) {
        // Jobs with load <= 1.0 are expected to be handled by #adjustJobsOnWorker, so there is
        // nothing left for this method to do.
        return false;
      }

      // Pick the first worker ahead of the adjusted one that currently has no jobs.
      long destinationId = noIdleWorker;
      boolean scanning = true;
      for (int idx = 0; scanning && idx < toAdjustWorkerIdx; idx++) {
        RebalancingWorkerWithSortedJobs probe = allWorkersInPartition.get(idx);
        if (probe.getNumberOfJobs() == 0) {
          destinationId = probe.getWorkerId();
          scanning = false;
        }
      }

      if (destinationId != noIdleWorker) {
        adjustedWorker.removeJob(candidate);
        RebalancingWorkerWithSortedJobs destination =
            rebalancingWorkerTable.getRebalancingWorkerWithSortedJobs(destinationId);
        destination.addJob(candidate);
        scope.subScope(COLOCATING_REBALANCER_SUB_SCOPE).counter(MetricNames.JOB_MOVEMENT).inc(1);
      }

      // A single remaining job means the worker cannot be relieved any further here.
      if (adjustedWorker.getNumberOfJobs() == 1) {
        return true;
      }
    }
    return false;
  }