private void emitMetrics()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [440:532]


  private void emitMetrics(
      RebalancingWorkerTable rebalancingWorkerTable,
      List<Integer> workerNeededPerPartition,
      String pod) {
    Map<String, String> scopeTagsWithPod = new HashMap<>();
    scopeTagsWithPod.put(POD_TAG, pod);
    int usedWorkers = 0;
    for (RebalancingWorkerWithSortedJobs worker : rebalancingWorkerTable.getAllWorkers()) {
      if (worker.getNumberOfJobs() != 0) {
        usedWorkers += 1;
      }
    }
    scope.gauge(MetricNames.USED_WORKER_COUNT).update(usedWorkers);

    int totalNumberOfWorkersStillNeeded = 0;
    for (long partitionIdx : rebalancingWorkerTable.getAllPartitions()) {
      usedWorkers = 0;
      Map<String, String> partitionTags = new HashMap<>();
      partitionTags.put(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
      partitionTags.putAll(scopeTagsWithPod);
      DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
      List<RebalancingWorkerWithSortedJobs> allWorkersWithinPartition =
          rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx);
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.ASSIGNED_WORKER_NUMBER_IN_PARTITION)
          .update(allWorkersWithinPartition.size());

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.REQUESTED_WORKER_NUMBER_IN_PARTITION)
          .update(workerNeededPerPartition.get((int) partitionIdx));

      totalNumberOfWorkersStillNeeded +=
          workerNeededPerPartition.get((int) partitionIdx) - allWorkersWithinPartition.size();

      for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
        if (worker.getNumberOfJobs() > 0) {
          usedWorkers += 1;
        }
        Map<String, String> workerTags = new HashMap<>();
        workerTags.put(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
        workerTags.put(MetricNames.WORKER_IDX, Long.toString(worker.getWorkerId()));
        workerTags.put(POD_TAG, pod);
        stats.accept(worker.getLoad());
        scope
            .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
            .tagged(workerTags)
            .gauge(MetricNames.WORKER_EXPECTED_LOAD)
            .update(worker.getLoad());
      }

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_AVG)
          .update(stats.getAverage());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_MAX)
          .update(stats.getMax());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_MIN)
          .update(stats.getMin());

      double standardDeviation = 0.0;
      for (RebalancingWorkerWithSortedJobs worker :
          rebalancingWorkerTable.getAllWorkersForPartition(partitionIdx)) {
        if (worker.getNumberOfJobs() == 0) {
          continue;
        }

        standardDeviation += Math.pow(worker.getLoad() - stats.getAverage(), 2);
      }

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_STD_DEVIATION)
          .update(standardDeviation / usedWorkers);
    }

    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .tagged(scopeTagsWithPod)
        .gauge(MetricNames.EXTRA_WORKERS_UNFULFILLED)
        .update(totalNumberOfWorkersStillNeeded);
  }