in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java [92:132]
private ControllerWorkloadInfo calculateWorkloadInfo() {
ControllerWorkloadInfo controllerWorkloadInfo = new ControllerWorkloadInfo();
PriorityQueue<InstanceTopicPartitionHolder> itphList = _helixMirrorMakerManager.getCurrentServingInstance();
int numberOfLaggingWorkers = 0;
boolean workloadRetrieverInitialized = _helixMirrorMakerManager.getWorkloadInfoRetriever().isInitialized();
while (!itphList.isEmpty()) {
InstanceTopicPartitionHolder itph = itphList.poll();
WorkerWorkloadInfo workerWorkload = new WorkerWorkloadInfo();
Set<TopicPartition> topicPartitionSet = itph.getServingTopicPartitionSet();
workerWorkload.setWorkerId(itph.getInstanceName());
workerWorkload.setNumberOfTopicPartitions(topicPartitionSet.size());
for (TopicPartition topicPartition : topicPartitionSet) {
if (workloadRetrieverInitialized) {
TopicWorkload topicWorkload = _helixMirrorMakerManager.getWorkloadInfoRetriever().topicWorkload(topicPartition.getTopic());
TopicWorkload partitionWorkLoad = new TopicWorkload(topicWorkload.getBytesPerSecondPerPartition(), topicWorkload.getMsgsPerSecondPerPartition(), 1);
workerWorkload.getTotalWorkload().add(partitionWorkLoad);
} else {
workerWorkload.setTotalWorkload(null);
}
TopicPartitionLag tpl = _helixMirrorMakerManager.calculateLagTime(topicPartition);
if (tpl != null) {
workerWorkload.getLaggingTopicPartition().add(tpl);
}
}
if (workerWorkload.getTotalWorkload() != null) {
controllerWorkloadInfo.getTopicWorkload().add(workerWorkload.getTotalWorkload());
controllerWorkloadInfo.getWorkerInstances().add(workerWorkload);
}
if (workerWorkload.getLaggingTopicPartition().size() != 0) {
numberOfLaggingWorkers++;
}
}
controllerWorkloadInfo.setNumOfLaggingWorkers(numberOfLaggingWorkers);
int numberOfExpectedWorkers = (int) Math.round(controllerWorkloadInfo.getTopicWorkload().getBytesPerSecond() / _helixMirrorMakerManager.getMaxWorkloadPerWorkerBytes()) + 1;
int numOfDedicatedWorkers = (int) (controllerWorkloadInfo.getWorkerInstances().size() * _helixMirrorMakerManager.getMaxDedicatedInstancesRatio());
int laggingAdditional = numberOfLaggingWorkers < numOfDedicatedWorkers ? 0 : numberOfLaggingWorkers - numOfDedicatedWorkers;
controllerWorkloadInfo.setNumOfExpectedWorkers(numberOfExpectedWorkers + laggingAdditional);
controllerWorkloadInfo.setAutoBalancingEnabled(_helixMirrorMakerManager.isAutoBalancingEnabled());
return controllerWorkloadInfo;
}