in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [121:155]
/**
 * Walks every job group and sorts its running jobs into two buckets, based on the virtual
 * partition the group is mapped to.
 *
 * <p>A running job stays where it is (and is registered with its worker through
 * {@code rebalancingCache.getRebalancingWorkerWithSortedJobs(...).addJob(...)}) when its worker id
 * is reported valid by {@code rebalancingCache} and is contained in the worker ids returned for
 * the group's partition. Any other running job is added to a per-group
 * {@link StaleWorkerReplacement}. Jobs whose state is not {@code JOB_STATE_RUNNING} are skipped.
 *
 * @param jobGroupMap job groups to inspect; only the map values are read
 * @param toBeMovedStaleJobs output list; receives one {@link StaleWorkerReplacement} for each
 *     group that has at least one misplaced running job
 * @param jobGroupToPartitionMap partition index keyed by job group id; must contain an entry for
 *     every group in {@code jobGroupMap}
 * @throws IllegalArgumentException if a job group id has no entry in
 *     {@code jobGroupToPartitionMap}
 */
private void assignJobsToCorrectVirtualPartition(
    Map<String, RebalancingJobGroup> jobGroupMap,
    List<StaleWorkerReplacement> toBeMovedStaleJobs,
    Map<String, Integer> jobGroupToPartitionMap) {
  for (RebalancingJobGroup group : jobGroupMap.values()) {
    String groupId = group.getJobGroup().getJobGroupId();
    Preconditions.checkArgument(jobGroupToPartitionMap.containsKey(groupId));
    long virtualPartition = jobGroupToPartitionMap.get(groupId);

    // Iterate over a snapshot of the group's jobs rather than the live values view.
    List<StoredJob> jobSnapshot = new ArrayList<>(group.getJobs().values());
    StaleWorkerReplacement misplacedJobs = new StaleWorkerReplacement(group, virtualPartition);

    for (StoredJob storedJob : jobSnapshot) {
      // Only running jobs take part in placement.
      if (storedJob.getState() == JobState.JOB_STATE_RUNNING) {
        long workerId = storedJob.getWorkerId();
        // The partition membership lookup is only evaluated for a valid worker id.
        boolean onCorrectWorker =
            rebalancingCache.isWorkerIdValid(workerId)
                && rebalancingCache
                    .getAllWorkerIdsForPartition(virtualPartition)
                    .contains(workerId);
        if (onCorrectWorker) {
          rebalancingCache
              .getRebalancingWorkerWithSortedJobs(workerId)
              .addJob(new RebalancingJob(storedJob, group));
        } else {
          misplacedJobs.addStoredJob(storedJob);
        }
      }
    }

    // Groups with nothing to move are not reported.
    if (!misplacedJobs.storedJobs.isEmpty()) {
      toBeMovedStaleJobs.add(misplacedJobs);
    }
  }
}