private Map balanceJobGroupWorker()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/HibernatingJobRebalancer.java [84:163]


  private Map<RebalancingJobGroup, Long> balanceJobGroupWorker(
      Map<RebalancingJobGroup, Long> jobGroupToWorker,
      Set<Long> workerIds,
      List<Integer> placementPlan) {
    Map<RebalancingJobGroup, Long> result = new HashMap<>();
    // compute worker job groups
    Map<Long, List<RebalancingJobGroup>> workerJobGroupMap =
        jobGroupToWorker
            .entrySet()
            .stream()
            .collect(
                Collectors.toMap(
                    entry -> entry.getValue(),
                    entry -> Arrays.asList(entry.getKey()),
                    (list1, list2) ->
                        Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList())));
    workerIds
        .stream()
        .forEach(
            workerId -> workerJobGroupMap.computeIfAbsent(workerId, o -> Collections.EMPTY_LIST));

    // non-empty workers sorted by number of jobGroups in ascending order
    List<Map.Entry<Long, List<RebalancingJobGroup>>> workerJobGroupList =
        workerJobGroupMap
            .entrySet()
            .stream()
            .filter(entry -> entry.getKey() != WorkerUtils.UNSET_WORKER_ID)
            .sorted(Comparator.comparingInt(entry -> entry.getValue().size()))
            .collect(Collectors.toList());

    int nSpareWorkers = workerJobGroupList.size() - placementPlan.size();
    // free spare workers
    List<RebalancingJobGroup> freeJobGroups =
        workerJobGroupList
            .subList(0, nSpareWorkers)
            .stream()
            .map(entry -> entry.getValue())
            .flatMap(list -> list.stream())
            .collect(Collectors.toList());
    // include all unassigned job groups
    freeJobGroups.addAll(
        workerJobGroupMap.getOrDefault(WorkerUtils.UNSET_WORKER_ID, Collections.EMPTY_LIST));

    workerJobGroupList = workerJobGroupList.subList(nSpareWorkers, workerJobGroupList.size());

    // switch to mutable list
    workerJobGroupList.stream().forEach(entry -> entry.setValue(new ArrayList<>(entry.getValue())));

    // make sure worker under load
    for (int i = 0; i < placementPlan.size(); ++i) {
      Map.Entry<Long, List<RebalancingJobGroup>> entry = workerJobGroupList.get(i);
      int expectedSize = placementPlan.get(i);
      int actualSize = entry.getValue().size();
      if (actualSize > expectedSize) {
        List<RebalancingJobGroup> jobGroups = entry.getValue().subList(0, expectedSize);
        freeJobGroups.addAll(entry.getValue().subList(expectedSize, actualSize));
        entry.setValue(jobGroups);
      }
    }

    // assign free work load to worker
    LinkedList<RebalancingJobGroup> linkFreeJobGroups = new LinkedList<>(freeJobGroups);
    for (int i = 0; i < placementPlan.size(); ++i) {
      Map.Entry<Long, List<RebalancingJobGroup>> entry = workerJobGroupList.get(i);
      int expectedSize = placementPlan.get(i);
      int actualSize = entry.getValue().size();
      for (int j = 0; j < expectedSize - actualSize; ++j) {
        entry.getValue().add(linkFreeJobGroups.poll());
      }
    }

    // collect result
    workerJobGroupList
        .stream()
        .forEach(
            entry -> entry.getValue().stream().forEach(group -> result.put(group, entry.getKey())));

    return result;
  }