in src/failover/cluster_topology_monitor.cc [145:179]
std::vector<HostInfo> ClusterTopologyMonitor::WaitForTopologyUpdate(uint32_t timeout_ms) {
std::vector<HostInfo> curr_hosts = topology_map_->Get(cluster_id_);
std::vector<HostInfo> new_hosts = curr_hosts;
{
std::lock_guard<std::mutex> lock(request_update_topology_mutex_);
request_update_topology_.store(true);
}
request_update_topology_cv_.notify_all();
if (timeout_ms == 0) {
LOG(INFO) << "A topology refresh was requested, but the given timeout for the request was 0ms. Returning cached hosts.";
return curr_hosts;
}
std::chrono::steady_clock::time_point curr_time =
std::chrono::steady_clock::time_point(std::chrono::high_resolution_clock::now().time_since_epoch());
std::chrono::steady_clock::time_point end = curr_time + std::chrono::milliseconds(timeout_ms);
std::unique_lock<std::mutex> topology_lock(topology_updated_mutex_);
// TODO(karezche): refactor the code to compare references instead of values
// Current implementation does not support comparing curr_hosts and new_hosts by their references.
while (curr_time < end && curr_hosts == new_hosts) {
LOG(INFO) << "Host reference comparison has failed, curr_hosts: " << ClusterTopologyHelper::LogTopology(curr_hosts)
<< " new hosts: " << ClusterTopologyHelper::LogTopology(new_hosts);
topology_updated_.wait_for(topology_lock, std::chrono::milliseconds(TOPOLOGY_UPDATE_WAIT_MS));
new_hosts = topology_map_->Get(cluster_id_);
curr_time = std::chrono::steady_clock::time_point(std::chrono::high_resolution_clock::now().time_since_epoch());
}
LOG(INFO) << "new hosts have been updated";
if (curr_time >= end) {
LOG(ERROR) << "Cluster Monitor topology did not update within the maximum time: " << std::to_string(timeout_ms) << "for cluster ID: " << cluster_id_;
}
return new_hosts;
}