protected static void insertWorkersIntoRebalancingTable()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [186:244]


  /**
   * Seeds the rebalancing table with the workers currently running jobs of the given rebalance
   * group, then evens out the per-partition worker counts.
   *
   * <p>The work happens in four ordered steps:
   *
   * <ol>
   *   <li>For every job group in {@code jobGroupToPartitionMap}, the worker of each of its jobs is
   *       registered under the group's partition index, provided that worker is still one of the
   *       group's workers.
   *   <li>Workers of the group that did not end up in the table form the pool of available
   *       workers.
   *   <li>{@code freeExtraWorkers} is called to remove workers from partitions that hold too many.
   *   <li>The total worker shortfall over all partitions is summed and handed, together with the
   *       pool, to {@code roundRobinAssignWorkers}.
   * </ol>
   *
   * @param rebalancingWorkerTable table mapping workers to partitions; updated in place
   * @param workersNeededPerPartition required worker count, indexed by partition index
   * @param podAwareRebalanceGroup source of the workers, jobs and virtual partition count
   * @param rebalancerConfiguration configuration forwarded to {@code freeExtraWorkers}
   * @param jobGroupToPartitionMap partition index assigned to each job group id
   * @throws IllegalArgumentException if a job group id from {@code jobGroupToPartitionMap} is
   *     absent from the group's job map (raised through {@code Preconditions.checkArgument})
   */
  protected static void insertWorkersIntoRebalancingTable(
      RpcJobColocatingRebalancer.RebalancingWorkerTable rebalancingWorkerTable,
      List<Integer> workersNeededPerPartition,
      PodAwareRebalanceGroup podAwareRebalanceGroup,
      RebalancerConfiguration rebalancerConfiguration,
      Map<String, Integer> jobGroupToPartitionMap) {
    Map<Long, StoredWorker> liveWorkers = podAwareRebalanceGroup.getWorkers();

    // Step 1: register the worker of every job under its job group's partition, skipping workers
    // that are no longer part of this rebalance group.
    for (Map.Entry<String, Integer> groupAndPartition : jobGroupToPartitionMap.entrySet()) {
      String groupId = groupAndPartition.getKey();
      int targetPartition = groupAndPartition.getValue();
      Preconditions.checkArgument(
          podAwareRebalanceGroup.getGroupIdToJobs().containsKey(groupId),
          String.format("Job group id '%s' is missing, should never happen.", groupId));

      for (StoredJob storedJob : podAwareRebalanceGroup.getGroupIdToJobs().get(groupId)) {
        long assignedWorkerId = storedJob.getWorkerId();
        if (liveWorkers.containsKey(assignedWorkerId)) {
          // putIfAbsent keeps the first partition seen for a worker, so a worker can never be
          // listed under two partitions. Jobs sitting on an unknown worker are left for the
          // stale job step.
          rebalancingWorkerTable.putIfAbsent(assignedWorkerId, targetPartition);
        }
      }
    }

    // Step 2: any worker that did not land in the table is free to be used where needed.
    Set<Long> unplacedWorkerIds = new HashSet<>(liveWorkers.keySet());
    unplacedWorkerIds.removeAll(rebalancingWorkerTable.getAllWorkerIds());
    List<Long> availableWorkers = new ArrayList<>(unplacedWorkerIds);

    int partitionCount = podAwareRebalanceGroup.getNumberOfVirtualPartitions();

    // Step 3: trim partitions that currently hold more workers than they should.
    freeExtraWorkers(
        rebalancingWorkerTable,
        workersNeededPerPartition,
        partitionCount,
        rebalancerConfiguration,
        availableWorkers);

    // Step 4: add up how many workers are still missing across all partitions, then fill the
    // gaps from the pool of available workers.
    int shortfallTotal = 0;
    for (int idx = 0; idx < partitionCount; idx++) {
      int required = workersNeededPerPartition.get(idx);
      int present = rebalancingWorkerTable.getAllWorkerIdsForPartition(idx).size();
      if (required > present) {
        shortfallTotal += required - present;
      }
    }
    roundRobinAssignWorkers(
        shortfallTotal,
        workersNeededPerPartition,
        rebalancingWorkerTable,
        availableWorkers,
        partitionCount);
  }