driver/failover_writer_handler.cc (277 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License, version 2.0 // (GPLv2), as published by the Free Software Foundation, with the // following additional permissions: // // This program is distributed with certain software that is licensed // under separate terms, as designated in a particular file or component // or in the license documentation. Without limiting your rights under // the GPLv2, the authors of this program hereby grant you an additional // permission to link the program and your derivative works with the // separately licensed software that they have included with the program. // // Without limiting the foregoing grant of rights under the GPLv2 and // additional permission as to separately licensed software, this // program is also subject to the Universal FOSS Exception, version 1.0, // a copy of which can be found along with its FAQ at // http://oss.oracle.com/licenses/universal-foss-exception. // // This program is distributed in the hope that it will be useful, but // WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU General Public License, version 2.0, for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see // http://www.gnu.org/licenses/gpl-2.0.html. 
#include "driver.h"

#include <chrono>
#include <future>
#include <stdexcept>
#include <thread>

// **** FAILOVER_SYNC ***************************************
// Used for thread synchronization between the failover tasks and the thread
// that waits for them. `num_tasks` is the number of tasks still outstanding;
// every access to it in this file is made while holding `mutex_`.

// Creates a sync object that starts with `num_tasks` outstanding tasks.
FAILOVER_SYNC::FAILOVER_SYNC(int num_tasks) : num_tasks{num_tasks} {}

// Registers one more outstanding task.
void FAILOVER_SYNC::increment_task() {
    std::lock_guard<std::mutex> lock(mutex_);
    num_tasks++;
}

// Marks the calling task as complete and wakes up a waiter.
//
// cancel_other_tasks: when true, the outstanding-task count is reset to zero
//                     so that is_completed() becomes true for every task;
//                     when false, only this task is subtracted.
// Throws std::runtime_error when cancel_other_tasks is false and no task is
// outstanding anymore.
void FAILOVER_SYNC::mark_as_complete(bool cancel_other_tasks) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (cancel_other_tasks) {
        num_tasks = 0;
    } else {
        if (num_tasks <= 0) {
            throw std::runtime_error("Trying to cancel a failover process that is already done.");
        }
        num_tasks--;
    }
    cv.notify_one();
}

// Blocks until no task is outstanding or `milliseconds` have elapsed,
// whichever comes first. The count is then forced to zero, so after this
// call is_completed() returns true even when the wait timed out.
void FAILOVER_SYNC::wait_and_complete(int milliseconds) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv.wait_for(lock, std::chrono::milliseconds(milliseconds),
                [this] { return num_tasks <= 0; });
    num_tasks = 0;
}

// Returns true when no task is outstanding.
bool FAILOVER_SYNC::is_completed() {
    std::unique_lock<std::mutex> lock(mutex_);
    return num_tasks <= 0;
}

// ************* FAILOVER ***********************************
// Base class of two writer failover task handlers

// Stores the collaborators and starts without a connection. A log file is
// only initialized when `enable_logging` is true.
FAILOVER::FAILOVER(
    std::shared_ptr<CONNECTION_HANDLER> connection_handler,
    std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
    unsigned long dbc_id, bool enable_logging)
    : connection_handler{connection_handler},
      topology_service{topology_service},
      dbc_id{dbc_id},
      new_connection{nullptr} {

    if (enable_logging) {
        logger = init_log_file();
    }
}

// Returns true when `new_connection` is set and reports being connected.
bool FAILOVER::is_writer_connected() {
    return new_connection && new_connection->is_connected();
}

// Asks the connection handler for a connection to `host_info`, stores it in
// `new_connection` and returns whether that connection is usable.
bool FAILOVER::connect(std::shared_ptr<HOST_INFO> host_info) {
    new_connection = connection_handler->connect(host_info, nullptr);
    return is_writer_connected();
}

