in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [196:238]
/**
 * Refreshes {@code topicList} from {@code helixMirrorMakerManager}, rebuilds
 * {@code partitionLeader} from the first source broker that answers a topic-metadata
 * request, and drops {@code noProgressMap} entries whose topic is no longer in the list.
 *
 * <p>Brokers in {@code srcBrokerList} are tried in order; a failure on one broker is
 * logged and the next one is tried. If every broker fails, {@code partitionLeader} is
 * left with whatever was stored before the failure (possibly empty).
 */
private void updateTopicList() {
  logger.info("Update topicList");
  // NOTE(review): clearing the old list right before replacing it looks redundant;
  // kept because other holders of the old list instance are not visible from here.
  topicList.clear();
  partitionLeader.clear();

  // update topicList
  topicList = helixMirrorMakerManager.getTopicLists();
  logger.debug("TopicList: {}", topicList);
  Set<String> topicSet = new HashSet<>(topicList);

  // update partitionLeader
  // With no topics every returned partition would be rejected by the topicSet filter,
  // so the broker round-trip is skipped entirely. (A Kafka metadata request with an
  // empty topic list asks for all topics of the cluster, which would be wasted work.)
  if (!topicSet.isEmpty()) {
    for (String broker : srcBrokerList) {
      try {
        loadPartitionLeaders(broker, topicSet);
        // One successful answer is enough; remaining brokers are not queried.
        break;
      } catch (Exception e) {
        logger.warn("Got exception to get metadata from broker=" + broker, e);
      }
    }
  }

  // Pruning depends only on the topic set, so it runs even when no broker was reachable.
  pruneNoProgressMap(topicSet);
  logger.debug("partitionLeader: {}", partitionLeader);
}

/**
 * Asks {@code broker} for the metadata of the topics in {@code topicList} and stores the
 * leader of every partition whose topic is in {@code topicSet} into {@code partitionLeader}.
 *
 * @param broker   source broker to query
 * @param topicSet topics whose partitions should be recorded
 * @throws Exception anything raised while obtaining the consumer or sending the request;
 *                   entries stored before the failure are not rolled back
 */
private void loadPartitionLeaders(String broker, Set<String> topicSet) throws Exception {
  SimpleConsumer consumer = getSimpleConsumer(broker);
  TopicMetadataRequest req = new TopicMetadataRequest(topicList);
  kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

  for (TopicMetadata tmd : resp.topicsMetadata()) {
    String topic = tmd.topic();
    // Decide once per topic instead of once per partition.
    if (!topicSet.contains(topic)) {
      continue;
    }
    for (PartitionMetadata pmd : tmd.partitionsMetadata()) {
      // NOTE(review): pmd.leader() is stored as returned; confirm whether it can be
      // null for a leaderless partition and whether partitionLeader accepts null values.
      partitionLeader.put(new TopicAndPartition(topic, pmd.partitionId()), pmd.leader());
    }
  }
}

/**
 * Removes every {@code noProgressMap} entry whose topic is not contained in
 * {@code topicSet}, logging each removal.
 *
 * @param topicSet topics that are still in the topic list
 */
private void pruneNoProgressMap(Set<String> topicSet) {
  Iterator<TopicAndPartition> iter = noProgressMap.keySet().iterator();
  while (iter.hasNext()) {
    TopicAndPartition tp = iter.next();
    if (!topicSet.contains(tp.topic())) {
      // Remove through the iterator so the map is not modified behind its back.
      iter.remove();
      logger.info("Remove non exist topic {} from noProgressMap", tp);
    }
  }
}