List podifyJobGroupsAndWorkers()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/JobGroupAndWorkerPodifier.java [38:96]


  /**
   * Splits job groups and workers into per-pod rebalance groups.
   *
   * <p>Each job is bucketed by the pod returned from {@code jobPodPlacementProvider.getJobPod} and
   * each worker by the pod returned from {@code jobPodPlacementProvider.getWorkerPod}. Jobs whose
   * pod has no worker are collected per job group and handed to {@code selectFallbackPod} together
   * with the pod maps; that helper is expected to place them into other pods. Only pods that end
   * up with both a job entry and at least one worker produce a {@link PodAwareRebalanceGroup}.
   *
   * @param jobGroupMap job groups keyed by job group id
   * @param workerMap workers keyed by the map's {@code Long} key (presumably the worker id)
   * @return one rebalance group per pod that has both jobs and workers; empty if there is none
   */
  List<PodAwareRebalanceGroup> podifyJobGroupsAndWorkers(
      final Map<String, RebalancingJobGroup> jobGroupMap, final Map<Long, StoredWorker> workerMap) {
    // pod -> job group id -> jobs of that group placed in the pod
    Map<String, Map<String, List<StoredJob>>> podToJobs = new HashMap<>();
    // pod -> worker key -> worker
    Map<String, Map<Long, StoredWorker>> podToWorkers = new HashMap<>();

    for (Map.Entry<String, RebalancingJobGroup> jobGroupEntry : jobGroupMap.entrySet()) {
      String jobGroupId = jobGroupEntry.getKey();
      for (StoredJob job : jobGroupEntry.getValue().getJobs().values()) {
        String pod = jobPodPlacementProvider.getJobPod(job);
        podToJobs
            .computeIfAbsent(pod, unused -> new HashMap<>())
            .computeIfAbsent(jobGroupId, unused -> new ArrayList<>())
            .add(job);
      }
    }

    for (Map.Entry<Long, StoredWorker> workerEntry : workerMap.entrySet()) {
      String workerPod = jobPodPlacementProvider.getWorkerPod(workerEntry.getValue());
      podToWorkers
          .computeIfAbsent(workerPod, unused -> new HashMap<>())
          .put(workerEntry.getKey(), workerEntry.getValue());
    }

    // find out jobs that don't have workers in the pod
    Map<String, List<StoredJob>> jobGroupsWithoutWorkersInPod = new HashMap<>();
    for (Map.Entry<String, Map<String, List<StoredJob>>> entry : podToJobs.entrySet()) {
      String pod = entry.getKey();
      if (!podToWorkers.containsKey(pod)) {
        logger.warn(
            "Pod doesn't have workers. Distributing to other pods", StructuredLogging.pod(pod));
        scope.tagged(ImmutableMap.of("pod", pod)).counter("pod.without.worker").inc(1);
        // A job group can have jobs in several worker-less pods. Merge into a fresh list per
        // job group instead of overwriting, otherwise jobs from all but the last such pod are
        // silently dropped. Copying also keeps the lists owned by podToJobs untouched.
        for (Map.Entry<String, List<StoredJob>> jobGroupJobs : entry.getValue().entrySet()) {
          jobGroupsWithoutWorkersInPod
              .computeIfAbsent(jobGroupJobs.getKey(), unused -> new ArrayList<>())
              .addAll(jobGroupJobs.getValue());
        }
      }
    }

    // redistribute these jobs to other pods because they don't have workers in their pods
    if (!jobGroupsWithoutWorkersInPod.isEmpty()) {
      selectFallbackPod(jobGroupsWithoutWorkersInPod, podToWorkers, podToJobs, workerMap.size());
    }

    List<PodAwareRebalanceGroup> allRebalanceGroups = new ArrayList<>();
    for (Map.Entry<String, Map<String, List<StoredJob>>> entry : podToJobs.entrySet()) {
      String pod = entry.getKey();
      Map<Long, StoredWorker> workers = podToWorkers.get(pod);
      if (workers == null) {
        // Worker-less pod: its jobs were handed to selectFallbackPod above, so skip it here.
        continue;
      }

      allRebalanceGroups.add(
          new PodAwareRebalanceGroup(
              pod,
              jobPodPlacementProvider.getNumberOfPartitionsForPod(pod),
              entry.getValue(),
              workers));
    }

    return allRebalanceGroups;
  }