// Suspends the calling thread for the given number of milliseconds.
void FAILOVER::sleep(int miliseconds) {
    std::this_thread::sleep_for(std::chrono::milliseconds(miliseconds));
}

// Close new connection if not needed (other task finishes and returns first)
void FAILOVER::release_new_connection() {
    if (new_connection) {
        new_connection->delete_ds();
        delete new_connection;
        new_connection = nullptr;
    }
}

// ************************ RECONNECT_TO_WRITER_HANDLER
// handler reconnecting to a given host, e.g. reconnect to a current writer
RECONNECT_TO_WRITER_HANDLER::RECONNECT_TO_WRITER_HANDLER(
    std::shared_ptr<CONNECTION_HANDLER> connection_handler,
    std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
    int connection_interval, unsigned long dbc_id, bool enable_logging)
    : FAILOVER{connection_handler, topology_service, dbc_id, enable_logging},
      reconnect_interval_ms{connection_interval} {}

RECONNECT_TO_WRITER_HANDLER::~RECONNECT_TO_WRITER_HANDLER() {}

// Task A: keeps trying to reconnect to `original_writer` until the failover
// is marked as completed.
//
// id:              identifier used in the log messages.
// original_writer: host to reconnect to; when null no attempt is made.
// f_sync:          shared completion state; polled to detect cancellation and
//                  signalled on success.
// result:          filled in (connected, is_new_host = false, topology and
//                  connection) only when this task succeeds.
void RECONNECT_TO_WRITER_HANDLER::operator()(
    int id,
    std::shared_ptr<HOST_INFO> original_writer,
    std::shared_ptr<FAILOVER_SYNC> f_sync,
    std::shared_ptr<WRITER_FAILOVER_RESULT> result) {

    if (original_writer) {
        MYLOG_TRACE(logger, dbc_id,
                    "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Attempting to "
                    "re-connect to the current writer instance: %s",
                    id, original_writer->get_host_port_pair().c_str());

        while (!f_sync->is_completed()) {
            if (connect(original_writer)) {
                auto latest_topology =
                    topology_service->get_topology(new_connection, true);

                // A null topology is treated like an empty one.
                if (latest_topology && latest_topology->total_hosts() > 0 &&
                    is_current_host_writer(original_writer, latest_topology)) {

                    topology_service->mark_host_up(original_writer);

                    // Re-check so that a result is not published after the
                    // failover has already been completed elsewhere.
                    if (f_sync->is_completed()) {
                        break;
                    }

                    result->connected = true;
                    result->is_new_host = false;
                    result->new_topology = latest_topology;
                    // The connection is handed to the result; it is not
                    // released by this handler on the success path.
                    result->new_connection = std::move(new_connection);
                    f_sync->mark_as_complete(true);
                    MYLOG_TRACE(logger, dbc_id,
                                "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished",
                                id);
                    return;
                }
                // Connected, but the host is not the writer (anymore).
                release_new_connection();
            }
            sleep(reconnect_interval_ms);
        }
        MYLOG_TRACE(logger, dbc_id,
                    "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Cancelled",
                    id);
    }

    // Another thread finishes or both timeout, this thread is canceled
    release_new_connection();
    MYLOG_TRACE(logger, dbc_id,
                "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished",
                id);
}

// Returns true when the writer listed in `latest_topology` has the same
// instance name as `original_writer`. Returns false when the original
// instance name is empty, or when either argument or the topology's writer
// is null.
bool RECONNECT_TO_WRITER_HANDLER::is_current_host_writer(
    std::shared_ptr<HOST_INFO> original_writer,
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> latest_topology) {

    if (!original_writer || !latest_topology) {
        return false;
    }

    auto original_instance = original_writer->instance_name;
    if (original_instance.empty()) {
        return false;
    }

    auto latest_writer = latest_topology->get_writer();
    // Guard in case get_writer() yields a null pointer for a topology that
    // has hosts but no writer.
    if (!latest_writer) {
        return false;
    }
    return latest_writer->instance_name == original_instance;
}

