private void ensureWorkersLoadBalanced()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [283:351]


  /**
   * Rebalances workload across workers within each virtual partition so that no worker remains
   * above the load limit.
   *
   * <p>For every partition the workers are snapshotted and sorted by load, then traversed twice
   * from most- to least-loaded:
   *
   * <ol>
   *   <li>First pass: shed load from overloaded workers via {@code adjustJobsOnWorker}, counting
   *       how many workers were overloaded and how many could not be adjusted.
   *   <li>Second pass: move individually overloaded jobs (load &gt; 1.0) onto idle workers via
   *       {@code ensureOverloadedJobsMovedToIdleWorker}.
   * </ol>
   *
   * <p>Workers holding exactly one job are never touched in either pass — moving a worker's only
   * job cannot reduce that worker's load, it only relocates it.
   *
   * <p>NOTE(review): {@code allWorkers} is sorted once before the first pass; if
   * {@code adjustJobsOnWorker} moves jobs, the ordering seen by the second pass may be stale —
   * presumably acceptable since both passes re-check the load limit per worker; confirm.
   *
   * @param rebalancingWorkerTable table mapping virtual partitions to their workers and the jobs
   *     assigned to each worker
   */
  private void ensureWorkersLoadBalanced(RebalancingWorkerTable rebalancingWorkerTable) {
    for (long partitionIdx : rebalancingWorkerTable.getAllPartitions()) {
      List<RebalancingWorkerWithSortedJobs> allWorkersInPartition =
          rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx);
      // TODO: emit metric if the workers within partition is empty
      // Work on a sorted snapshot so the table's own ordering is left untouched.
      List<RebalancingWorkerWithSortedJobs> allWorkers = new ArrayList<>(allWorkersInPartition);
      allWorkers.sort(RebalancingWorkerWithSortedJobs::compareTo);

      int numberOfOverloadingWorker = 0;
      int numberOfUnadjustedWorker = 0;
      // First pass: starting from the most loaded worker, try to shed load from every worker
      // that is over the limit.
      for (int workerIdx = allWorkers.size() - 1; workerIdx >= 0; workerIdx--) {
        RebalancingWorkerWithSortedJobs rebalancingWorkerWithSortedJobs = allWorkers.get(workerIdx);
        if (rebalancingWorkerWithSortedJobs.getNumberOfJobs() == 1) {
          // we don't move if the worker only has one job
          continue;
        }
        if (!isWorkerUnderLoadLimit(rebalancingWorkerWithSortedJobs)) {
          numberOfOverloadingWorker += 1;
          boolean adjustedLoad =
              adjustJobsOnWorker(
                  rebalancingWorkerTable, rebalancingWorkerWithSortedJobs, workerIdx, allWorkers);
          if (!adjustedLoad) {
            numberOfUnadjustedWorker += 1;
            logger.warn(
                "Worker is overloaded after adjusting workload.",
                StructuredLogging.workerId(rebalancingWorkerWithSortedJobs.getWorkerId()),
                StructuredLogging.count(rebalancingWorkerWithSortedJobs.getNumberOfJobs()),
                StructuredLogging.workloadScale(rebalancingWorkerWithSortedJobs.getLoad()),
                StructuredLogging.virtualPartition(partitionIdx));
          }
        }
      }

      emitPartitionGauge(
          partitionIdx, MetricNames.OVERLOAD_WORKER_NUMBER, numberOfOverloadingWorker);
      emitPartitionGauge(
          partitionIdx, MetricNames.UNADJUSTED_WORKLOAD_WORKER, numberOfUnadjustedWorker);

      // do a second traverse to make sure we don't have overloaded job(>1.0) on same worker
      for (int workerIdx = allWorkers.size() - 1; workerIdx >= 0; workerIdx--) {
        RebalancingWorkerWithSortedJobs rebalancingWorkerWithSortedJobs = allWorkers.get(workerIdx);
        if (rebalancingWorkerWithSortedJobs.getNumberOfJobs() == 1) {
          // we don't move if the worker only has one job
          continue;
        }

        if (!isWorkerUnderLoadLimit(rebalancingWorkerWithSortedJobs)) {
          boolean adjustedLoad =
              ensureOverloadedJobsMovedToIdleWorker(
                  rebalancingWorkerTable, rebalancingWorkerWithSortedJobs, workerIdx, allWorkers);
          if (!adjustedLoad) {
            logger.warn(
                "Worker is still overloaded after adjusting large workload.",
                StructuredLogging.workerId(rebalancingWorkerWithSortedJobs.getWorkerId()),
                StructuredLogging.count(rebalancingWorkerWithSortedJobs.getNumberOfJobs()),
                StructuredLogging.workloadScale(rebalancingWorkerWithSortedJobs.getLoad()),
                StructuredLogging.virtualPartition(partitionIdx));
          }
        }
      }
    }
  }

  /**
   * Emits a gauge under the colocating-rebalancer sub-scope, tagged with the virtual partition.
   *
   * <p>Extracted to remove the duplicated scope/tag/gauge chain that was repeated once per metric.
   * Assumes {@code MetricNames} constants are {@code String} gauge names (tally convention) —
   * TODO confirm against the {@code MetricNames} declaration.
   *
   * @param partitionIdx virtual partition used as the {@code VIRTUAL_PARTITION_TAG} value
   * @param metricName gauge name to emit
   * @param value gauge value
   */
  private void emitPartitionGauge(long partitionIdx, String metricName, int value) {
    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(ImmutableMap.of(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx)))
        .gauge(metricName)
        .update(value);
  }