private static List calculateWorkerNeededPerPartition()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [107:168]


  /**
   * Computes how many workers each virtual partition of {@code podAwareRebalanceGroup} needs.
   *
   * <p>Every job group is assigned to a partition by hashing its group id (reduced modulo the
   * configured max assignment hash value range, then modulo the number of virtual partitions), and
   * the assignment is recorded in {@code jobGroupToPartitionMap}. Within a partition:
   *
   * <ul>
   *   <li>each job with scale {@code >= 1.0} gets one dedicated worker;
   *   <li>the remaining jobs need the larger of the estimate returned by {@code
   *       getWorkerNumberPerWorkload} for their scales and {@code ceil(jobCount /
   *       maxJobNumberPerWorker)};
   *   <li>that larger value is padded by {@code targetSpareWorkerPercentage} percent (rounded up)
   *       before the dedicated workers are added.
   * </ul>
   *
   * @param podAwareRebalanceGroup the group whose jobs are distributed across virtual partitions
   * @param rebalancerConfiguration supplies the hash value range, the max job count per worker and
   *     the target spare worker percentage
   * @param jobGroupToPartitionMap output parameter; receives {@code groupId -> partition index} for
   *     every job group in {@code podAwareRebalanceGroup}
   * @return a list indexed by partition whose element is the number of workers that partition needs
   */
  private static List<Integer> calculateWorkerNeededPerPartition(
      PodAwareRebalanceGroup podAwareRebalanceGroup,
      RebalancerConfiguration rebalancerConfiguration,
      final Map<String, Integer> jobGroupToPartitionMap) {
    int numberOfPartition = podAwareRebalanceGroup.getNumberOfVirtualPartitions();
    int maxJobNumberPerWorker = rebalancerConfiguration.getMaxJobNumberPerWorker();
    long sparePercentage = rebalancerConfiguration.getTargetSpareWorkerPercentage();

    // dedicated workers for jobs whose scale alone fills (or exceeds) a whole worker
    int[] overloadedWorkerNeededByPartition = new int[numberOfPartition];
    // scales of the remaining (< 1.0) jobs; each inner list's size is that partition's job count
    List<List<Double>> workloadPerJobByPartitionList = new ArrayList<>(numberOfPartition);
    for (int idx = 0; idx < numberOfPartition; idx++) {
      workloadPerJobByPartitionList.add(new ArrayList<>());
    }

    for (Map.Entry<String, List<StoredJob>> entry :
        podAwareRebalanceGroup.getGroupIdToJobs().entrySet()) {
      String groupId = entry.getKey();
      // the remainder's magnitude is below the (assumed positive) range, so Math.abs cannot overflow
      long hashValue =
          Math.abs(groupId.hashCode() % rebalancerConfiguration.getMaxAssignmentHashValueRange());
      // NOTE(review): zero virtual partitions with a non-empty group would divide by zero here
      int partition = (int) (hashValue % numberOfPartition);
      jobGroupToPartitionMap.put(groupId, partition);
      for (StoredJob job : entry.getValue()) {
        double scale = job.getScale();
        if (scale >= 1.0) {
          // a single job with >= 1.0 scale is put on its own dedicated worker
          overloadedWorkerNeededByPartition[partition]++;
        } else {
          workloadPerJobByPartitionList.get(partition).add(scale);
        }
      }
    }

    // calculate how many workers are needed per partition based on workload and job count
    List<Integer> workersNeededForPartition = new ArrayList<>(numberOfPartition);
    for (int idx = 0; idx < numberOfPartition; idx++) {
      List<Double> workloads = workloadPerJobByPartitionList.get(idx);
      // capture the job count before handing the list to the helper
      int jobCount = workloads.size();
      int expectedNumberOfWorkerForWorkload = getWorkerNumberPerWorkload(workloads);
      int expectedNumberOfWorkerForJobCount =
          (jobCount + maxJobNumberPerWorker - 1) / maxJobNumberPerWorker;
      int neededNumberOfWorkerWithoutOverloadWorker =
          Math.max(expectedNumberOfWorkerForWorkload, expectedNumberOfWorkerForJobCount);
      // leave some spare to handle traffic increase
      int neededNumberOfWorker =
          addSpareWorkers(neededNumberOfWorkerWithoutOverloadWorker, sparePercentage)
              + overloadedWorkerNeededByPartition[idx];
      workersNeededForPartition.add(neededNumberOfWorker);
    }

    return workersNeededForPartition;
  }

  /**
   * Returns {@code ceil(workers * (100 + sparePercentage) / 100)}, computed exactly in integer
   * arithmetic.
   *
   * <p>The double-based form {@code Math.ceil(workers * (1 + sparePercentage / 100.0))} can land
   * one ulp above an exact integer (e.g. {@code 50 * 1.1 == 55.00000000000001}), which makes
   * {@code Math.ceil} add a spurious extra worker.
   *
   * @param workers number of workers before padding
   * @param sparePercentage extra capacity to add, in percent
   * @return the padded worker count, rounded up
   */
  private static int addSpareWorkers(int workers, long sparePercentage) {
    long scaled = (long) workers * (100 + sparePercentage);
    // floor((x + 99) / 100) == ceil(x / 100) for every integer x
    return (int) Math.floorDiv(scaled + 99, 100L);
  }