in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [206:243]
/**
 * Walks every job of the rebalance group and files each running job either under the worker it
 * is currently assigned to, or under the per-partition set of jobs that have to be moved.
 *
 * <p>For each job group id returned by {@code podAwareRebalanceGroup.getGroupIdToJobs()} the
 * partition index is looked up in {@code jobGroupToPartitionMap}. Jobs whose state is not {@code
 * JOB_STATE_RUNNING} are skipped. A running job stays with its current worker only when {@code
 * rebalancingWorkerTable} reports the worker id as valid and lists it among the worker ids of
 * the group's partition; otherwise the job is appended to the {@link StaleWorkerReplacement} of
 * that partition.
 *
 * @param podAwareRebalanceGroup source of the job group id to stored jobs mapping to process
 * @param jobGroupMap job group id to {@link RebalancingJobGroup}; must contain every job group
 *     id of {@code podAwareRebalanceGroup}
 * @param toBeMovedStaleJobs output map, keyed by partition index, that collects the jobs which
 *     are not on a worker of their partition; missing entries are created on demand
 * @param jobGroupToPartitionMap job group id to partition index; must contain every job group id
 *     of {@code podAwareRebalanceGroup}
 * @param rebalancingWorkerTable worker table that is queried for worker validity and partition
 *     membership, and that receives the jobs which are already placed correctly
 * @throws IllegalArgumentException if a job group id is missing from {@code
 *     jobGroupToPartitionMap} or {@code jobGroupMap}
 */
private void assignJobsToCorrectVirtualPartition(
    PodAwareRebalanceGroup podAwareRebalanceGroup,
    Map<String, RebalancingJobGroup> jobGroupMap,
    Map<Long, StaleWorkerReplacement> toBeMovedStaleJobs,
    Map<String, Integer> jobGroupToPartitionMap,
    RebalancingWorkerTable rebalancingWorkerTable) {
  for (Map.Entry<String, List<StoredJob>> entry :
      podAwareRebalanceGroup.getGroupIdToJobs().entrySet()) {
    String jobGroupId = entry.getKey();
    Preconditions.checkArgument(
        jobGroupToPartitionMap.containsKey(jobGroupId),
        "job group %s has no partition assigned",
        jobGroupId);
    Preconditions.checkArgument(
        jobGroupMap.containsKey(jobGroupId),
        "job group %s is missing from the job group map",
        jobGroupId);
    long partitionIdxForGroup = jobGroupToPartitionMap.get(jobGroupId);
    for (StoredJob job : entry.getValue()) {
      if (job.getState() != JobState.JOB_STATE_RUNNING) {
        continue;
      }
      long currentWorkerId = job.getWorkerId();
      // Validity is checked first so the partition lookup is skipped for invalid worker ids.
      boolean onCorrectWorker =
          rebalancingWorkerTable.isWorkerIdValid(currentWorkerId)
              && rebalancingWorkerTable
                  .getAllWorkerIdsForPartition(partitionIdxForGroup)
                  .contains(currentWorkerId);
      RebalancingJob rebalancingJob = new RebalancingJob(job, jobGroupMap.get(jobGroupId));
      if (onCorrectWorker) {
        rebalancingWorkerTable
            .getRebalancingWorkerWithSortedJobs(currentWorkerId)
            .addJob(rebalancingJob);
      } else {
        // job is not on the correct worker: create the partition's replacement holder only when
        // it does not exist yet, instead of allocating a throwaway instance per job.
        toBeMovedStaleJobs
            .computeIfAbsent(
                partitionIdxForGroup, ignored -> new StaleWorkerReplacement(partitionIdxForGroup))
            .addRebalancingJob(rebalancingJob);
      }
    }
  }
}