in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RebalancerCommon.java [128:190]
/**
 * Computes how many workers each virtual partition needs for the job groups hashed into it.
 *
 * <p>Each job group is assigned to a virtual partition by hashing its job group id. That
 * assignment is written into {@code jobGroupToPartitionMap} as a side effect.
 *
 * <p>Within a partition:
 * <ul>
 *   <li>Every job with scale {@code >= 1.0} counts as needing one dedicated worker.
 *   <li>The remaining ("shared") jobs need the larger of two figures: the worker count returned by
 *       {@code getWorkerNumberPerWorkload} for their scales, and the shared job count divided by
 *       {@code maxJobNumberPerWorker}, rounded up.
 *   <li>The shared figure is inflated by {@code targetSpareWorkerPercentage} percent, rounded up,
 *       and the dedicated workers are then added.
 * </ul>
 *
 * @param rebalancerConfiguration supplies the number of virtual partitions, the hash value range,
 *     the max number of jobs per worker and the target spare worker percentage
 * @param jobGroupToPartitionMap output map; receives {@code jobGroupId -> partition index} for
 *     every job group in {@code jobGroupMap}
 * @param jobGroupMap job groups to place (only the values are read)
 * @param workerMap not read by this method
 * @return a list with one entry per virtual partition; entry {@code i} is the number of workers
 *     partition {@code i} needs
 * @throws ArithmeticException if a divisor taken from the configuration is zero, i.e. when
 *     {@code maxJobNumberPerWorker} is 0 (with at least one partition) or when the hash value
 *     range or the number of virtual partitions is 0 (with at least one job group)
 */
private static List<Integer> calculateWorkerNeededPerPartition(
    RebalancerConfiguration rebalancerConfiguration,
    final Map<String, Integer> jobGroupToPartitionMap,
    final Map<String, RebalancingJobGroup> jobGroupMap,
    final Map<Long, StoredWorker> workerMap) {
  final int numberOfPartition = rebalancerConfiguration.getNumberOfVirtualPartitions();
  final int maxJobNumberPerWorker = rebalancerConfiguration.getMaxJobNumberPerWorker();
  final long spareWorkerPercentage = rebalancerConfiguration.getTargetSpareWorkerPercentage();

  // Per-partition accumulators, indexed by virtual partition.
  // Jobs with scale >= 1.0 each need a worker of their own.
  final int[] dedicatedWorkersByPartition = new int[numberOfPartition];
  // Jobs with scale < 1.0 share workers; track both their count and their scales.
  final int[] sharedJobCountByPartition = new int[numberOfPartition];
  final List<List<Double>> sharedJobScalesByPartition = new ArrayList<>(numberOfPartition);
  for (int idx = 0; idx < numberOfPartition; idx++) {
    sharedJobScalesByPartition.add(new ArrayList<>());
  }

  for (RebalancingJobGroup jobGroup : jobGroupMap.values()) {
    final String jobGroupId = jobGroup.getJobGroup().getJobGroupId();
    // Math.abs is applied to the remainder, so a negative hashCode still yields a
    // non-negative hash value. Keep this exact formula: changing it would move existing
    // job groups to different partitions.
    final long hashValue =
        Math.abs(jobGroupId.hashCode() % rebalancerConfiguration.getMaxAssignmentHashValueRange());
    final int partitionIdx = (int) (hashValue % numberOfPartition);
    jobGroupToPartitionMap.put(jobGroupId, partitionIdx);

    for (StoredJob job : jobGroup.getJobs().values()) {
      if (job.getScale() >= 1.0) {
        // A single job with >= 1.0 scale must be put on a dedicated worker.
        dedicatedWorkersByPartition[partitionIdx]++;
      } else {
        sharedJobCountByPartition[partitionIdx]++;
        sharedJobScalesByPartition.get(partitionIdx).add(job.getScale());
      }
    }
  }

  final List<Integer> workersNeededForPartition = new ArrayList<>(numberOfPartition);
  for (int idx = 0; idx < numberOfPartition; idx++) {
    final int workersForWorkload =
        getWorkerNumberPerWorkload(sharedJobScalesByPartition.get(idx));
    // Ceiling division: a partially filled worker still counts as a whole worker.
    final int workersForJobCount =
        (sharedJobCountByPartition[idx] + maxJobNumberPerWorker - 1) / maxJobNumberPerWorker;
    final int sharedWorkers = Math.max(workersForWorkload, workersForJobCount);

    // Leave some spare capacity to absorb traffic increases:
    // ceil(sharedWorkers * (100 + pct) / 100).
    // This is computed with exact integer arithmetic. The floating-point version
    // Math.ceil(n * (1 + pct / 100.0)) over-counts when rounding error pushes an exact result
    // just above an integer (e.g. 50 * 1.1 == 55.00000000000001 in double, giving 56).
    // -floorDiv(-x, d) == ceil(x / d) for any sign of x.
    final long scaledSharedWorkers = (long) sharedWorkers * (100L + spareWorkerPercentage);
    final int sharedWorkersWithSpare = (int) -Math.floorDiv(-scaledSharedWorkers, 100L);

    workersNeededForPartition.add(sharedWorkersWithSpare + dedicatedWorkersByPartition[idx]);
  }
  return workersNeededForPartition;
}