private void minimizeWorkers()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/AbstractRpcUriRebalancer.java [203:264]


  /**
   * Consolidates jobs onto fewer workers per RPC URI.
   *
   * <p>For each URI, workers are considered in increasing order of total load; each lightly
   * loaded worker's jobs are merged wholesale onto the most loaded worker that can still absorb
   * them without exceeding {@code CAPACITY_PER_WORKER}. Workers are never reduced below the
   * {@code numWorkersPerUri} configuration floor.
   *
   * @param table mutable rebalancing state keyed by (jobId, workerId); merged jobs are moved
   *     in place via {@code setWorkerId} plus table put/remove.
   */
  private void minimizeWorkers(HashBasedTable<Long, Long, RebalancingJob> table) {
    // Aggregate total load per worker, grouped by RPC URI.
    Map<String, Map<Long, Double>> uriToLoadPerWorker = new HashMap<>();
    for (RebalancingJob job : table.values()) {
      uriToLoadPerWorker
          .computeIfAbsent(job.getRpcUri(), o -> new HashMap<>())
          .merge(job.getWorkerId(), job.getLoad(), Double::sum);
    }

    // NOTE: columnMap() is a live view of the table, not a snapshot.
    Map<Long, Map<Long, RebalancingJob>> jobsPerWorkerId = table.columnMap();
    for (Map<Long, Double> loadPerWorker : uriToLoadPerWorker.values()) {
      if (loadPerWorker.size() <= config.getNumWorkersPerUri()) {
        // do not minimize workers below numWorkersPerUri config.
        continue;
      }
      ImmutableList<Long> workerIdsIncreasingLoad =
          ImmutableList.copyOf(
              ImmutableMap.<Long, Double>builder()
                  .orderEntriesByValue(
                      Double::compare) // WARNING: orderEntriesByValue is marked @Beta
                  .putAll(loadPerWorker)
                  .build()
                  .keySet());
      ImmutableList<Long> workerIdsDecreasingLoad = workerIdsIncreasingLoad.reverse();
      workerIdsIncreasingLoad.forEach(
          thisWorkerId -> {
            // O(n) lookup from max loaded worker to find something to merge
            // the current implementation merges full workers. This could be improved to split a job
            // on
            // a single worker over multiple other workers.
            Double thisLoad = loadPerWorker.get(thisWorkerId);
            Preconditions.checkNotNull(
                thisLoad, "load should exist for worker_id since we built iterator from keyset");
            for (long otherWorkerId : workerIdsDecreasingLoad) {
              if (otherWorkerId == thisWorkerId) {
                // Reached our own position in the decreasing-load list: every remaining
                // candidate carries less load than this worker, and those workers were
                // already shrunk (if possible) on their own earlier iteration.
                break;
              }
              Double otherLoad = loadPerWorker.get(otherWorkerId);
              Preconditions.checkNotNull(
                  otherLoad, "load should exist for worker_id since we built iterator from keyset");
              if (otherLoad + thisLoad < CAPACITY_PER_WORKER) {
                // Snapshot the jobs before moving them: mutating the table while iterating
                // the live columnMap() view risks ConcurrentModificationException.
                ImmutableList<RebalancingJob> jobsToMove =
                    ImmutableList.copyOf(
                        jobsPerWorkerId.getOrDefault(thisWorkerId, ImmutableMap.of()).values());
                for (RebalancingJob job : jobsToMove) {
                  job.setWorkerId(otherWorkerId);
                  table.put(job.getJobId(), otherWorkerId, job);
                  table.remove(job.getJobId(), thisWorkerId);
                }
                loadPerWorker.put(otherWorkerId, otherLoad + thisLoad);
                loadPerWorker.put(thisWorkerId, 0.0);
                break;
              }
            }
          });
    }
  }