driver/failover_handler.cc (420 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License, version 2.0 // (GPLv2), as published by the Free Software Foundation, with the // following additional permissions: // // This program is distributed with certain software that is licensed // under separate terms, as designated in a particular file or component // or in the license documentation. Without limiting your rights under // the GPLv2, the authors of this program hereby grant you an additional // permission to link the program and your derivative works with the // separately licensed software that they have included with the program. // // Without limiting the foregoing grant of rights under the GPLv2 and // additional permission as to separately licensed software, this // program is also subject to the Universal FOSS Exception, version 1.0, // a copy of which can be found along with its FAQ at // http://oss.oracle.com/licenses/universal-foss-exception. // // This program is distributed in the hope that it will be useful, but // WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU General Public License, version 2.0, for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see // http://www.gnu.org/licenses/gpl-2.0.html. /** @file failover_handler.c @brief Failover functions. */ #include <sstream> #include "driver.h" #include "mylog.h" #include "rds_utils.h" #if defined(__APPLE__) || defined(__linux__) #include <arpa/inet.h> #include <netdb.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/types.h> #endif namespace { const char* MYSQL_READONLY_QUERY = "SELECT @@innodb_read_only AS is_reader"; } // namespace FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds) : FAILOVER_HANDLER( dbc, ds, dbc ? dbc->connection_handler : nullptr, dbc ? dbc->get_topology_service() : nullptr, std::make_shared<CLUSTER_AWARE_METRICS_CONTAINER>(dbc, ds)) {} FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds, std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, std::shared_ptr<CLUSTER_AWARE_METRICS_CONTAINER> metrics_container) { if (!dbc) { throw std::runtime_error("DBC cannot be null."); } if (!ds) { throw std::runtime_error("DataSource cannot be null."); } this->dbc = dbc; this->ds = ds; this->topology_service = topology_service; this->topology_service->set_refresh_rate(ds->opt_TOPOLOGY_REFRESH_RATE); this->topology_service->set_gather_metric(ds->opt_GATHER_PERF_METRICS); this->connection_handler = connection_handler; this->failover_reader_handler = std::make_shared<FAILOVER_READER_HANDLER>( this->topology_service, this->connection_handler, dbc->env->failover_thread_pool, ds->opt_FAILOVER_TIMEOUT, ds->opt_FAILOVER_READER_CONNECT_TIMEOUT, is_failover_mode(FAILOVER_MODE_STRICT_READER, ds), dbc->id, ds->opt_LOG_QUERY); this->failover_writer_handler = std::make_shared<FAILOVER_WRITER_HANDLER>( this->topology_service, this->failover_reader_handler, this->connection_handler, dbc->env->failover_thread_pool, ds->opt_FAILOVER_TIMEOUT, ds->opt_FAILOVER_TOPOLOGY_REFRESH_RATE, ds->opt_FAILOVER_WRITER_RECONNECT_INTERVAL, dbc->id, ds->opt_LOG_QUERY); this->metrics_container = metrics_container; } FAILOVER_HANDLER::~FAILOVER_HANDLER() {} SQLRETURN FAILOVER_HANDLER::init_connection() { SQLRETURN rc = connection_handler->do_connect(dbc, ds, false); if (SQL_SUCCEEDED(rc)) { metrics_container->register_invalid_initial_connection(false); } else { metrics_container->register_invalid_initial_connection(true); return rc; } bool reconnect_with_updated_timeouts = false; if (ds->opt_ENABLE_CLUSTER_FAILOVER) { this->init_cluster_info(); if (is_failover_enabled()) { // Since we can't determine whether failover should be enabled // before we connect, there is a possibility we need to reconnect // again with the correct connection settings for failover. const unsigned int connect_timeout = get_connect_timeout(ds->opt_CONNECT_TIMEOUT); const unsigned int network_timeout = get_network_timeout(ds->opt_NETWORK_TIMEOUT); reconnect_with_updated_timeouts = (connect_timeout != dbc->login_timeout || network_timeout != ds->opt_READTIMEOUT || network_timeout != ds->opt_WRITETIMEOUT); } if (!ds->opt_FAILOVER_MODE) { if (RDS_UTILS::is_rds_reader_cluster_dns(this->current_host->get_host())) { ds->opt_FAILOVER_MODE.set_remove_brackets((SQLWCHAR*)to_sqlwchar_string(FAILOVER_MODE_READER_OR_WRITER).c_str(), SQL_NTS); } else { ds->opt_FAILOVER_MODE.set_remove_brackets((SQLWCHAR*)to_sqlwchar_string(FAILOVER_MODE_STRICT_WRITER).c_str(), SQL_NTS); } } } if (should_connect_to_new_writer() || reconnect_with_updated_timeouts) { rc = reconnect(reconnect_with_updated_timeouts); } return rc; } void FAILOVER_HANDLER::init_cluster_info() { if (is_cluster_info_initialized) { return; } std::stringstream err; // Cluster-aware failover is enabled this->current_host = get_host_info_from_ds(ds); std::string main_host = this->current_host->get_host(); unsigned int main_port = this->current_host->get_port(); const char* hp = (const char*)ds->opt_HOST_PATTERN; std::string hp_str(hp ? hp : ""); const char* clid = (const char*)ds->opt_CLUSTER_ID; std::string clid_str(clid ? clid : ""); if (!hp_str.empty()) { unsigned int port = !ds->opt_PORT.is_default() ? ds->opt_PORT : MYSQL_PORT; std::vector<Srv_host_detail> host_patterns; try { host_patterns = parse_host_list(hp_str.c_str(), port); } catch (std::string&) { err << "Invalid host pattern: '" << hp_str << "' - the value could not be parsed"; MYLOG_TRACE(dbc->log_file, dbc->id, err.str().c_str()); throw std::runtime_error(err.str()); } if (host_patterns.empty()) { err << "Empty host pattern."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } std::string host_pattern(host_patterns[0].name); unsigned int host_pattern_port = host_patterns[0].port; if (!RDS_UTILS::is_dns_pattern_valid(host_pattern)) { err << "Invalid host pattern: '" << host_pattern << "' - the host pattern must contain a '?' character as a " "placeholder for the DB instance identifiers of the cluster " "instances"; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } auto host_template = std::make_shared<HOST_INFO>(host_pattern, host_pattern_port); topology_service->set_cluster_instance_template(host_template); m_is_rds = RDS_UTILS::is_rds_dns(host_pattern); MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] m_is_rds=%s", m_is_rds ? "true" : "false"); m_is_rds_proxy = RDS_UTILS::is_rds_proxy_dns(host_pattern); MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] m_is_rds_proxy=%s", m_is_rds_proxy ? "true" : "false"); m_is_rds_custom_cluster = RDS_UTILS::is_rds_custom_cluster_dns(host_pattern); if (m_is_rds_proxy) { err << "RDS Proxy url can't be used as an instance pattern."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } if (m_is_rds_custom_cluster) { err << "RDS Custom Cluster endpoint can't be used as an instance pattern."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } if (!clid_str.empty()) { set_cluster_id(clid_str); } else if (m_is_rds) { // If it's a cluster endpoint, or a reader cluster endpoint, then // let's use as cluster identification std::string cluster_rds_host = RDS_UTILS::get_rds_cluster_host_url(host_pattern); if (!cluster_rds_host.empty()) { set_cluster_id(cluster_rds_host, host_pattern_port); } } initialize_topology(); } else if (RDS_UTILS::is_ipv4(main_host) || RDS_UTILS::is_ipv6(main_host)) { // TODO: do we need to setup host template in this case? // HOST_INFO* host_template = new HOST_INFO(); // host_template->host.assign(main_host); // host_template->port = main_port; // ts->setClusterInstanceTemplate(host_template); if (!clid_str.empty()) { set_cluster_id(clid_str); } initialize_topology(); if (m_is_cluster_topology_available) { err << "Host Pattern configuration setting is required when IP " "address is used to connect to a cluster that provides topology " "information. If you would instead like to connect without " "failover functionality, set the 'Disable Cluster Failover' " "configuration property to true."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } m_is_rds = false; // actually we don't know m_is_rds_proxy = false; // actually we don't know } else { m_is_rds = RDS_UTILS::is_rds_dns(main_host); MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] m_is_rds=%s", m_is_rds ? "true" : "false"); m_is_rds_proxy = RDS_UTILS::is_rds_proxy_dns(main_host); MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] m_is_rds_proxy=%s", m_is_rds_proxy ? "true" : "false"); if (!m_is_rds) { // it's not RDS, maybe custom domain (CNAME) auto host_template = std::make_shared<HOST_INFO>(main_host, main_port); topology_service->set_cluster_instance_template(host_template); if (!clid_str.empty()) { set_cluster_id(clid_str); } initialize_topology(); if (m_is_cluster_topology_available) { err << "The provided host appears to be a custom domain. The " "driver requires the Host Pattern configuration setting " "to be set for custom domains. If you would instead like " "to connect without failover functionality, set the " "'Disable Cluster Failover' configuration property to true."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } } else { // It's RDS std::string rds_instance_host = RDS_UTILS::get_rds_instance_host_pattern(main_host); if (!rds_instance_host.empty()) { topology_service->set_cluster_instance_template( std::make_shared<HOST_INFO>(rds_instance_host, main_port)); } else { err << "The provided host does not appear to match an expected " "Aurora DNS pattern. Please set the Host Pattern " "configuration to specify the host pattern for the " "cluster you are trying to connect to."; MYLOG_DBC_TRACE(dbc, err.str().c_str()); throw std::runtime_error(err.str()); } if (!clid_str.empty()) { set_cluster_id(clid_str); } else if (m_is_rds_proxy) { // Each proxy is associated with a single cluster so it's safe // to use RDS Proxy Url as cluster identification set_cluster_id(main_host, main_port); } else { // If it's cluster endpoint or reader cluster endpoint, // then let's use as cluster identification std::string cluster_rds_host = RDS_UTILS::get_rds_cluster_host_url(main_host); if (!cluster_rds_host.empty()) { set_cluster_id(cluster_rds_host, main_port); } else { // Main host is an instance endpoint set_cluster_id(main_host, main_port); } } initialize_topology(); } } is_cluster_info_initialized = true; } bool FAILOVER_HANDLER::should_connect_to_new_writer() { auto host = (const char*)ds->opt_SERVER; if (host == nullptr || host == "") { return false; } if (!RDS_UTILS::is_rds_writer_cluster_dns(host)) { return false; } std::string host_ip = host_to_IP(host); if (host_ip == "") { return false; } this->init_cluster_info(); // We need to force refresh the topology if we are connected to a read only instance. auto topology = topology_service->get_topology(dbc->connection_proxy, is_read_only()); std::shared_ptr<HOST_INFO> writer; try { writer = topology->get_writer(); } catch (std::runtime_error) { return false; } std::string writer_host = writer->get_host(); if (RDS_UTILS::is_rds_cluster_dns(writer_host.c_str())) { return false; } std::string writer_host_ip = host_to_IP(writer_host); if (writer_host_ip == "" || writer_host_ip == host_ip) { return false; } // DNS must have resolved the cluster endpoint to a wrong writer // so we should reconnect to a proper writer node. const sqlwchar_string writer_host_wstr = to_sqlwchar_string(writer_host); ds->opt_SERVER.set_remove_brackets((SQLWCHAR*)writer_host_wstr.c_str(), writer_host_wstr.size()); return true; } void FAILOVER_HANDLER::set_cluster_id(std::string host, int port) { set_cluster_id(host + ":" + std::to_string(port)); } void FAILOVER_HANDLER::set_cluster_id(std::string cid) { this->cluster_id = cid; topology_service->set_cluster_id(this->cluster_id); metrics_container->set_cluster_id(this->cluster_id); } bool FAILOVER_HANDLER::is_read_only() { bool read_only = false; if (dbc->connection_proxy->query(MYSQL_READONLY_QUERY) == 0) { auto result = dbc->connection_proxy->store_result(); MYSQL_ROW row; if (row = dbc->connection_proxy->fetch_row(result)) { read_only = (strcmp(row[0], "1") == 0); } dbc->connection_proxy->free_result(result); } return read_only; } std::string FAILOVER_HANDLER::host_to_IP(std::string host) { int status; struct addrinfo hints; struct addrinfo* servinfo; struct addrinfo* p; char ipstr[INET_ADDRSTRLEN]; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_INET; //IPv4 hints.ai_socktype = SOCK_STREAM; if ((status = getaddrinfo(host.c_str(), NULL, &hints, &servinfo)) != 0) { return ""; } for (p = servinfo; p != NULL; p = p->ai_next) { void* addr; struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr; addr = &(ipv4->sin_addr); inet_ntop(p->ai_family, addr, ipstr, sizeof(ipstr)); } freeaddrinfo(servinfo); return std::string(ipstr); } bool FAILOVER_HANDLER::is_failover_enabled() { return (dbc != nullptr && ds != nullptr && ds->opt_ENABLE_CLUSTER_FAILOVER && m_is_cluster_topology_available && !m_is_rds_proxy); } bool FAILOVER_HANDLER::is_rds() { return m_is_rds; } bool FAILOVER_HANDLER::is_rds_proxy() { return m_is_rds_proxy; } bool FAILOVER_HANDLER::is_cluster_topology_available() { return m_is_cluster_topology_available; } void FAILOVER_HANDLER::initialize_topology() { current_topology = topology_service->get_topology(dbc->connection_proxy, false); if (current_topology) { m_is_cluster_topology_available = current_topology->total_hosts() > 0; MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] m_is_cluster_topology_available=%s", m_is_cluster_topology_available ? "true" : "false"); MYLOG_DBC_TRACE(dbc, topology_service->log_topology(current_topology).c_str()); if (is_failover_enabled()) { this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts()); } } } SQLRETURN FAILOVER_HANDLER::reconnect(bool failover_enabled) { if (dbc->connection_proxy != nullptr && dbc->connection_proxy->is_connected()) { dbc->close(); } return connection_handler->do_connect(dbc, ds, failover_enabled); } // return true if failover is triggered, false if not triggered bool FAILOVER_HANDLER::trigger_failover_if_needed(const char* error_code, const char*& new_error_code, const char*& error_msg) { new_error_code = error_code; std::string ec(error_code ? error_code : ""); if (!is_failover_enabled() || ec.empty()) { return false; } bool failover_success = false; // If failover happened & succeeded bool in_transaction = !autocommit_on(dbc) || dbc->transaction_open; if (ec.rfind("08", 0) == 0) { // start with "08" // disable failure detection during failover auto failure_detection_old_state = ds->opt_ENABLE_FAILURE_DETECTION; ds->opt_ENABLE_FAILURE_DETECTION = false; // invalidate current connection current_host = nullptr; // close transaction if needed long long elasped_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - invoke_start_time_ms).count(); metrics_container->register_failure_detection_time(elasped_time_ms); failover_start_time_ms = std::chrono::steady_clock::now(); if (current_topology && current_topology->total_hosts() > 1 && // Trigger reader failover if failover mode is not strict writer !is_failover_mode(FAILOVER_MODE_STRICT_WRITER, ds)) { failover_success = failover_to_reader(new_error_code, error_msg); elasped_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - failover_start_time_ms).count(); metrics_container->register_reader_failover_procedure_time(elasped_time_ms); } else { failover_success = failover_to_writer(new_error_code, error_msg); elasped_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - failover_start_time_ms).count(); metrics_container->register_writer_failover_procedure_time(elasped_time_ms); } metrics_container->register_failover_connects(failover_success); if (failover_success && in_transaction) { new_error_code = "08007"; error_msg = "Connection failure during transaction."; } ds->opt_ENABLE_FAILURE_DETECTION = failure_detection_old_state; return true; } return false; } bool FAILOVER_HANDLER::failover_to_reader(const char*& new_error_code, const char*& error_msg) { MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting reader failover procedure with filtered topology: %s", this->topology_service->log_topology(this->topology_service->get_filtered_topology(current_topology)).c_str()); auto result = failover_reader_handler->failover(this->topology_service->get_filtered_topology(current_topology)); if (result->connected) { current_host = result->new_host; connection_handler->update_connection(result->new_connection, current_host->get_host()); new_error_code = "08S02"; // Failover succeeded error code. error_msg = "The active SQL connection has changed."; MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] The active SQL connection has changed " "due to a connection failure. Please re-configure session " "state if required."); return true; } else { MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Unable to establish SQL connection to reader node."); new_error_code = "08S01"; // Failover failed error code. error_msg = "The active SQL connection was lost."; return false; } return false; } bool FAILOVER_HANDLER::failover_to_writer(const char*& new_error_code, const char*& error_msg) { MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting writer failover procedure."); auto result = failover_writer_handler->failover(current_topology); if (!result->connected) { MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Unable to establish SQL connection to writer node."); new_error_code = "08S01"; error_msg = "The active SQL connection was lost."; return false; } const auto new_topology = result->new_topology; const auto new_host = new_topology->get_writer(); if (result->is_new_host) { // connected to a new writer host; take it over current_topology = new_topology; current_host = new_host; } const auto filtered_topology = this->topology_service->get_filtered_topology(new_topology); const auto allowed_hosts = filtered_topology->get_instances(); if (std::find(allowed_hosts.begin(), allowed_hosts.end(), new_host) == allowed_hosts.end()) { new_error_code = "08S01"; // Failover failed error code. error_msg = "The active SQL connection was lost."; MYLOG_DBC_TRACE( dbc, "[FAILOVER_HANDLER] The failover process identified the new writer but the host is not in the list of allowed hosts. " "New writer host: '%s'. Allowed hosts: '%s'", new_host->get_host().c_str(), this->topology_service->log_topology(filtered_topology).c_str()); return false; } connection_handler->update_connection(result->new_connection, new_host->get_host()); new_error_code = "08S02"; // Failover succeeded error code. error_msg = "The active SQL connection has changed."; MYLOG_DBC_TRACE( dbc, "[FAILOVER_HANDLER] The active SQL connection has changed due to a " "connection failure. Please re-configure session state if required."); return true; } void FAILOVER_HANDLER::invoke_start_time() { invoke_start_time_ms = std::chrono::steady_clock::now(); } bool FAILOVER_HANDLER::is_failover_mode(const char* expected_mode, DataSource* ds) { return myodbc_strcasecmp(expected_mode, (const char*) ds->opt_FAILOVER_MODE) == 0; }