in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [107:168]
/**
 * Calculates how many workers each virtual partition of the rebalance group needs.
 *
 * <p>Each job group is hashed into a virtual partition, and the chosen partition is recorded in
 * {@code jobGroupToPartitionMap}. Within a partition:
 *
 * <ul>
 *   <li>every job with scale {@code >= 1.0} is given one dedicated worker;
 *   <li>the remaining jobs need {@code max(workersForWorkload, ceil(jobCount /
 *       maxJobNumberPerWorker))} workers, where {@code workersForWorkload} comes from {@code
 *       getWorkerNumberPerWorkload} over those jobs' scales;
 *   <li>that shared-worker count is inflated by {@code targetSpareWorkerPercentage} percent,
 *       rounded up, to leave headroom for traffic increase; dedicated workers are added after.
 * </ul>
 *
 * @param podAwareRebalanceGroup the group whose jobs are being placed
 * @param rebalancerConfiguration supplies the hash range, per-worker job limit and spare percentage
 * @param jobGroupToPartitionMap output map; receives a {@code groupId -> partition} entry for every
 *     job group in {@code podAwareRebalanceGroup}
 * @return a list of size {@code getNumberOfVirtualPartitions()} whose element {@code i} is the
 *     number of workers needed by partition {@code i}
 */
private static List<Integer> calculateWorkerNeededPerPartition(
    PodAwareRebalanceGroup podAwareRebalanceGroup,
    RebalancerConfiguration rebalancerConfiguration,
    final Map<String, Integer> jobGroupToPartitionMap) {
  int numberOfPartition = podAwareRebalanceGroup.getNumberOfVirtualPartitions();
  int maxJobNumberPerWorker = rebalancerConfiguration.getMaxJobNumberPerWorker();
  long targetSpareWorkerPercentage = rebalancerConfiguration.getTargetSpareWorkerPercentage();

  // Dedicated ("overloaded") workers per partition: one per job whose scale alone fills a worker.
  int[] overloadedWorkerNeededByPartition = new int[numberOfPartition];
  // Scales of the remaining (scale < 1.0) jobs, bucketed by partition. The bucket size is also the
  // partition's shared job count, so no separate counter is kept.
  List<List<Double>> workloadPerJobByPartitionList = new ArrayList<>(numberOfPartition);
  for (int idx = 0; idx < numberOfPartition; idx++) {
    workloadPerJobByPartitionList.add(new ArrayList<>());
  }

  for (Map.Entry<String, List<StoredJob>> entry :
      podAwareRebalanceGroup.getGroupIdToJobs().entrySet()) {
    String groupId = entry.getKey();
    long hashValue =
        Math.abs(groupId.hashCode() % rebalancerConfiguration.getMaxAssignmentHashValueRange());
    // NOTE(review): throws ArithmeticException if numberOfPartition is 0 while the group has jobs;
    // presumably a group always has at least one virtual partition — confirm.
    int partition = (int) (hashValue % numberOfPartition);
    jobGroupToPartitionMap.put(groupId, partition);
    for (StoredJob job : entry.getValue()) {
      double scale = job.getScale();
      if (scale >= 1.0) {
        // a single job with >= 1.0 scale needs to be put on a dedicated worker
        overloadedWorkerNeededByPartition[partition]++;
      } else {
        workloadPerJobByPartitionList.get(partition).add(scale);
      }
    }
  }

  // calculate how many workers are needed per partition based on workload and job count
  List<Integer> workersNeededForPartition = new ArrayList<>(numberOfPartition);
  for (int idx = 0; idx < numberOfPartition; idx++) {
    List<Double> workloadPerJob = workloadPerJobByPartitionList.get(idx);
    int expectedNumberOfWorkerForWorkload = getWorkerNumberPerWorkload(workloadPerJob);
    int expectedNumberOfWorkerForJobCount =
        (int) divideRoundingUp(workloadPerJob.size(), maxJobNumberPerWorker);
    int neededNumberOfWorkerWithoutOverloadWorker =
        Math.max(expectedNumberOfWorkerForWorkload, expectedNumberOfWorkerForJobCount);
    // Leave some spare to handle traffic increase: ceil(n * (100 + p) / 100), computed exactly in
    // integer arithmetic. The floating-point form n * (1 + p / 100.0) can land just above an
    // integer (e.g. 50 * 1.1 == 55.00000000000001) and over-provision by one worker.
    int neededNumberOfWorkerWithSpare =
        (int)
            divideRoundingUp(
                (long) neededNumberOfWorkerWithoutOverloadWorker
                    * (100L + targetSpareWorkerPercentage),
                100L);
    workersNeededForPartition.add(
        neededNumberOfWorkerWithSpare + overloadedWorkerNeededByPartition[idx]);
  }
  return workersNeededForPartition;
}

/**
 * Returns {@code ceil(dividend / divisor)} computed exactly for a positive {@code divisor}.
 *
 * @throws ArithmeticException if {@code divisor} is zero
 */
private static long divideRoundingUp(long dividend, long divisor) {
  return -Math.floorDiv(-dividend, divisor);
}