public void computeJobConfiguration()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [142:204]


  /**
   * Recomputes the scale and flow control of every job, one pod at a time.
   *
   * <p>The job groups and workers are first partitioned by {@code
   * jobGroupAndWorkerPodifier.podifyJobGroupsAndWorkers}. For each pod, and for each job group
   * that has jobs in that pod, the group's scale, messages-per-second, bytes-per-second and
   * max-inflight-messages are multiplied by the pod's flow control ratio (the value returned by
   * {@code jobPodPlacementProvider.getMaybeReservedFlowControlRatioForPods}, or {@code 1.0} when
   * absent) and divided evenly by the number of the group's jobs in that pod. Every such job is
   * then rewritten through {@link RebalancingJobGroup#updateJob} with the per-job values.
   *
   * @param jobGroups job groups keyed by group id; the contained groups are updated in place
   * @param workers workers keyed by worker id; only forwarded to the podifier here
   * @throws Exception declared by the signature; nothing is caught in this method, so failures of
   *     the invoked collaborators propagate to the caller. A group id reported by the podifier
   *     but missing from {@code jobGroups} fails the {@code Preconditions.checkNotNull} check.
   */
  public void computeJobConfiguration(
      final Map<String, RebalancingJobGroup> jobGroups, final Map<Long, StoredWorker> workers)
      throws Exception {
    for (PodAwareRebalanceGroup podGroup :
        jobGroupAndWorkerPodifier.podifyJobGroupsAndWorkers(jobGroups, workers)) {
      String pod = podGroup.getPod();
      // Looked up once per pod; unwrapped below only for groups that actually have jobs.
      Optional<Double> reservedRatio =
          jobPodPlacementProvider.getMaybeReservedFlowControlRatioForPods(pod);

      for (Map.Entry<String, List<StoredJob>> entry : podGroup.getGroupIdToJobs().entrySet()) {
        RebalancingJobGroup group = jobGroups.get(entry.getKey());
        Preconditions.checkNotNull(group);

        List<StoredJob> podJobs = entry.getValue();
        if (podJobs.isEmpty()) {
          // Not expected in practice; warn and move on so the division below never sees zero.
          logger.warn("No jobs for pod", StructuredLogging.jobPod(pod));
          continue;
        }

        int jobCount = podJobs.size();
        double ratio = reservedRatio.orElse(1.0);

        double perJobScale = perJobShare(group.getScale().orElse(Scalar.ZERO), ratio, jobCount);
        double perJobMessagesPerSec =
            perJobShare(
                group.getJobGroup().getFlowControl().getMessagesPerSec(), ratio, jobCount);
        double perJobBytesPerSec =
            perJobShare(group.getJobGroup().getFlowControl().getBytesPerSec(), ratio, jobCount);
        double perJobMaxInflight =
            perJobShare(
                group.getJobGroup().getFlowControl().getMaxInflightMessages(), ratio, jobCount);

        // Every job of this group in this pod receives the same per-job flow control.
        FlowControl perJobFlowControl =
            FlowControl.newBuilder()
                .setBytesPerSec(perJobBytesPerSec)
                .setMessagesPerSec(perJobMessagesPerSec)
                .setMaxInflightMessages(perJobMaxInflight)
                .build();

        for (StoredJob storedJob : podJobs) {
          rebalancingUpdate:
          {
            // Merge the current job group definition into the existing job, then override the
            // flow control with the per-job share computed above.
            group.updateJob(
                storedJob.getJob().getJobId(),
                storedJob
                    .toBuilder()
                    .setJob(
                        Rebalancer.mergeJobGroupAndJob(group.getJobGroup(), storedJob.getJob())
                            .setFlowControl(perJobFlowControl)
                            .build())
                    .setScale(perJobScale)
                    .build());
          }
        }
      }
    }
  }

  /**
   * Returns one job's even share of a group-level quantity, computed as {@code groupTotal * ratio
   * / jobCount} (multiplication first, then division).
   *
   * @param groupTotal the group-level value to split
   * @param ratio the pod's flow control ratio applied before splitting
   * @param jobCount number of jobs sharing the value; callers pass a positive count
   * @return the per-job value
   */
  private static double perJobShare(double groupTotal, double ratio, int jobCount) {
    return groupTotal * ratio / jobCount;
  }