static void adjustWorkerCountForPartition()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [214:276]


  /**
   * Reconciles the rebalancing cache with the live worker set and adjusts the per-partition
   * worker targets. Stale cache entries are dropped, over-provisioned partitions are shrunk
   * gradually (rate-limited by the configured reduce ratio), and any freed or newly discovered
   * workers are round-robin assigned to partitions that still fall short of their target.
   *
   * <p>Note: {@code workersNeededPerPartition} is mutated in place when a partition is shrunk,
   * so that the shortfall computation does not immediately re-add the freed workers.
   */
  static void adjustWorkerCountForPartition(
      RpcJobColocatingRebalancer.RebalancingCache rebalancingCache,
      List<Integer> workersNeededPerPartition,
      final Map<Long, StoredWorker> workerMap,
      int numberOfPartition,
      RebalancerConfiguration rebalancerConfiguration) {
    // Evict cached workers that are no longer present in the live worker set.
    for (Long cachedWorkerId : rebalancingCache.getAllWorkerIds()) {
      if (!workerMap.containsKey(cachedWorkerId)) {
        rebalancingCache.removeWorker(cachedWorkerId);
      }
    }

    // Workers known to the cluster but not yet tracked by the cache are free for assignment.
    Set<Long> unassignedWorkerIds = new HashSet<>(workerMap.keySet());
    unassignedWorkerIds.removeAll(rebalancingCache.getAllWorkerIds());
    List<Long> newWorkers = new ArrayList<>(unassignedWorkerIds);

    for (int partitionIndex = 0; partitionIndex < numberOfPartition; partitionIndex++) {
      // Shrink over-provisioned partitions gradually: each pass removes a fraction of the
      // surplus (per the configured reduce ratio), never fewer than the minimum threshold.
      int surplus =
          rebalancingCache.getAllWorkersForPartition(partitionIndex).size()
              - workersNeededPerPartition.get(partitionIndex);
      if (surplus < MINIMUM_WORKER_THRESHOLD) {
        continue;
      }
      int workersToRemove =
          (int) Math.floor(surplus * rebalancerConfiguration.getWorkerToReduceRatio());
      workersToRemove = Math.max(MINIMUM_WORKER_THRESHOLD, workersToRemove);
      logger.info(
          "Need to remove {} workers for partition.",
          workersToRemove,
          StructuredLogging.virtualPartition(partitionIndex),
          StructuredLogging.workloadBasedWorkerCount(
              workersNeededPerPartition.get(partitionIndex)));
      List<Long> removedWorkerIds =
          removeJobsFromLeastLoadedWorkers(rebalancingCache, partitionIndex, workersToRemove);
      removedWorkerIds.forEach(rebalancingCache::removeWorker);
      newWorkers.addAll(removedWorkerIds);
      // Pin the target to the partition's actual post-removal size so the shortfall pass
      // below does not hand the freed workers straight back to this partition.
      workersNeededPerPartition.set(
          partitionIndex, rebalancingCache.getAllWorkerIdsForPartition(partitionIndex).size());
    }

    // Total shortfall across partitions still holding fewer workers than their target.
    int totalExtraWorkersNeeded = 0;
    for (int partitionIndex = 0; partitionIndex < numberOfPartition; partitionIndex++) {
      int assigned = rebalancingCache.getAllWorkerIdsForPartition(partitionIndex).size();
      int needed = workersNeededPerPartition.get(partitionIndex);
      if (needed > assigned) {
        totalExtraWorkersNeeded += needed - assigned;
      }
    }

    roundRobinAssignWorkers(
        totalExtraWorkersNeeded,
        workersNeededPerPartition,
        rebalancingCache,
        newWorkers,
        numberOfPartition);
  }