in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java [262:293]
/**
 * Brings the per-worker ideal-state counters in line with the supplied snapshot.
 *
 * <p>For every worker in {@code topicPartitionMapForIdealState} a counter is created on first
 * sight (and registered with the metrics registry when one is available), then adjusted so
 * that its count equals the value mapped to that worker. Workers that are tracked locally but
 * absent from the snapshot are dropped from the local map and their metric is removed from the
 * registry.
 *
 * <p>Registry failures are logged and never propagated, so a metrics problem does not abort
 * the update.
 *
 * @param topicPartitionMapForIdealState worker name mapped to the count that worker's counter
 *     should report; a {@code null} value for a worker results in a
 *     {@link NullPointerException} when it is unboxed
 */
private synchronized void updatePerWorkerISMetrics(
    Map<String, Integer> topicPartitionMapForIdealState) {
  for (Map.Entry<String, Integer> entry : topicPartitionMapForIdealState.entrySet()) {
    String worker = entry.getKey();
    Counter counter = _idealStatePerWorkerTopicPartitionCounter.get(worker);
    if (counter == null) {
      counter = new Counter();
      try {
        if (KafkaUReplicatorMetricsReporter.get().getRegistry() != null) {
          KafkaUReplicatorMetricsReporter.get().getRegistry()
              .register(getIdealStatePerWorkMetricName(worker), counter);
        }
      } catch (Exception e) {
        LOGGER.error("Error registering metrics!", e);
      }
      // Track the counter locally even if registration failed, so the next call
      // does not attempt to register the same metric name again.
      _idealStatePerWorkerTopicPartitionCounter.put(worker, counter);
    }
    // The counter only exposes relative updates, so apply the delta to the target value.
    counter.inc(entry.getValue() - counter.getCount());
  }

  // Remove stale workers through the iterator: calling remove() on the map itself while
  // iterating its key set can throw ConcurrentModificationException on fail-fast maps.
  java.util.Iterator<String> trackedWorkers =
      _idealStatePerWorkerTopicPartitionCounter.keySet().iterator();
  while (trackedWorkers.hasNext()) {
    String worker = trackedWorkers.next();
    if (topicPartitionMapForIdealState.containsKey(worker)) {
      continue;
    }
    trackedWorkers.remove();
    try {
      if (KafkaUReplicatorMetricsReporter.get().getRegistry() != null) {
        KafkaUReplicatorMetricsReporter.get().getRegistry()
            .remove(getIdealStatePerWorkMetricName(worker));
      }
    } catch (Exception e) {
      LOGGER.warn("Got exception when removing metrics for {}",
          getIdealStatePerWorkMetricName(worker), e);
    }
  }
}