bool FailoverService::failover_reader()

in src/failover/failover_service.cc [196:308]


// Attempts to re-establish the connection held by `hdbc` against a reader of the cluster.
//
// Procedure:
//   1. Request a non-blocking topology refresh and take a snapshot of the hosts known for cluster_id_.
//   2. Repeatedly try every host listed as a reader, chosen through host_selector_.
//   3. If no reader worked, try the host listed as writer, since it may have been demoted to a reader.
//   4. Repeat until the deadline derived from failover_timeout_ passes or nothing is left to try.
//
// A connection is accepted when the instance reports itself as a reader, or unconditionally when
// failover_mode_ is not STRICT_READER. On success curr_host_ is updated to the connected host.
//
// Returns true when a connection was established; false when no topology is available, the host
// selector throws, every candidate has been ruled out, or the deadline passes.
bool FailoverService::failover_reader(SQLHDBC hdbc) {
    // The deadline is measured on steady_clock so wall-clock adjustments cannot stretch or shrink it.
    // (high_resolution_clock may alias system_clock, which is not monotonic.)
    const auto now = [] { return std::chrono::steady_clock::now(); };
    const auto end = now() + std::chrono::milliseconds(failover_timeout_);

    LOG(INFO) << "Starting reader failover procedure.";
    // When we pass a timeout of 0, we inform the plugin service that it should update its topology without waiting
    // for it to get updated, since we do not need updated topology to establish a reader connection.
    topology_monitor_->ForceRefresh(false, 0);

    // The roles in this list might not be accurate, depending on whether the new topology has become available yet.
    std::vector<HostInfo> hosts = topology_map_->Get(cluster_id_);
    if (hosts.empty()) {
        LOG(INFO) << "No topology available.";
        return false;
    }

    std::vector<HostInfo> reader_candidates;
    HostInfo original_writer;
    // Tracks whether the snapshot actually contained a writer; otherwise original_writer is just a
    // default-constructed placeholder and must not be dialled.
    bool has_original_writer = false;

    for (const auto& host : hosts) {
        if (host.IsHostWriter()) {
            original_writer = host;
            has_original_writer = true;
        } else {
            reader_candidates.push_back(host);
        }
    }

    std::unordered_map<std::string, std::string> properties;
    RoundRobinHostSelector::SetRoundRobinWeight(reader_candidates, properties);

    std::string host_string;
    // Set once the original writer has been connected to and confirmed to still be a writer.
    bool is_original_writer_still_writer = false;
    do {
        // First, try all hosts that were listed as readers.
        std::vector<HostInfo> remaining_readers(reader_candidates);
        while (!remaining_readers.empty() && now() < end) {
            LOG(INFO) << "Failover for ClusterId: " << cluster_id_ << ". Remaining Hosts: " << ClusterTopologyHelper::LogTopology(remaining_readers);
            HostInfo host;
            try {
                host = host_selector_->GetHost(remaining_readers, false, properties);
                host_string = host.GetHost();
                LOG(INFO) << "[Failover Service] Selected Host: " << host_string;
            } catch (const std::exception&) {
                LOG(INFO) << "[Failover Service] no hosts in topology for: " << cluster_id_;
                return false;
            }

            if (!connect_to_host(hdbc, host_string)) {
                LOG(INFO) << "[Failover Service] unable to connect to: " << host_string;
                remove_candidate(host_string, remaining_readers);
                continue;
            }

            // Since the roles in the host list might not be accurate, the instance's role is only
            // trusted once the connection itself has been validated.
            const bool is_valid_connection = odbc_helper_->CheckConnection(hdbc);
            if (is_valid_connection) {
                const bool is_reader = is_connected_to_reader(hdbc);
                if (is_reader || (this->failover_mode_ != STRICT_READER)) {
                    LOG(INFO) << "[Failover Service] connected to a new reader for: " << host_string;
                    curr_host_ = host;
                    return true;
                }
                LOG(INFO) << "[Failover Service] Strict Reader Mode, not connected to a reader: " << host_string;
            }
            remove_candidate(host_string, remaining_readers);
            SQLDisconnect(hdbc);
            LOG(INFO) << "[Failover Service] Cleaned up first connection, required a strict reader: " << host_string << ", " << hdbc;

            if (is_valid_connection) {
                // Reaching this point with a valid connection means the candidate reported itself as a
                // writer, which is not valid when failoverMode is STRICT_READER.
                // We will remove it from the list of reader candidates to avoid retrying it in future iterations.
                // A host whose connection merely failed validation is kept so it can be retried.
                remove_candidate(host_string, reader_candidates);
                // Another instance holds the writer role now, so the original writer may have been
                // demoted and is worth checking again.
                is_original_writer_still_writer = false;
            }
        }

        // We were not able to connect to any of the original readers. We will try connecting to the original writer,
        // which may have been demoted to a reader.

        if (now() >= end) {
            // Timed out.
            break;
        }

        const bool writer_ruled_out =
            !has_original_writer || (this->failover_mode_ == STRICT_READER && is_original_writer_still_writer);
        if (writer_ruled_out) {
            if (reader_candidates.empty()) {
                // Every host in the snapshot has been ruled out; looping until the deadline would
                // only spin without attempting any connection.
                LOG(INFO) << "[Failover Service] No remaining candidates for reader failover for: " << cluster_id_;
                SQLDisconnect(hdbc);
                return false;
            }
            // No writer in the topology, or the original writer has been verified, so it is not valid
            // when in STRICT_READER mode. Go back to retrying the readers.
            continue;
        }

        // Try the original writer, which may have been demoted to a reader.
        host_string = original_writer.GetHost();
        if (!connect_to_host(hdbc, host_string)) {
            LOG(INFO) << "[Failover Service] Failed to connect to host: " << original_writer;
            continue;
        }

        if (!odbc_helper_->CheckConnection(hdbc)) {
            SQLDisconnect(hdbc);
            continue;
        }

        if (is_connected_to_reader(hdbc) || failover_mode_ != STRICT_READER) {
            LOG(INFO) << "[Failover Service] reader failover connected to writer instance for: " << host_string;
            curr_host_ = original_writer;
            return true;
        }

        // The instance is still a writer and we are in STRICT_READER mode, so the connection is not
        // valid. Release it so the handle can be reused, and remember the verdict so the writer is
        // not dialled again on every iteration.
        is_original_writer_still_writer = true;
        SQLDisconnect(hdbc);
        LOG(INFO) << "[Failover Service] Strict Reader Mode, original writer is still a writer: " << host_string;

    } while (now() < end);  // All hosts failed. Keep trying until we hit the timeout.

    // Timed out.
    SQLDisconnect(hdbc);
    LOG(INFO) << "[Failover Service] The reader failover process was not able to establish a connection before timing out.";
    return false;
}