in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/ShadowRebalancerDelegateImpl.java [72:105]
  /**
   * Pre-rebalance step that copies the previously computed (cached) worker id of each job back
   * onto the incoming job groups, leaving every other job parameter untouched.
   *
   * <p>A job keeps the worker id it arrived with when any of the following holds:
   *
   * <ul>
   *   <li>its job group has no entry in {@code cachedJobGroupStatus} (a new job group; per the
   *       original note its worker is already set to -1 in JobManager),
   *   <li>the job itself has no entry in the cached assignments of its group (a new job),
   *   <li>the cached worker id is not contained in {@code existingWorkers} (the worker no longer
   *       exists).
   * </ul>
   *
   * @param jobGroups job groups keyed by job group id; matching jobs are updated in place through
   *     {@code RebalancingJobGroup#updateJob}
   * @param existingWorkers ids of the workers that currently exist
   */
  private void consolidateJobGroupWithCacheBeforeRebalance(
      final Map<String, RebalancingJobGroup> jobGroups, Set<Long> existingWorkers) {
    for (Map.Entry<String, RebalancingJobGroup> groupEntry : jobGroups.entrySet()) {
      String groupId = groupEntry.getKey();
      RebalancingJobGroup rebalancingJobGroup = groupEntry.getValue();
      // only job groups seen by a previous computation have cached assignments to restore
      if (cachedJobGroupStatus.containsKey(groupId)) {
        Map<Long, StoredJob> currentJobs = rebalancingJobGroup.getJobs();
        Map<Long, Long> cachedAssignments = cachedJobGroupStatus.get(groupId);
        for (Map.Entry<Long, StoredJob> currentJob : currentJobs.entrySet()) {
          Long id = currentJob.getKey();
          // jobs missing from the cache are new and keep the worker id they came with
          if (cachedAssignments.containsKey(id)) {
            Long cachedWorkerId = cachedAssignments.get(id);
            // restore the cached assignment only while its worker still exists
            if (existingWorkers.contains(cachedWorkerId)) {
              StoredJob restoredJob =
                  currentJob.getValue().toBuilder().setWorkerId(cachedWorkerId).build();
              rebalancingJobGroup.updateJob(id, restoredJob);
            }
          }
        }
      }
    }
  }