private Map ensureWorkerUnderloaded()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [480:527]


  /**
   * Ensures no worker carries more than {@code CAPACITY_PER_WORKER} load by evicting jobs from
   * overloaded workers back to the unset-worker row of the table.
   *
   * <p>Iterates jobs in table row order, accumulating load per worker. The first job seen for a
   * worker is always left pinned (even if it alone exceeds capacity) to keep over-sized jobs
   * stable and avoid cascading re-assignment; any later job that would push the worker to or past
   * capacity is unpinned instead.
   *
   * @param table mutable (jobId, workerId) -> job assignment table; evicted jobs are moved to the
   *     {@code WorkerUtils.UNSET_WORKER_ID} column in place
   * @return immutable map of evicted jobId -> the workerId it was removed from
   */
  private Map<Long, Long> ensureWorkerUnderloaded(
      HashBasedTable<Long, Long, RebalancingJob> table) {

    // Running load accumulated per worker, and the (jobId -> workerId) pairs chosen for eviction.
    Map<Long, Double> loadByWorker = new HashMap<>();
    Map<Long, Long> evicted = new HashMap<>();
    for (long jobId : table.rowKeySet()) {
      Map<Long, RebalancingJob> assignment = table.row(jobId);
      long workerId = assignment.keySet().iterator().next();
      if (workerId == WorkerUtils.UNSET_WORKER_ID) {
        // Unassigned jobs contribute no load to any worker.
        continue;
      }
      RebalancingJob job = assignment.get(workerId);
      Preconditions.checkNotNull(
          job,
          "rebalancingJob should not be null because Guava table guarantees >=1 column (worker_id) per row");
      Double currentLoad = loadByWorker.get(workerId);
      if (currentLoad == null) {
        // Pin the first job to its worker whether or not it is overloaded, to stabilize the
        // assignment of over-sized jobs and prevent cascading re-assignment.
        loadByWorker.put(workerId, job.getLoad());
        continue;
      }
      double candidateLoad = currentLoad + job.getLoad();
      if (candidateLoad < CAPACITY_PER_WORKER) {
        loadByWorker.put(workerId, candidateLoad);
      } else {
        // This job would tip the worker to/over capacity; mark it for eviction. Its load is
        // intentionally not added, since the job will no longer run on this worker.
        evicted.put(jobId, workerId);
      }
    }

    // Unpin each evicted job: move it to the unset-worker column and clear its worker id.
    for (Map.Entry<Long, Long> entry : evicted.entrySet()) {
      long jobId = entry.getKey();
      long workerId = entry.getValue();
      RebalancingJob job = table.get(jobId, workerId);
      table.remove(jobId, workerId);
      table.put(jobId, WorkerUtils.UNSET_WORKER_ID, job);
      // Unpinning triggers a job-group status change even if ensureJobsAssigned later re-pins the
      // job (when spare workers run out). The worker that picks up the status change diffs the old
      // and new job groups to compute which jobs to run or cancel; see
      // AbstractKafkaFetcherThread::extractTopicPartitionMap.
      job.setWorkerId(WorkerUtils.UNSET_WORKER_ID);
    }

    return ImmutableMap.copyOf(evicted);
  }