private void updateTopicList()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [196:238]


  /**
   * Refreshes {@code topicList} from {@code helixMirrorMakerManager} and rebuilds
   * {@code partitionLeader} from the topic metadata returned by a source broker.
   *
   * <p>Brokers in {@code srcBrokerList} are tried in order and the loop stops at the first one
   * whose metadata request completes without throwing; a broker that throws is logged and the
   * next one is tried. Afterwards, entries of {@code noProgressMap} whose topic is not in the
   * refreshed topic list are removed, whether or not any broker could be reached.
   */
  private void updateTopicList() {
    logger.info("Update topicList");
    topicList.clear();
    partitionLeader.clear();

    // update topicList
    topicList = helixMirrorMakerManager.getTopicLists();
    logger.debug("TopicList: {}", topicList);
    Set<String> topicSet = new HashSet<>(topicList);

    // update partitionLeader
    for (String broker : srcBrokerList) {
      try {
        SimpleConsumer consumer = getSimpleConsumer(broker);
        TopicMetadataRequest req = new TopicMetadataRequest(topicList);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
        List<TopicMetadata> metaData = resp.topicsMetadata();

        for (TopicMetadata tmd : metaData) {
          // Ignore topics in the response that are not in the refreshed topic list; checking
          // once per topic avoids building keys for partitions that would be discarded anyway.
          if (!topicSet.contains(tmd.topic())) {
            continue;
          }
          for (PartitionMetadata pmd : tmd.partitionsMetadata()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(tmd.topic(),
                pmd.partitionId());
            // NOTE(review): the Kafka javaapi reports a partition without an elected leader as a
            // null leader. Skip it instead of storing null, so that one leaderless partition
            // cannot abort processing of the rest of the response - confirm against the
            // partitionLeader map type and its readers.
            if (pmd.leader() == null) {
              logger.warn("No leader available for {}, skip updating its leader",
                  topicAndPartition);
              continue;
            }
            partitionLeader.put(topicAndPartition, pmd.leader());
          }
        }
        // One successful response is enough; do not query the remaining brokers.
        break;
      } catch (Exception e) {
        logger.warn("Got exception to get metadata from broker=" + broker, e);
      }
    }

    // Pruning depends only on the refreshed topic list, so it runs outside the broker loop:
    // it must not be skipped when every broker fails, and a problem here must not be reported
    // as a broker metadata failure.
    Iterator<TopicAndPartition> iter = noProgressMap.keySet().iterator();
    while (iter.hasNext()) {
      TopicAndPartition tp = iter.next();
      if (!topicSet.contains(tp.topic())) {
        iter.remove();
        logger.info("Remove non exist topic {} from noProgressMap", tp);
      }
    }
    logger.debug("partitionLeader: {}", partitionLeader);
  }