std::vector<HostInfo> ClusterTopologyMonitor::WaitForTopologyUpdate()

in src/failover/cluster_topology_monitor.cc [145:179]


std::vector<HostInfo> ClusterTopologyMonitor::WaitForTopologyUpdate(uint32_t timeout_ms) {
    std::vector<HostInfo> curr_hosts = topology_map_->Get(cluster_id_);
    std::vector<HostInfo> new_hosts = curr_hosts;
    {
        std::lock_guard<std::mutex> lock(request_update_topology_mutex_);
        request_update_topology_.store(true);
    }
    request_update_topology_cv_.notify_all();

    if (timeout_ms == 0) {
        LOG(INFO) << "A topology refresh was requested, but the given timeout for the request was 0ms. Returning cached hosts.";
        return curr_hosts;
    }
    std::chrono::steady_clock::time_point curr_time =
        std::chrono::steady_clock::time_point(std::chrono::high_resolution_clock::now().time_since_epoch());
    std::chrono::steady_clock::time_point end = curr_time + std::chrono::milliseconds(timeout_ms);

    std::unique_lock<std::mutex> topology_lock(topology_updated_mutex_);
    // TODO(karezche): refactor the code to compare references instead of values
    // Current implementation does not support comparing curr_hosts and new_hosts by their references.
    while (curr_time < end && curr_hosts == new_hosts) {
        LOG(INFO) << "Host reference comparison has failed, curr_hosts: " << ClusterTopologyHelper::LogTopology(curr_hosts)
                  << " new hosts: " << ClusterTopologyHelper::LogTopology(new_hosts);
        topology_updated_.wait_for(topology_lock, std::chrono::milliseconds(TOPOLOGY_UPDATE_WAIT_MS));
        new_hosts = topology_map_->Get(cluster_id_);
        curr_time = std::chrono::steady_clock::time_point(std::chrono::high_resolution_clock::now().time_since_epoch());
    }
    LOG(INFO) << "new hosts have been updated";

    if (curr_time >= end) {
        LOG(ERROR) << "Cluster Monitor topology did not update within the maximum time: " << std::to_string(timeout_ms) << "for cluster ID: " << cluster_id_;
    }

    return new_hosts;
}