in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [479:533]
/**
 * Recomputes the per-route topic, controller and worker totals from the given assignment
 * snapshot and applies them to the counters held in {@code _routeToCounterMap}.
 *
 * <p>A partition whose topic name starts with {@code SEPARATOR} is counted as a route entry:
 * it adds one controller and the owning instance's worker-set size to that route, and
 * {@code maybeRegisterMetrics} is invoked for the route. Any other partition is counted as one
 * topic on the route derived from its pipeline. Routes that have registered counters but do not
 * appear in the snapshot have all three counters brought back to zero.
 *
 * @param instanceToTopicPartitionsMap partitions currently held by each instance, keyed by
 *     instance name
 * @param instanceMap holder for each instance name; it is dereferenced for every instance that
 *     owns a route partition, so a missing entry results in a {@link NullPointerException}
 */
private void updateMetrics(
    Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap,
    Map<String, InstanceTopicPartitionHolder> instanceMap) {
  // Slots of the per-route tally array: 0 = #topic, 1 = #controller, 2 = #worker.
  final int topicSlot = 0;
  final int controllerSlot = 1;
  final int workerSlot = 2;

  Map<String, int[]> currRouteInfo = new ConcurrentHashMap<>();
  for (Map.Entry<String, Set<TopicPartition>> assignment
      : instanceToTopicPartitionsMap.entrySet()) {
    String instanceName = assignment.getKey();
    for (TopicPartition tp : assignment.getValue()) {
      String topicName = tp.getTopic();
      if (topicName.startsWith(SEPARATOR)) {
        // Route entry: the instance holding it acts as a controller for that route.
        String routeString = convert(topicName + SEPARATOR + tp.getPartition());
        int[] tally = currRouteInfo.computeIfAbsent(routeString, key -> new int[3]);
        tally[controllerSlot]++;
        tally[workerSlot] += instanceMap.get(instanceName).getWorkerSet().size();
        // Register metrics if needed, so the route is present in _routeToCounterMap below.
        maybeRegisterMetrics(routeString);
      } else {
        // Topic entry: counted against the route derived from its pipeline.
        String routeString = convert(tp.getPipeline());
        currRouteInfo.computeIfAbsent(routeString, key -> new int[3])[topicSlot]++;
      }
    }
  }

  for (String routeString : _routeToCounterMap.keySet()) {
    // A registered route missing from the snapshot is reported as all zeros.
    int[] tally = currRouteInfo.get(routeString);
    int topicTotalNumber = tally == null ? 0 : tally[topicSlot];
    int controllerTotalNumber = tally == null ? 0 : tally[controllerSlot];
    int workerTotalNumber = tally == null ? 0 : tally[workerSlot];

    Map<String, Counter> counters = _routeToCounterMap.get(routeString);
    setCounterValue(counters.get(TOPIC_TOTAL_NUMBER), topicTotalNumber);
    setCounterValue(counters.get(CONTROLLER_TOTAL_NUMBER), controllerTotalNumber);
    setCounterValue(counters.get(WORKER_TOTAL_NUMBER), workerTotalNumber);
  }
}

/** Brings {@code counter} to {@code target} by incrementing it by the difference from its current count. */
private static void setCounterValue(Counter counter, long target) {
  counter.inc(target - counter.getCount());
}