private synchronized void updateOffsetMetrics()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [384:419]


  /**
   * Makes sure every topic partition tracked in {@code topicPartitionToOffsetMap} has an
   * offset-lag gauge in the metric registry, then refreshes the count of topic partitions
   * reported by {@code getNoProgessTopicPartitions()}.
   *
   * <p>Failures while registering a gauge or while computing the no-progress list are logged
   * and do not propagate to the caller.
   */
  private synchronized void updateOffsetMetrics() {
    MetricRegistry registry = KafkaUReplicatorMetricsReporter.get().getRegistry();
    // The set of already-registered gauges is looked up once, before the loop.
    @SuppressWarnings("rawtypes")
    Map<String, Gauge> registeredGauges = registry.getGauges();

    for (final TopicAndPartition partition : topicPartitionToOffsetMap.keySet()) {
      String lagMetricName = getOffsetLagName(partition);
      if (registeredGauges.containsKey(lagMetricName)) {
        // A gauge with this name is already present; nothing to register.
        continue;
      }

      Gauge<Long> lagGauge = new Gauge<Long>() {
        /**
         * Returns latest offset minus commit offset for the partition, or 0 when there is no
         * entry for it, either offset is not positive, or the latest offset does not exceed
         * the commit offset.
         */
        @Override
        public Long getValue() {
          // The entry is re-read from the map on every call rather than captured at creation.
          TopicPartitionLag current = topicPartitionToOffsetMap.get(partition);
          boolean hasPositiveLag = current != null
              && current.getLatestOffset() > 0
              && current.getCommitOffset() > 0
              && current.getLatestOffset() > current.getCommitOffset();
          return hasPositiveLag ? current.getLatestOffset() - current.getCommitOffset() : 0L;
        }
      };

      try {
        registry.register(lagMetricName, lagGauge);
      } catch (Exception e) {
        logger.error("Error while registering lag metric " + lagMetricName, e);
      }
    }

    try {
      List<TopicAndPartition> stalledPartitions = getNoProgessTopicPartitions();
      int stalledCount = stalledPartitions.size();
      numNoProgressTopicPartitions.set(stalledCount);
      if (stalledCount > 0) {
        logger.warn("Topic partitions with no progress: " + stalledPartitions);
      }
    } catch (Exception e) {
      logger.warn("Got exception when getNoProgessTopicPartitions", e);
    }
  }