// ************************ WAIT_NEW_WRITER_HANDLER
// handler getting the latest cluster topology and connecting to a newly elected
// writer
WAIT_NEW_WRITER_HANDLER::WAIT_NEW_WRITER_HANDLER(
    std::shared_ptr<CONNECTION_HANDLER> connection_handler,
    std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology,
    std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler,
    int connection_interval, unsigned long dbc_id, bool enable_logging)
    : FAILOVER{connection_handler, topology_service, dbc_id, enable_logging},
      current_topology{current_topology},
      reader_handler{reader_handler},
      read_topology_interval_ms{connection_interval} {}

WAIT_NEW_WRITER_HANDLER::~WAIT_NEW_WRITER_HANDLER() {}

// Task B: until the failover is marked as completed, connects to a reader,
// refreshes the topology through it and tries to connect to a writer that
// differs from `original_writer`.
//
// id:              identifier used in the log messages.
// original_writer: the writer that failed; a candidate equal to it is skipped.
// f_sync:          shared completion state; polled to detect cancellation and
//                  signalled on success.
// result:          filled in (connected, is_new_host = true, topology and
//                  connection) only when this task succeeds.
void WAIT_NEW_WRITER_HANDLER::operator()(
    int id,
    std::shared_ptr<HOST_INFO> original_writer,
    std::shared_ptr<FAILOVER_SYNC> f_sync,
    std::shared_ptr<WRITER_FAILOVER_RESULT> result) {

    MYLOG_TRACE(logger, dbc_id,
                "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Attempting to connect to a new writer instance",
                id);

    while (!f_sync->is_completed()) {
        if (!is_writer_connected()) {
            connect_to_reader(f_sync);
            refresh_topology_and_connect_to_new_writer(original_writer, f_sync);
            clean_up_reader_connection();
        } else {
            result->connected = true;
            result->is_new_host = true;
            result->new_topology = current_topology;
            // The connection is handed to the result; it is not released by
            // this handler on the success path.
            result->new_connection = std::move(new_connection);
            f_sync->mark_as_complete(true);
            MYLOG_TRACE(logger, dbc_id,
                        "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Finished",
                        id);
            return;
        }
    }

    MYLOG_TRACE(logger, dbc_id,
                "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Cancelled",
                id);

    // Another thread finishes or both timeout, this thread is canceled
    clean_up_reader_connection();
    release_new_connection();
    MYLOG_TRACE(logger, dbc_id,
                "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Finished",
                id);
}

// Connect to a reader to later retrieve the latest topology.
// Retries until a connected reader is obtained or the failover is completed;
// on success `reader_connection` and `current_reader_host` are updated.
void WAIT_NEW_WRITER_HANDLER::connect_to_reader(std::shared_ptr<FAILOVER_SYNC> f_sync) {
    while (!f_sync->is_completed()) {
        auto connection_result =
            reader_handler->get_reader_connection(current_topology, f_sync);

        if (connection_result->connected &&
            connection_result->new_connection->is_connected()) {

            reader_connection = connection_result->new_connection;
            current_reader_host = connection_result->new_host;
            MYLOG_TRACE(
                logger, dbc_id,
                "[WAIT_NEW_WRITER_HANDLER] [TaskB] Connected to reader: %s",
                connection_result->new_host->get_host_port_pair().c_str());
            break;
        }
        MYLOG_TRACE(logger, dbc_id,
                    "[WAIT_NEW_WRITER_HANDLER] [TaskB] Failed to connect to any reader.");
    }
}

