in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/ShadowRebalancerDelegateImpl.java [72:105]
/**
 * Pre-rebalance step that copies the most recently computed (cached) worker id back onto the jobs
 * of the supplied job groups, leaving every other job field as it is.
 *
 * <p>A job keeps the worker id it arrived with when any of the following holds:
 *
 * <ul>
 *   <li>its job group has no entry in {@code cachedJobGroupStatus} (a new job group);
 *   <li>the job itself has no entry in the cached status of its group (a new job);
 *   <li>the cached worker id is not contained in {@code existingWorkers}.
 * </ul>
 *
 * @param jobGroups job groups keyed by job group id; matching jobs are updated in place through
 *     {@code RebalancingJobGroup#updateJob}
 * @param existingWorkers worker ids that a cached assignment must belong to in order to be
 *     re-applied
 */
private void consolidateJobGroupWithCacheBeforeRebalance(
    final Map<String, RebalancingJobGroup> jobGroups, Set<Long> existingWorkers) {
  for (Map.Entry<String, RebalancingJobGroup> groupEntry : jobGroups.entrySet()) {
    final String groupId = groupEntry.getKey();
    final RebalancingJobGroup rebalancingGroup = groupEntry.getValue();

    // Only job groups seen by a previous computation have cached assignments. A brand-new group
    // needs no work here: per the original note, its worker is already set to -1 in JobManager.
    if (cachedJobGroupStatus.containsKey(groupId)) {
      final Map<Long, StoredJob> currentJobs = rebalancingGroup.getJobs();
      final Map<Long, Long> cachedAssignments = cachedJobGroupStatus.get(groupId);

      for (Map.Entry<Long, StoredJob> currentJob : currentJobs.entrySet()) {
        final Long jobId = currentJob.getKey();

        // A job that was not part of the last computation has nothing to restore.
        if (!cachedAssignments.containsKey(jobId)) {
          continue;
        }

        // Skip assignments that point at a worker which no longer exists.
        final Long cachedWorkerId = cachedAssignments.get(jobId);
        if (!existingWorkers.contains(cachedWorkerId)) {
          continue;
        }

        // Rebuild the job with only the worker id replaced by the cached value.
        final StoredJob jobWithCachedWorker =
            currentJob.getValue().toBuilder().setWorkerId(cachedWorkerId).build();
        rebalancingGroup.updateJob(jobId, jobWithCachedWorker);
      }
    }
  }
}