void ClientImpl::updateRouteCache()

in cpp/source/rocketmq/ClientImpl.cpp [435:478]


/**
 * Stores a freshly resolved route for `topic` and creates sessions for the
 * endpoints of that route which are not yet present in `session_map_`.
 *
 * The call is a no-op (apart from a warning log) when `ec` is set, `route` is
 * null, or the route carries no message queues.
 *
 * @param topic Topic the route belongs to; key into `topic_route_table_`.
 * @param ec    Result of the route query; any non-zero value rejects the route.
 * @param route Route data to cache. Replaces the cached entry only when it
 *              compares unequal to what is already stored.
 */
void ClientImpl::updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) {
  if (ec || !route || route->messageQueues().empty()) {
    SPDLOG_WARN("Yuck! route for {} is invalid. Cause: {}", topic, ec.message());
    return;
  }

  {
    absl::MutexLock lk(&topic_route_table_mtx_);
    // Single lookup: the iterator serves both the presence check and the update.
    auto cached = topic_route_table_.find(topic);
    if (cached == topic_route_table_.end()) {
      topic_route_table_.insert({topic, route});
      SPDLOG_INFO("TopicRouteData for topic={} has changed. NONE --> {}", topic, route->debugString());
    } else if (*cached->second != *route) {
      // Log first: the previous route is still reachable through the table entry
      // until it is overwritten below.
      SPDLOG_INFO("TopicRouteData for topic={} has changed. {} --> {}", topic, cached->second->debugString(),
                  route->debugString());
      cached->second = route;
    }
  }

  // De-duplicated set of endpoint addresses referenced by the route's queues.
  absl::flat_hash_set<std::string> targets;
  for (const auto& message_queue : route->messageQueues()) {
    targets.insert(urlOf(message_queue));
  }

  {
    absl::MutexLock lk(&session_map_mtx_);
    // Drop every endpoint that already has an entry in session_map_.
    for (auto it = targets.begin(); it != targets.end();) {
      if (session_map_.contains(*it)) {
        // Post-increment moves the iterator on before its element is erased.
        targets.erase(it++);
      } else {
        ++it;
      }
    }
  }

  // Sessions are created after session_map_mtx_ has been released.
  // NOTE(review): meaning of the `true` flag is defined by createSession — confirm there.
  for (const auto& target : targets) {
    createSession(target, true);
  }
}