private boolean adjustJobsOnWorker()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [393:433]


  /**
   * Attempts to bring {@code adjustedWorker} under its load limit by moving its jobs, one at a
   * time, onto workers located before {@code toAdjustWorkerIdx} in {@code allWorkersInPartition}.
   *
   * <p>Jobs are visited in the order given by {@code RebalancingJob.compareTo}. For each job the
   * first preceding worker that can take it without exceeding either {@code
   * placementWorkerScaleHardLimit} or the configured maximum job count is chosen. A job with no
   * such worker stays where it is, so that no other worker is pushed over capacity.
   *
   * @param rebalancingWorkerTable table used to look up the destination worker by id
   * @param adjustedWorker the worker whose jobs are being moved away
   * @param toAdjustWorkerIdx exclusive upper bound of the indexes scanned for a destination
   * @param allWorkersInPartition workers of the partition, scanned from index 0 upwards
   * @return {@code true} once {@code isWorkerUnderLoadLimit(adjustedWorker)} reports true after a
   *     job has been processed; {@code false} if all jobs were processed without that happening
   */
  private boolean adjustJobsOnWorker(
      RebalancingWorkerTable rebalancingWorkerTable,
      RebalancingWorkerWithSortedJobs adjustedWorker,
      int toAdjustWorkerIdx,
      List<RebalancingWorkerWithSortedJobs> allWorkersInPartition) {
    // Sentinel meaning "no destination worker was found for this job".
    final long noDestination = -1L;

    // Iterate over a sorted snapshot, because jobs are removed from the worker inside the loop.
    // NOTE(review): presumably compareTo puts the most loaded job first - confirm.
    List<RebalancingJob> movableJobs = new ArrayList<>(adjustedWorker.getAllJobs());
    movableJobs.sort((left, right) -> left.compareTo(right));

    for (RebalancingJob job : movableJobs) {
      long destinationWorkerId = noDestination;
      for (int idx = 0; idx < toAdjustWorkerIdx; idx++) {
        RebalancingWorkerWithSortedJobs candidate = allWorkersInPartition.get(idx);
        boolean hasCapacity =
            candidate.getLoad() + job.getLoad() <= placementWorkerScaleHardLimit
                && candidate.getNumberOfJobs() + 1
                    <= rebalancerConfiguration.getMaxJobNumberPerWorker();
        if (!hasCapacity) {
          continue;
        }
        destinationWorkerId = candidate.getWorkerId();
        break;
      }

      // No destination means every scanned worker would exceed either the load limit or the job
      // count limit by taking this job; leave the job in place rather than overload another
      // worker.
      if (destinationWorkerId != noDestination) {
        adjustedWorker.removeJob(job);
        rebalancingWorkerTable
            .getRebalancingWorkerWithSortedJobs(destinationWorkerId)
            .addJob(job);
        scope.subScope(COLOCATING_REBALANCER_SUB_SCOPE).counter(MetricNames.JOB_MOVEMENT).inc(1);
      }

      // Stop as soon as the worker is within its limit; the remaining jobs can stay.
      if (isWorkerUnderLoadLimit(adjustedWorker)) {
        return true;
      }
    }
    return false;
  }