in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [142:204]
public void computeJobConfiguration(
final Map<String, RebalancingJobGroup> jobGroups, final Map<Long, StoredWorker> workers)
throws Exception {
List<PodAwareRebalanceGroup> podAwareRebalanceGroups =
jobGroupAndWorkerPodifier.podifyJobGroupsAndWorkers(jobGroups, workers);
for (PodAwareRebalanceGroup podAwareRebalanceGroup : podAwareRebalanceGroups) {
String pod = podAwareRebalanceGroup.getPod();
Optional<Double> maybeFlowControlRatio =
jobPodPlacementProvider.getMaybeReservedFlowControlRatioForPods(pod);
for (Map.Entry<String, List<StoredJob>> groupIdToJobsPair :
podAwareRebalanceGroup.getGroupIdToJobs().entrySet()) {
RebalancingJobGroup rebalancingJobGroup = jobGroups.get(groupIdToJobsPair.getKey());
Preconditions.checkNotNull(rebalancingJobGroup);
List<StoredJob> allJobsInPod = groupIdToJobsPair.getValue();
if (allJobsInPod.isEmpty()) {
// this will not happen, but warn just in case
logger.warn("No jobs for pod", StructuredLogging.jobPod(pod));
continue;
}
double flowControlRatio = maybeFlowControlRatio.orElse(1.0);
double scalePerJobInPod =
rebalancingJobGroup.getScale().orElse(Scalar.ZERO)
* flowControlRatio
/ allJobsInPod.size();
double messagePerSecondPerJobInPod =
rebalancingJobGroup.getJobGroup().getFlowControl().getMessagesPerSec()
* flowControlRatio
/ allJobsInPod.size();
double bytePerSecondPerJobInPod =
rebalancingJobGroup.getJobGroup().getFlowControl().getBytesPerSec()
* flowControlRatio
/ allJobsInPod.size();
double maxInflightPerJobInPod =
rebalancingJobGroup.getJobGroup().getFlowControl().getMaxInflightMessages()
* flowControlRatio
/ allJobsInPod.size();
FlowControl flowControlForJobInPod =
FlowControl.newBuilder()
.setBytesPerSec(bytePerSecondPerJobInPod)
.setMessagesPerSec(messagePerSecondPerJobInPod)
.setMaxInflightMessages(maxInflightPerJobInPod)
.build();
for (StoredJob job : allJobsInPod) {
rebalancingJobGroup.updateJob(
job.getJob().getJobId(),
job.toBuilder()
.setJob(
// use the job util that merges a new job group with the old job.
Rebalancer.mergeJobGroupAndJob(
rebalancingJobGroup.getJobGroup(), job.getJob())
.setFlowControl(
flowControlForJobInPod) // override per partition flow control
.build())
.setScale(scalePerJobInPod)
.build());
}
}
}
}