private void updateMetrics()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [479:533]


  /**
   * Recomputes the per-route topic, controller and worker totals from the given assignment and
   * pushes them into the counters held in {@code _routeToCounterMap}.
   *
   * <p>A partition whose topic name starts with {@code SEPARATOR} is treated as a route entry: it
   * adds one controller and the size of the hosting instance's worker set to that route, and
   * calls {@code maybeRegisterMetrics} for it. Every other partition is counted as one topic on
   * the route given by {@code tp.getPipeline()}. Routes that have counters in
   * {@code _routeToCounterMap} but do not show up in the assignment are set to zero.
   *
   * @param instanceToTopicPartitionsMap topic partitions keyed by instance name
   * @param instanceMap holder keyed by instance name, looked up for every route entry.
   *     NOTE(review): a missing holder raises {@link NullPointerException} here - confirm that
   *     callers always populate it for instances that host a route.
   */
  private void updateMetrics(
      Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap,
      Map<String, InstanceTopicPartitionHolder> instanceMap) {
    // Per-route tallies, int[3]: 0: #topic, 1: #controller, 2: #worker
    Map<String, int[]> currRouteInfo = new ConcurrentHashMap<>();
    for (Map.Entry<String, Set<TopicPartition>> assignment
        : instanceToTopicPartitionsMap.entrySet()) {
      String instanceName = assignment.getKey();
      for (TopicPartition tp : assignment.getValue()) {
        String topicName = tp.getTopic();
        if (topicName.startsWith(SEPARATOR)) {
          // route
          String routeString = convert(topicName + SEPARATOR + tp.getPartition());
          int[] tally = currRouteInfo.computeIfAbsent(routeString, key -> new int[3]);
          tally[1]++;
          // Looked up lazily so instances that only carry topics never need a holder.
          tally[2] += instanceMap.get(instanceName).getWorkerSet().size();

          // register metrics if needed
          maybeRegisterMetrics(routeString);
        } else {
          // topic
          String routeString = convert(tp.getPipeline());
          int[] tally = currRouteInfo.computeIfAbsent(routeString, key -> new int[3]);
          tally[0]++;
        }
      }
    }

    // Shared all-zero tally for registered routes that are absent from the assignment.
    int[] emptyTally = new int[3];
    for (String routeString : _routeToCounterMap.keySet()) {
      int[] tally = currRouteInfo.getOrDefault(routeString, emptyTally);
      setCounterTo(_routeToCounterMap.get(routeString).get(TOPIC_TOTAL_NUMBER), tally[0]);
      setCounterTo(_routeToCounterMap.get(routeString).get(CONTROLLER_TOTAL_NUMBER), tally[1]);
      setCounterTo(_routeToCounterMap.get(routeString).get(WORKER_TOTAL_NUMBER), tally[2]);
    }
  }

  /** Moves {@code counter} to {@code target} by incrementing it with the signed difference. */
  private static void setCounterTo(Counter counter, int target) {
    counter.inc(target - counter.getCount());
  }