private void emitMetrics()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/RpcJobColocatingRebalancer.java [284:383]


  private void emitMetrics(List<Integer> workerNeededPerPartition) {
    int usedWorkers = 0;
    for (RebalancingWorkerWithSortedJobs worker : rebalancingCache.getAllWorkers()) {
      if (worker.getNumberOfJobs() != 0) {
        usedWorkers += 1;
      }
    }
    scope.gauge(MetricNames.USED_WORKER_COUNT).update(usedWorkers);

    int totalNumberOfWorkersStillNeeded = 0;
    for (long partitionIdx : rebalancingCache.getAllPartitions()) {
      usedWorkers = 0;
      Map<String, String> partitionTags =
          ImmutableMap.of(VIRTUAL_PARTITION_TAG, Long.toString(partitionIdx));
      DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
      List<RebalancingWorkerWithSortedJobs> allWorkersWithinPartition =
          rebalancingCache.getAllWorkersForPartition(partitionIdx);
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.ASSIGNED_WORKER_NUMBER_IN_PARTITION)
          .update(allWorkersWithinPartition.size());

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.REQUESTED_WORKER_NUMBER_IN_PARTITION)
          .update(workerNeededPerPartition.get((int) partitionIdx));

      totalNumberOfWorkersStillNeeded +=
          workerNeededPerPartition.get((int) partitionIdx) - allWorkersWithinPartition.size();

      for (RebalancingWorkerWithSortedJobs worker : allWorkersWithinPartition) {
        if (worker.getNumberOfJobs() > 0) {
          usedWorkers += 1;
        }
        Map<String, String> workerTags =
            ImmutableMap.of(
                VIRTUAL_PARTITION_TAG,
                Long.toString(partitionIdx),
                MetricNames.WORKER_IDX,
                Long.toString(worker.getWorkerId()));
        stats.accept(worker.getLoad());
        scope
            .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
            .tagged(workerTags)
            .gauge(MetricNames.WORKER_EXPECTED_LOAD)
            .update(worker.getLoad());
      }

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_AVG)
          .update(stats.getAverage());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_MAX)
          .update(stats.getMax());
      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_MIN)
          .update(stats.getMin());

      double standardDeviation = 0.0;
      for (RebalancingWorkerWithSortedJobs worker :
          rebalancingCache.getAllWorkersForPartition(partitionIdx)) {
        if (worker.getNumberOfJobs() == 0) {
          continue;
        }

        standardDeviation += Math.pow(worker.getLoad() - stats.getAverage(), 2);
      }

      scope
          .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
          .tagged(partitionTags)
          .gauge(MetricNames.WORKER_LOAD_STD_DEVIATION)
          .update(standardDeviation / usedWorkers);
    }

    scope
        .subScope(COLOCATING_REBALANCER_SUB_SCOPE)
        .gauge(MetricNames.EXTRA_WORKERS_UNFULFILLED)
        .update(totalNumberOfWorkersStillNeeded);

    int totalRequestedNumberOfWorker =
        workerNeededPerPartition.stream().mapToInt(Integer::intValue).sum();
    totalRequestedNumberOfWorker =
        roundUpToNearestNumber(totalRequestedNumberOfWorker, TARGET_UNIT_NUMBER);

    if (shadowRun) {
      // if it's shadow, still emit metric for comparison
      scope.gauge(MetricNames.REQUESTED_WORKER_COUNT).update(totalRequestedNumberOfWorker);
    } else {
      scope.gauge(MetricNames.WORKERS_TARGET).update(totalRequestedNumberOfWorker);
    }
  }