private void assignJobsToCorrectVirtualPartition()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [121:155]


  private void assignJobsToCorrectVirtualPartition(
      Map<String, RebalancingJobGroup> jobGroupMap,
      List<StaleWorkerReplacement> toBeMovedStaleJobs,
      Map<String, Integer> jobGroupToPartitionMap) {
    for (RebalancingJobGroup jobGroup : jobGroupMap.values()) {
      String jobGroupId = jobGroup.getJobGroup().getJobGroupId();
      Preconditions.checkArgument(jobGroupToPartitionMap.containsKey(jobGroupId));
      long partitionIdx = jobGroupToPartitionMap.get(jobGroupId);

      List<StoredJob> allJobs = new ArrayList<>(jobGroup.getJobs().values());
      StaleWorkerReplacement staleWorkerReplacement =
          new StaleWorkerReplacement(jobGroup, partitionIdx);
      for (StoredJob job : allJobs) {
        if (job.getState() != JobState.JOB_STATE_RUNNING) {
          continue;
        }

        long currentWorkerId = job.getWorkerId();
        // job is not on the correct worker
        if (!rebalancingCache.isWorkerIdValid(currentWorkerId)
            || !rebalancingCache
                .getAllWorkerIdsForPartition(partitionIdx)
                .contains(currentWorkerId)) {
          staleWorkerReplacement.addStoredJob(job);
        } else {
          rebalancingCache
              .getRebalancingWorkerWithSortedJobs(currentWorkerId)
              .addJob(new RebalancingJob(job, jobGroup));
        }
      }
      if (staleWorkerReplacement.storedJobs.size() != 0) {
        toBeMovedStaleJobs.add(staleWorkerReplacement);
      }
    }
  }