in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java [295:323]
/**
 * Brings the per-worker external-view counters in line with the given snapshot.
 *
 * <p>For every worker in the snapshot a {@code Counter} is created on first sight,
 * registered in the metrics registry under
 * {@code getExternalViewPerWorkMetricName(worker)}, and then adjusted so that its
 * count equals the snapshot value. Workers that are tracked but absent from the
 * snapshot are dropped from both the tracking map and the metrics registry, so a
 * worker that reappears later is registered again from scratch.
 *
 * <p>Registry failures are logged and otherwise ignored (best effort).
 *
 * @param topicPartitionMapForExternalView worker name to the number of topic
 *     partitions reported for that worker; values must be non-null
 */
private synchronized void updatePerWorkerEVMetrics(
    Map<String, Integer> topicPartitionMapForExternalView) {
  // Pass 1: create/register counters for new workers and set every counter to the
  // value reported in the snapshot.
  for (Map.Entry<String, Integer> entry : topicPartitionMapForExternalView.entrySet()) {
    String worker = entry.getKey();
    Counter counter = _externalViewPerWorkerTopicPartitionCounter.get(worker);
    if (counter == null) {
      counter = new Counter();
      try {
        KafkaUReplicatorMetricsReporter.get().getRegistry().register(
            getExternalViewPerWorkMetricName(worker), counter);
      } catch (Exception e) {
        LOGGER.error("Error registering metrics!", e);
      }
      // Track the counter even if registration failed, matching the existing
      // best-effort behavior.
      _externalViewPerWorkerTopicPartitionCounter.put(worker, counter);
    }
    // Counter only supports relative updates, so apply the delta to the target value.
    counter.inc(entry.getValue() - counter.getCount());
  }

  // Pass 2: drop workers that are no longer in the snapshot. Removal goes through
  // the iterator so the map is not structurally modified behind the iteration, and
  // the map entry is removed by its real key (the worker name, not the metric name).
  java.util.Iterator<String> trackedWorkers =
      _externalViewPerWorkerTopicPartitionCounter.keySet().iterator();
  while (trackedWorkers.hasNext()) {
    String worker = trackedWorkers.next();
    if (topicPartitionMapForExternalView.containsKey(worker)) {
      continue;
    }
    trackedWorkers.remove();
    String metricName = getExternalViewPerWorkMetricName(worker);
    try {
      KafkaUReplicatorMetricsReporter.get().getRegistry().remove(metricName);
    } catch (Exception e) {
      LOGGER.warn("Got exception when removing metrics for {}", metricName, e);
    }
  }
}