in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/JobGroupAndWorkerPodifier.java [38:96]
List<PodAwareRebalanceGroup> podifyJobGroupsAndWorkers(
final Map<String, RebalancingJobGroup> jobGroupMap, final Map<Long, StoredWorker> workerMap) {
Map<String, Map<String, List<StoredJob>>> podToJobs = new HashMap<>();
Map<String, Map<Long, StoredWorker>> podToWorkers = new HashMap<>();
for (Map.Entry<String, RebalancingJobGroup> jobGroupEntry : jobGroupMap.entrySet()) {
String jobGroupId = jobGroupEntry.getKey();
for (StoredJob job : jobGroupEntry.getValue().getJobs().values()) {
String pod = jobPodPlacementProvider.getJobPod(job);
podToJobs.putIfAbsent(pod, new HashMap<>());
podToJobs.get(pod).putIfAbsent(jobGroupId, new ArrayList<>());
Objects.requireNonNull(podToJobs.get(pod).get(jobGroupId)).add(job);
}
}
for (Map.Entry<Long, StoredWorker> workerEntry : workerMap.entrySet()) {
String workerPod = jobPodPlacementProvider.getWorkerPod(workerEntry.getValue());
podToWorkers.putIfAbsent(workerPod, new HashMap<>());
Objects.requireNonNull(podToWorkers.get(workerPod))
.put(workerEntry.getKey(), workerEntry.getValue());
}
List<PodAwareRebalanceGroup> allRebalanceGroups = new ArrayList<>();
Map<String, List<StoredJob>> jobGroupsWithoutWorkersInPod = new HashMap<>();
// find out jobs that don't have workers in the pod
for (Map.Entry<String, Map<String, List<StoredJob>>> entry : podToJobs.entrySet()) {
String pod = entry.getKey();
if (!podToWorkers.containsKey(pod)) {
logger.warn(
"Pod doesn't have workers. Distributing to other pods", StructuredLogging.pod(pod));
scope.tagged(ImmutableMap.of("pod", pod)).counter("pod.without.worker").inc(1);
jobGroupsWithoutWorkersInPod.putAll(entry.getValue());
}
}
// redistribute these jobs to other pods because they don't have workers in their pods
if (!jobGroupsWithoutWorkersInPod.isEmpty()) {
selectFallbackPod(jobGroupsWithoutWorkersInPod, podToWorkers, podToJobs, workerMap.size());
}
for (Map.Entry<String, Map<String, List<StoredJob>>> entry : podToJobs.entrySet()) {
String pod = entry.getKey();
if (!podToWorkers.containsKey(pod)) {
continue;
}
Map<Long, StoredWorker> workers = podToWorkers.get(pod);
allRebalanceGroups.add(
new PodAwareRebalanceGroup(
pod,
jobPodPlacementProvider.getNumberOfPartitionsForPod(pod),
entry.getValue(),
workers));
}
return allRebalanceGroups;
}