private ControllerWorkloadInfo calculateWorkloadInfo()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java [92:132]


  private ControllerWorkloadInfo calculateWorkloadInfo() {
    ControllerWorkloadInfo controllerWorkloadInfo = new ControllerWorkloadInfo();
    PriorityQueue<InstanceTopicPartitionHolder> itphList = _helixMirrorMakerManager.getCurrentServingInstance();
    int numberOfLaggingWorkers = 0;
    boolean workloadRetrieverInitialized = _helixMirrorMakerManager.getWorkloadInfoRetriever().isInitialized();
    while (!itphList.isEmpty()) {
      InstanceTopicPartitionHolder itph = itphList.poll();
      WorkerWorkloadInfo workerWorkload = new WorkerWorkloadInfo();
      Set<TopicPartition> topicPartitionSet = itph.getServingTopicPartitionSet();
      workerWorkload.setWorkerId(itph.getInstanceName());
      workerWorkload.setNumberOfTopicPartitions(topicPartitionSet.size());
      for (TopicPartition topicPartition : topicPartitionSet) {
        if (workloadRetrieverInitialized) {
          TopicWorkload topicWorkload = _helixMirrorMakerManager.getWorkloadInfoRetriever().topicWorkload(topicPartition.getTopic());
          TopicWorkload partitionWorkLoad = new TopicWorkload(topicWorkload.getBytesPerSecondPerPartition(), topicWorkload.getMsgsPerSecondPerPartition(), 1);
          workerWorkload.getTotalWorkload().add(partitionWorkLoad);
        } else {
          workerWorkload.setTotalWorkload(null);
        }

        TopicPartitionLag tpl = _helixMirrorMakerManager.calculateLagTime(topicPartition);
        if (tpl != null) {
          workerWorkload.getLaggingTopicPartition().add(tpl);
        }
      }
      if (workerWorkload.getTotalWorkload() != null) {
        controllerWorkloadInfo.getTopicWorkload().add(workerWorkload.getTotalWorkload());
        controllerWorkloadInfo.getWorkerInstances().add(workerWorkload);
      }
      if (workerWorkload.getLaggingTopicPartition().size() != 0) {
        numberOfLaggingWorkers++;
      }
    }
    controllerWorkloadInfo.setNumOfLaggingWorkers(numberOfLaggingWorkers);
    int numberOfExpectedWorkers = (int) Math.round(controllerWorkloadInfo.getTopicWorkload().getBytesPerSecond() / _helixMirrorMakerManager.getMaxWorkloadPerWorkerBytes()) + 1;
    int numOfDedicatedWorkers = (int) (controllerWorkloadInfo.getWorkerInstances().size() * _helixMirrorMakerManager.getMaxDedicatedInstancesRatio());
    int laggingAdditional = numberOfLaggingWorkers < numOfDedicatedWorkers ? 0 : numberOfLaggingWorkers - numOfDedicatedWorkers;
    controllerWorkloadInfo.setNumOfExpectedWorkers(numberOfExpectedWorkers + laggingAdditional);
    controllerWorkloadInfo.setAutoBalancingEnabled(_helixMirrorMakerManager.isAutoBalancingEnabled());
    return controllerWorkloadInfo;
  }