private void refreshCache()

in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/core/KafkaBrokerTopicObserver.java [191:231]


  /**
   * Rebuilds the cached topic-to-partition map from the Kafka ZooKeeper tree.
   *
   * <p>Steps:
   * <ol>
   *   <li>Read the children of {@code KAFKA_TOPICS_PATH} and drop every name listed in
   *       {@code KAFKA_INNER_TOPICS}. If this read throws, the error is logged,
   *       {@code _refreshFailureCounter} is incremented and the cache is left untouched.</li>
   *   <li>Fetch the partition assignment for the remaining topics through {@code _zkUtils}.
   *       An exception thrown here is not caught and propagates to the caller.</li>
   *   <li>While holding {@code _lock}: remove cached topics that are no longer listed, store a
   *       {@code TopicPartition} built from the topic name and the size of its assignment map
   *       for every listed topic, bring {@code _kafkaTopicsCounter} in line with the cache size
   *       and record the refresh time in {@code _lastRefreshTime}.</li>
   * </ol>
   *
   * <p>The latency context obtained from {@code _refreshLatency} is stopped in a
   * {@code finally} block, so it is stopped on every exit path: success, the early return after
   * a failed topic listing, and an exception escaping the method.
   */
  private void refreshCache() {
    Context context = _refreshLatency.time();
    try {
      Set<String> servingTopics;
      try {
        servingTopics = new HashSet<>(_zkClient.getChildren(KAFKA_TOPICS_PATH));
        servingTopics.removeAll(KAFKA_INNER_TOPICS);
      } catch (Exception e) {
        LOGGER.error("Failed to get topics from kafka zk: {}", e);
        _refreshFailureCounter.inc(1);
        // Keep the previous cache contents; the finally block below still stops the timer.
        return;
      }
      // Fetched outside the lock so the ZooKeeper round trip does not happen while _lock is held.
      scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> partitionAssignmentForTopics =
          _zkUtils.getPartitionAssignmentForTopics(
              JavaConversions.asScalaBuffer(ImmutableList.copyOf(servingTopics)));

      synchronized (_lock) {
        LOGGER.info("updating topic cache map for {} new topics", servingTopics.size());
        // Evict cached topics that are no longer listed under the topics path.
        for (String existedTopic : getAllTopics()) {
          if (!servingTopics.contains(existedTopic)) {
            _topicPartitionInfoMap.remove(existedTopic);
          }
        }

        for (String topic : servingTopics) {
          try {
            // Option.get() throws when the topic has no assignment entry; that case is handled
            // by the catch below, and any previously cached entry for the topic is kept.
            scala.collection.Map<Object, Seq<Object>> partitionsMap =
                partitionAssignmentForTopics.get(topic).get();
            TopicPartition tp = new TopicPartition(topic, partitionsMap.size());
            _topicPartitionInfoMap.put(topic, tp);
          } catch (Exception e) {
            LOGGER.warn("Failed to get topicPartition info for {} from kafka zk: {}", topic, e);
          }
        }
        LOGGER.info("Now serving {} topics in Kafka cluster: {}", _topicPartitionInfoMap.size(),
            _kakfaClusterName);
        // Adjust the counter by the difference so that its value equals the current cache size.
        _kafkaTopicsCounter.inc(_topicPartitionInfoMap.size() - _kafkaTopicsCounter.getCount());
        _lastRefreshTime.set(System.currentTimeMillis());
      }
    } finally {
      // Stop the latency measurement on every exit path, not only after a successful refresh.
      context.stop();
    }
  }