private void assignJobsToCorrectVirtualPartition()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [206:243]


  /**
   * Sorts every running job of the given rebalance group into one of two places: the entry of the
   * worker it currently runs on, or the stale-job bucket of the virtual partition its job group is
   * mapped to.
   *
   * <p>For each job group id returned by {@code podAwareRebalanceGroup.getGroupIdToJobs()}, the
   * partition index is looked up in {@code jobGroupToPartitionMap}. Jobs whose state is not {@code
   * JOB_STATE_RUNNING} are skipped. A running job is added to its current worker in {@code
   * rebalancingWorkerTable} only when that worker id is valid and is one of the worker ids the
   * table reports for the group's partition. Otherwise the job is added to the {@link
   * StaleWorkerReplacement} stored under the partition index in {@code toBeMovedStaleJobs}, which
   * is created on first use.
   *
   * @param podAwareRebalanceGroup source of the job-group-id to jobs mapping that is iterated
   * @param jobGroupMap job group id to {@link RebalancingJobGroup}; must contain every iterated id
   * @param toBeMovedStaleJobs output map, keyed by partition index, that collects jobs which are
   *     not on a worker of their group's partition; mutated in place
   * @param jobGroupToPartitionMap job group id to partition index; must contain every iterated id
   * @param rebalancingWorkerTable worker table used to validate worker ids and to record jobs
   *     that stay on their current worker; mutated in place
   * @throws IllegalArgumentException if an iterated job group id is missing from {@code
   *     jobGroupToPartitionMap} or {@code jobGroupMap}
   */
  private void assignJobsToCorrectVirtualPartition(
      PodAwareRebalanceGroup podAwareRebalanceGroup,
      Map<String, RebalancingJobGroup> jobGroupMap,
      Map<Long, StaleWorkerReplacement> toBeMovedStaleJobs,
      Map<String, Integer> jobGroupToPartitionMap,
      RebalancingWorkerTable rebalancingWorkerTable) {
    for (Map.Entry<String, List<StoredJob>> entry :
        podAwareRebalanceGroup.getGroupIdToJobs().entrySet()) {
      String jobGroupId = entry.getKey();
      Preconditions.checkArgument(
          jobGroupToPartitionMap.containsKey(jobGroupId),
          "no virtual partition is mapped for job group %s",
          jobGroupId);
      Preconditions.checkArgument(
          jobGroupMap.containsKey(jobGroupId),
          "no rebalancing job group found for job group %s",
          jobGroupId);

      long partitionIdxForGroup = jobGroupToPartitionMap.get(jobGroupId);
      // Invariant for every job of this group, so resolve it once instead of once per job.
      RebalancingJobGroup rebalancingJobGroup = jobGroupMap.get(jobGroupId);

      for (StoredJob job : entry.getValue()) {
        if (job.getState() != JobState.JOB_STATE_RUNNING) {
          continue;
        }

        long currentWorkerId = job.getWorkerId();
        // Validity is checked first so the partition lookup is skipped for invalid worker ids.
        boolean onCorrectWorker =
            rebalancingWorkerTable.isWorkerIdValid(currentWorkerId)
                && rebalancingWorkerTable
                    .getAllWorkerIdsForPartition(partitionIdxForGroup)
                    .contains(currentWorkerId);

        if (onCorrectWorker) {
          rebalancingWorkerTable
              .getRebalancingWorkerWithSortedJobs(currentWorkerId)
              .addJob(new RebalancingJob(job, rebalancingJobGroup));
        } else {
          // computeIfAbsent builds the replacement holder only when the partition has none yet,
          // rather than allocating a throwaway instance for every stale job.
          toBeMovedStaleJobs
              .computeIfAbsent(
                  partitionIdxForGroup,
                  ignoredKey -> new StaleWorkerReplacement(partitionIdxForGroup))
              .addRebalancingJob(new RebalancingJob(job, rebalancingJobGroup));
        }
      }
    }
  }