// Use just connected reader to refresh topology and try to connect to a new writer.
// Returns as soon as a writer connection is established or the failover is
// completed; otherwise sleeps `read_topology_interval_ms` between attempts.
void WAIT_NEW_WRITER_HANDLER::refresh_topology_and_connect_to_new_writer(
    std::shared_ptr<HOST_INFO> original_writer,
    std::shared_ptr<FAILOVER_SYNC> f_sync) {

    while (!f_sync->is_completed()) {
        auto latest_topology =
            topology_service->get_topology(reader_connection, true);

        // A null topology is treated like an empty one.
        if (latest_topology && latest_topology->total_hosts() > 0) {
            current_topology = latest_topology;
            auto writer_candidate = current_topology->get_writer();

            // Same case is handled by the reconnect handler.
            // A null candidate is skipped because connect_to_writer()
            // dereferences it.
            if (writer_candidate &&
                !HOST_INFO::is_host_same(writer_candidate, original_writer)) {
                if (connect_to_writer(writer_candidate)) {
                    return;
                }
            }
        }
        sleep(read_topology_interval_ms);
    }
}

// Try to connect to the writer candidate.
// When the candidate is the host of the current reader connection, that
// connection is reused as the writer connection. Marks the candidate up on
// success and down on failure; returns whether a connection is available.
bool WAIT_NEW_WRITER_HANDLER::connect_to_writer(
    std::shared_ptr<HOST_INFO> writer_candidate) {

    MYLOG_TRACE(logger, dbc_id,
                "[WAIT_NEW_WRITER_HANDLER] [TaskB] Trying to connect to a new writer: %s",
                writer_candidate->get_host_port_pair().c_str());

    if (HOST_INFO::is_host_same(writer_candidate, current_reader_host)) {
        new_connection = reader_connection;
    } else if (!connect(writer_candidate)) {
        topology_service->mark_host_down(writer_candidate);
        return false;
    }
    topology_service->mark_host_up(writer_candidate);
    return true;
}

// Close reader connection if not needed (open and not the same as current connection)
void WAIT_NEW_WRITER_HANDLER::clean_up_reader_connection() {
    if (reader_connection && new_connection != reader_connection) {
        reader_connection->delete_ds();
        delete reader_connection;
        reader_connection = nullptr;
    }
}

// ************************** FAILOVER_WRITER_HANDLER **************************

// Stores the collaborators and timing settings. A log file is only
// initialized when `enable_logging` is true.
FAILOVER_WRITER_HANDLER::FAILOVER_WRITER_HANDLER(
    std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
    std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler,
    std::shared_ptr<CONNECTION_HANDLER> connection_handler,
    ctpl::thread_pool& thread_pool,
    int writer_failover_timeout_ms, int read_topology_interval_ms,
    int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging)
    : connection_handler{connection_handler},
      topology_service{topology_service},
      reader_handler{reader_handler},
      thread_pool{thread_pool},
      writer_failover_timeout_ms{writer_failover_timeout_ms},
      read_topology_interval_ms{read_topology_interval_ms},
      reconnect_writer_interval_ms{reconnect_writer_interval_ms},
      dbc_id{dbc_id} {

    if (enable_logging) {
        logger = init_log_file();
    }
}

FAILOVER_WRITER_HANDLER::~FAILOVER_WRITER_HANDLER() {}

