in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java [117:159]
/**
 * Checks every topic listed by the mirror maker manager against the source Kafka topic
 * observer and summarizes the discrepancies.
 *
 * <p>A topic for which the observer returns {@code null} is counted as not existing in the
 * source. A topic whose partition count differs between the mirror maker ideal state and the
 * source is either expanded in the mirror maker (only when auto topic expansion is enabled and
 * the source has more partitions) or recorded as a mismatch together with the absolute
 * partition difference. The expansion step is not gated on leadership; the call to
 * {@code updateMetrics} is.
 *
 * @return the validation result JSON, serialized to a string
 */
public String validateSourceKafkaCluster() {
  Set<String> missingTopics = new HashSet<String>();
  Map<String, Integer> partitionGapByTopic = new HashMap<String, Integer>();
  int totalPartitionGap = 0;

  for (String topic : _helixMirrorMakerManager.getTopicLists()) {
    TopicPartition sourceInfo = _sourceKafkaTopicObserver.getTopicPartition(topic);
    if (sourceInfo == null) {
      LOGGER.warn("Topic {} is not in source kafka broker!", topic);
      missingTopics.add(topic);
      continue;
    }

    int mirrorMakerPartitions =
        _helixMirrorMakerManager.getIdealStateForTopic(topic).getNumPartitions();
    int sourcePartitions = sourceInfo.getPartition();
    if (mirrorMakerPartitions == sourcePartitions) {
      // Partition counts agree; nothing to report for this topic.
      continue;
    }

    int partitionGap = Math.abs(mirrorMakerPartitions - sourcePartitions);
    boolean expandable = _enableAutoTopicExpansion && sourcePartitions > mirrorMakerPartitions;
    if (!expandable) {
      // Either auto expansion is off or the mirror maker has more partitions than the source:
      // report the difference instead of acting on it.
      totalPartitionGap += partitionGap;
      partitionGapByTopic.put(topic, partitionGap);
      LOGGER.warn(
          "Number of partitions not matched for topic {} between mirrormaker:{} and source kafka broker: {}!",
          topic, mirrorMakerPartitions, sourcePartitions);
      continue;
    }

    // Growing the topic is the only automatic correction performed here.
    LOGGER.warn(
        "Trying to expand topic {} from {} partitions in mirror maker to {} from source kafka broker!",
        topic, mirrorMakerPartitions, sourcePartitions);
    _numAutoExpandedTopics.inc();
    _numAutoExpandedTopicPartitions.inc(partitionGap);
    _helixMirrorMakerManager.expandTopicInMirrorMaker(sourceInfo);
  }

  JSONObject partitionGapJson = constructMismatchedTopicPartitionsJson(partitionGapByTopic);
  int missingTopicCount = missingTopics.size();
  int mismatchedTopicCount = partitionGapByTopic.size();
  JSONObject resultJson = constructValidationResultJson(
      missingTopicCount, mismatchedTopicCount, totalPartitionGap, partitionGapJson);

  if (_helixMirrorMakerManager.isLeader()) {
    updateMetrics(missingTopicCount, mismatchedTopicCount, totalPartitionGap,
        partitionGapByTopic);
  }
  return resultJson.toJSONString();
}