in cpp/source/rocketmq/ClientImpl.cpp [436:479]
/**
 * Stores the freshly resolved route of a topic and opens sessions to any
 * route target that does not have one yet.
 *
 * @param topic Topic whose route was resolved.
 * @param ec    Result of the resolution; a set error code makes this call a no-op.
 * @param route Resolved route. A null pointer or a route without message
 *              queues is treated as invalid and ignored.
 */
void ClientImpl::updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) {
  // NOTE: when `ec` is clear but the route is null/empty, the logged cause is
  // the message of a non-error code, which does not describe the real reason.
  if (ec || !route || route->messageQueues().empty()) {
    SPDLOG_WARN("Yuck! route for {} is invalid. Cause: {}", topic, ec.message());
    return;
  }

  {
    absl::MutexLock lk(&topic_route_table_mtx_);
    // One lookup serves both the "absent" and the "present but changed" case.
    auto entry = topic_route_table_.find(topic);
    if (topic_route_table_.end() == entry) {
      topic_route_table_.insert({topic, route});
      SPDLOG_INFO("TopicRouteData for topic={} has changed. NONE --> {}", topic, route->debugString());
    } else if (*entry->second != *route) {
      // Hold on to the replaced route so it can still be described in the log
      // after the table entry has been overwritten.
      const TopicRouteDataPtr cached = entry->second;
      entry->second = route;
      SPDLOG_INFO("TopicRouteData for topic={} has changed. {} --> {}", topic, cached->debugString(),
                  route->debugString());
    }
  }

  // De-duplicated set of targets referenced by the route's message queues.
  absl::flat_hash_set<std::string> targets;
  for (const auto& message_queue : route->messageQueues()) {
    targets.insert(urlOf(message_queue));
  }

  {
    absl::MutexLock lk(&session_map_mtx_);
    // Drop every target that already has an entry in session_map_.
    for (auto it = targets.begin(); it != targets.end();) {
      if (session_map_.contains(*it)) {
        // Post-increment keeps a valid iterator across the erase.
        targets.erase(it++);
      } else {
        ++it;
      }
    }
  }

  // Sessions are created after session_map_mtx_ has been released.
  // NOTE(review): presumably createSession() takes that mutex itself — confirm.
  for (const auto& target : targets) {
    createSession(target, true);
  }
}