in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [384:419]
private synchronized void updateOffsetMetrics() {
MetricRegistry metricRegistry = KafkaUReplicatorMetricsReporter.get().getRegistry();
@SuppressWarnings("rawtypes")
Map<String, Gauge> gauges = metricRegistry.getGauges();
for (final TopicAndPartition topicPartition : topicPartitionToOffsetMap.keySet()) {
String metricName = getOffsetLagName(topicPartition);
if (!gauges.containsKey(metricName)) {
Gauge<Long> gauge = new Gauge<Long>() {
@Override
public Long getValue() {
TopicPartitionLag lag = topicPartitionToOffsetMap.get(topicPartition);
if (lag == null || lag.getLatestOffset() <= 0 || lag.getCommitOffset() <= 0
|| lag.getLatestOffset() <= lag.getCommitOffset()) {
return 0L;
}
return lag.getLatestOffset() - lag.getCommitOffset();
}
};
try {
metricRegistry.register(metricName, gauge);
} catch (Exception e) {
logger.error("Error while registering lag metric " + metricName, e);
}
}
}
try {
List<TopicAndPartition> noProgressPartitions = getNoProgessTopicPartitions();
numNoProgressTopicPartitions.set(noProgressPartitions.size());
if (!noProgressPartitions.isEmpty()) {
logger.warn("Topic partitions with no progress: " + noProgressPartitions);
}
} catch (Exception e) {
logger.warn("Got exception when getNoProgessTopicPartitions", e);
}
}