// Runs the writer failover: pushes a reconnect task and a wait-for-new-writer
// task onto the thread pool and returns the result of whichever task reports
// a connection, checking the reconnect task first.
//
// current_topology: topology known before the failure; must be non-null and
//                   contain at least one host.
// Returns a result with `connected == true` on success. Otherwise returns a
// fresh, unconnected result: for an invalid topology, when both tasks finish
// without a connection, or when `writer_failover_timeout_ms` elapses.
std::shared_ptr<WRITER_FAILOVER_RESULT> FAILOVER_WRITER_HANDLER::failover(
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology) {

    if (!current_topology || current_topology->total_hosts() == 0) {
        MYLOG_TRACE(logger, dbc_id,
                    "[FAILOVER_WRITER_HANDLER] Failover was called with "
                    "an invalid (null or empty) topology");
        return std::make_shared<WRITER_FAILOVER_RESULT>(false, false, nullptr, nullptr);
    }

    const auto start = std::chrono::steady_clock::now();

    auto failover_sync = std::make_shared<FAILOVER_SYNC>(2);

    // Constructing the function objects
    RECONNECT_TO_WRITER_HANDLER reconnect_handler(
        connection_handler, topology_service, reconnect_writer_interval_ms,
        dbc_id, logger != nullptr);
    WAIT_NEW_WRITER_HANDLER new_writer_handler(
        connection_handler, topology_service, current_topology, reader_handler,
        read_topology_interval_ms, dbc_id, logger != nullptr);

    auto original_writer = current_topology->get_writer();
    topology_service->mark_host_down(original_writer);

    auto reconnect_result =
        std::make_shared<WRITER_FAILOVER_RESULT>(false, false, nullptr, nullptr);
    auto new_writer_result =
        std::make_shared<WRITER_FAILOVER_RESULT>(false, false, nullptr, nullptr);

    // Grow the pool when fewer than two threads are idle, so that both tasks
    // pushed below have a thread available.
    if (thread_pool.n_idle() <= 1) {
        int size = thread_pool.size() + 2 - thread_pool.n_idle();
        MYLOG_TRACE(logger, dbc_id,
                    "[FAILOVER_WRITER_HANDLER] Resizing thread pool to %d", size);
        thread_pool.resize(size);
    }

    auto reconnect_future =
        thread_pool.push(std::move(reconnect_handler), original_writer,
                         failover_sync, reconnect_result);
    auto wait_new_writer_future =
        thread_pool.push(std::move(new_writer_handler), original_writer,
                         failover_sync, new_writer_result);

    // Wait for task complete signal with specified timeout
    failover_sync->wait_and_complete(writer_failover_timeout_ms);

    // Constantly polling for results until timeout
    while (true) {
        // Check if reconnect task result is ready
        if (reconnect_future.valid() &&
            reconnect_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {

            reconnect_future.get();
            if (reconnect_result->connected) {
                MYLOG_TRACE(logger, dbc_id,
                            "[FAILOVER_WRITER_HANDLER] Successfully re-connected to the current writer instance: %s",
                            reconnect_result->new_topology->get_writer()->get_host_port_pair().c_str());
                return reconnect_result;
            }
        }

        // Check if wait new writer task result is ready
        if (wait_new_writer_future.valid() &&
            wait_new_writer_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {

            wait_new_writer_future.get();
            if (new_writer_result->connected) {
                MYLOG_TRACE(logger, dbc_id,
                            "[FAILOVER_WRITER_HANDLER] Successfully connected to the new writer instance: %s",
                            new_writer_result->new_topology->get_writer()->get_host_port_pair().c_str());
                return new_writer_result;
            }
        }

        // Results are ready but none has a valid connection
        // (get() leaves a future invalid, so both invalid means both consumed)
        if (!reconnect_future.valid() && !wait_new_writer_future.valid()) {
            break;
        }

        // No result is ready, update remaining timeout
        const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);
        const auto remaining_wait_ms = writer_failover_timeout_ms - duration.count();
        if (remaining_wait_ms <= 0) {
            // Writer failover timed out
            MYLOG_TRACE(logger, dbc_id,
                        "[FAILOVER_WRITER_HANDLER] Writer failover timed out. Failed to connect to the writer instance.");
            return std::make_shared<WRITER_FAILOVER_RESULT>(false, false, nullptr, nullptr);
        }

        // Give the task threads a chance to run instead of spinning flat out
        // on the zero-timeout polls above.
        std::this_thread::yield();
    }

    // Writer failover finished but not connected
    MYLOG_TRACE(logger, dbc_id,
                "[FAILOVER_WRITER_HANDLER] Failed to connect to the writer instance.");
    return std::make_shared<WRITER_FAILOVER_RESULT>(false, false, nullptr, nullptr);
}