public String validateSourceKafkaCluster()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java [117:159]


  /**
   * Checks every topic returned by the mirror maker manager against the source Kafka topic
   * observer and reports the discrepancies as a JSON string.
   *
   * <p>For each topic one of the following happens:
   * <ul>
   *   <li>the observer has no entry for it: the topic is counted as not existing in the source;</li>
   *   <li>the partition counts agree: nothing is recorded;</li>
   *   <li>the source has more partitions and auto topic expansion is enabled: the auto-expansion
   *       counters are incremented and the topic is expanded in the mirror maker;</li>
   *   <li>otherwise: the absolute partition difference is recorded as a mismatch.</li>
   * </ul>
   *
   * <p>The validation metrics are refreshed only when this controller reports itself as leader;
   * the auto-expansion counters are incremented regardless of leadership.
   *
   * @return the validation result serialized via {@code toJSONString()}
   */
  public String validateSourceKafkaCluster() {
    Set<String> missingTopics = new HashSet<String>();
    Map<String, Integer> partitionGapByTopic = new HashMap<String, Integer>();
    int totalPartitionGap = 0;

    for (String topic : _helixMirrorMakerManager.getTopicLists()) {
      TopicPartition sourceTopicPartition = _sourceKafkaTopicObserver.getTopicPartition(topic);
      if (sourceTopicPartition == null) {
        LOGGER.warn("Topic {} is not in source kafka broker!", topic);
        missingTopics.add(topic);
        continue;
      }

      // NOTE(review): the ideal state is dereferenced without a null check; confirm that
      // getIdealStateForTopic cannot return null for a topic listed by getTopicLists.
      int mirrorMakerPartitions =
          _helixMirrorMakerManager.getIdealStateForTopic(topic).getNumPartitions();
      int sourcePartitions = sourceTopicPartition.getPartition();
      if (mirrorMakerPartitions == sourcePartitions) {
        continue;
      }

      int partitionGap = Math.abs(mirrorMakerPartitions - sourcePartitions);
      boolean sourceHasMorePartitions = sourcePartitions > mirrorMakerPartitions;
      if (!_enableAutoTopicExpansion || !sourceHasMorePartitions) {
        // Either expansion is disabled or the source has fewer partitions: report only.
        totalPartitionGap += partitionGap;
        partitionGapByTopic.put(topic, partitionGap);
        LOGGER.warn(
            "Number of partitions not matched for topic {} between mirrormaker:{} and source kafka broker: {}!",
            topic, mirrorMakerPartitions, sourcePartitions);
        continue;
      }

      // Only growth is handled automatically.
      LOGGER.warn(
          "Trying to expand topic {} from {} partitions in mirror maker to {} from source kafka broker!",
          topic, mirrorMakerPartitions, sourcePartitions);
      _numAutoExpandedTopics.inc();
      _numAutoExpandedTopicPartitions.inc(partitionGap);
      _helixMirrorMakerManager.expandTopicInMirrorMaker(sourceTopicPartition);
    }

    int numMissingTopics = missingTopics.size();
    int numMismatchedTopics = partitionGapByTopic.size();

    JSONObject partitionGapJson = constructMismatchedTopicPartitionsJson(partitionGapByTopic);
    JSONObject resultJson = constructValidationResultJson(numMissingTopics, numMismatchedTopics,
        totalPartitionGap, partitionGapJson);

    if (_helixMirrorMakerManager.isLeader()) {
      updateMetrics(numMissingTopics, numMismatchedTopics, totalPartitionGap, partitionGapByTopic);
    }
    return resultJson.toJSONString();
  }