in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/core/KafkaBrokerTopicObserver.java [191:231]
private void refreshCache() {
Context context = _refreshLatency.time();
Set<String> servingTopics;
try {
servingTopics = new HashSet<>(_zkClient.getChildren(KAFKA_TOPICS_PATH));
servingTopics.removeAll(KAFKA_INNER_TOPICS);
} catch (Exception e) {
LOGGER.error("Failed to get topics from kafka zk: {}", e);
_refreshFailureCounter.inc(1);
return;
}
scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> partitionAssignmentForTopics =
_zkUtils.getPartitionAssignmentForTopics(
JavaConversions.asScalaBuffer(ImmutableList.copyOf(servingTopics)));
synchronized (_lock) {
LOGGER.info("updating topic cache map for {} new topics", servingTopics.size());
for (String existedTopic : getAllTopics()) {
if (!servingTopics.contains(existedTopic)) {
_topicPartitionInfoMap.remove(existedTopic);
}
}
for (String topic : servingTopics) {
try {
scala.collection.Map<Object, Seq<Object>> partitionsMap =
partitionAssignmentForTopics.get(topic).get();
TopicPartition tp = new TopicPartition(topic, partitionsMap.size());
_topicPartitionInfoMap.put(topic, tp);
} catch (Exception e) {
LOGGER.warn("Failed to get topicPartition info for {} from kafka zk: {}", topic, e);
}
}
LOGGER.info("Now serving {} topics in Kafka cluster: {}", _topicPartitionInfoMap.size(),
_kakfaClusterName);
_kafkaTopicsCounter.inc(_topicPartitionInfoMap.size() - _kafkaTopicsCounter.getCount());
_lastRefreshTime.set(System.currentTimeMillis());
}
context.stop();
}