in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java [155:225]
public synchronized String validateExternalView() {
try {
Map<String, Integer> topicPartitionMapForIdealState =
new HashMap<String, Integer>();
Map<String, Integer> topicPartitionMapForExternalView =
new HashMap<String, Integer>();
int numOnlineTopicPartitions = 0;
int numOfflineTopicPartitions = 0;
int numErrorTopicPartitions = 0;
int numTopicPartitions = 0;
int numServingTopics = 0;
int numErrorTopics = 0;
for (String topicName : _helixMirrorMakerManager.getTopicLists()) {
numServingTopics++;
IdealState idealStateForTopic =
_helixMirrorMakerManager.getIdealStateForTopic(topicName);
ExternalView externalViewForTopic =
_helixMirrorMakerManager.getExternalViewForTopic(topicName);
numTopicPartitions += idealStateForTopic.getNumPartitions();
if (idealStateForTopic.getNumPartitions() != externalViewForTopic.getPartitionSet()
.size()) {
numErrorTopics++;
LOGGER.error(
"For topic:{}, number of partitions for IdealState: {} doesn't match ExternalView: {}",
topicName, idealStateForTopic, externalViewForTopic);
}
// IdealState Counting
updateIdealstateInfo(topicPartitionMapForIdealState, idealStateForTopic);
// ExternalView Counting
for (String partition : externalViewForTopic.getPartitionSet()) {
Map<String, String> stateMap = externalViewForTopic.getStateMap(partition);
for (String instance : stateMap.keySet()) {
String state = stateMap.get(instance);
if (!topicPartitionMapForExternalView.containsKey(instance)) {
topicPartitionMapForExternalView.put(instance, 1);
} else {
topicPartitionMapForExternalView.put(instance,
topicPartitionMapForExternalView.get(instance) + 1);
}
if ("ONLINE".equalsIgnoreCase(state)) {
numOnlineTopicPartitions++;
} else if ("OFFLINE".equalsIgnoreCase(state)) {
numOfflineTopicPartitions++;
} else if ("ERROR".equalsIgnoreCase(state)) {
numErrorTopicPartitions++;
}
}
}
}
if (_helixMirrorMakerManager.isLeader()) {
updateMetrics(numOnlineTopicPartitions, numOfflineTopicPartitions, numErrorTopicPartitions,
numTopicPartitions, numServingTopics, numErrorTopics);
updatePerWorkerISMetrics(topicPartitionMapForIdealState);
updatePerWorkerEVMetrics(topicPartitionMapForExternalView);
}
JSONObject perWorkerISCounterJson =
constructPerWorkerISCounterJson(topicPartitionMapForIdealState);
JSONObject perWorkerEVCounterJson =
constructPerWorkerEVCounterJson(topicPartitionMapForExternalView);
JSONObject validationResultJson = constructValidationResultJson(numOnlineTopicPartitions,
numOfflineTopicPartitions, numErrorTopicPartitions, numTopicPartitions,
numServingTopics, numErrorTopics, perWorkerISCounterJson, perWorkerEVCounterJson);
return validationResultJson.toJSONString();
} catch (Exception e) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("excpetion", e);
return jsonObject.toJSONString();
}
}