driver/failover.h (231 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#ifndef __FAILOVER_H__ #define __FAILOVER_H__ #include "connection_handler.h" #include "connection_proxy.h" #include "topology_service.h" #include "mylog.h" #include <condition_variable> struct READER_FAILOVER_RESULT { bool connected = false; std::shared_ptr<HOST_INFO> new_host; CONNECTION_PROXY* new_connection; READER_FAILOVER_RESULT() : connected{false}, new_host{nullptr}, new_connection{nullptr} {} READER_FAILOVER_RESULT(bool connected, std::shared_ptr<HOST_INFO> new_host, CONNECTION_PROXY* new_connection) : connected{connected}, new_host{new_host}, new_connection{new_connection} {} }; // FAILOVER_SYNC enables synchronization between threads class FAILOVER_SYNC { public: FAILOVER_SYNC(int num_tasks); void increment_task(); void mark_as_complete(bool cancel_other_tasks); void wait_and_complete(int milliseconds); virtual bool is_completed(); private: int num_tasks; std::mutex mutex_; std::condition_variable cv; }; class FAILOVER_READER_HANDLER { public: FAILOVER_READER_HANDLER( std::shared_ptr<TOPOLOGY_SERVICE> topology_service, std::shared_ptr<CONNECTION_HANDLER> connection_handler, ctpl::thread_pool& thread_pool, int failover_timeout_ms, int failover_reader_connect_timeout, bool enable_strict_reader_failover, unsigned long dbc_id, bool enable_logging = false); ~FAILOVER_READER_HANDLER(); std::shared_ptr<READER_FAILOVER_RESULT> failover( std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info); virtual std::shared_ptr<READER_FAILOVER_RESULT> get_reader_connection( std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info, std::shared_ptr<FAILOVER_SYNC> f_sync); std::vector<std::shared_ptr<HOST_INFO>> build_hosts_list( std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info, bool contain_writers); std::shared_ptr<READER_FAILOVER_RESULT> get_connection_from_hosts( std::vector<std::shared_ptr<HOST_INFO>> hosts_list, std::shared_ptr<FAILOVER_SYNC> global_sync); protected: int reader_connect_timeout_ms = 30000; // 30 sec int max_failover_timeout_ms = 60000; // 60 sec private: 
std::shared_ptr<TOPOLOGY_SERVICE> topology_service; std::shared_ptr<CONNECTION_HANDLER> connection_handler; const int READER_CONNECT_INTERVAL_SEC = 1; // 1 sec bool enable_strict_reader_failover = false; std::shared_ptr<FILE> logger = nullptr; unsigned long dbc_id = 0; ctpl::thread_pool& thread_pool; }; // This struct holds results of Writer Failover Process. struct WRITER_FAILOVER_RESULT { bool connected = false; bool is_new_host = false; // True if process connected to a new host. False if // process re-connected to the same host std::shared_ptr<CLUSTER_TOPOLOGY_INFO> new_topology; CONNECTION_PROXY* new_connection; WRITER_FAILOVER_RESULT() : connected{false}, is_new_host{false}, new_topology{nullptr}, new_connection{nullptr} {} WRITER_FAILOVER_RESULT(bool connected, bool is_new_host, std::shared_ptr<CLUSTER_TOPOLOGY_INFO> new_topology, CONNECTION_PROXY* new_connection) : connected{connected}, is_new_host{is_new_host}, new_topology{new_topology}, new_connection{new_connection} {} }; class FAILOVER_WRITER_HANDLER { public: FAILOVER_WRITER_HANDLER( std::shared_ptr<TOPOLOGY_SERVICE> topology_service, std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler, std::shared_ptr<CONNECTION_HANDLER> connection_handler, ctpl::thread_pool& thread_pool, int writer_failover_timeout_ms, int read_topology_interval_ms, int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging = false); ~FAILOVER_WRITER_HANDLER(); std::shared_ptr<WRITER_FAILOVER_RESULT> failover( std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology); protected: int read_topology_interval_ms = 5000; // 5 sec int reconnect_writer_interval_ms = 5000; // 5 sec int writer_failover_timeout_ms = 60000; // 60 sec private: std::shared_ptr<TOPOLOGY_SERVICE> topology_service; std::shared_ptr<CONNECTION_HANDLER> connection_handler; std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler; std::shared_ptr<FILE> logger = nullptr; unsigned long dbc_id = 0; ctpl::thread_pool& thread_pool; }; class 
FAILOVER_HANDLER { public: FAILOVER_HANDLER(DBC* dbc, DataSource* ds); FAILOVER_HANDLER( DBC* dbc, DataSource* ds, std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, std::shared_ptr<CLUSTER_AWARE_METRICS_CONTAINER> metrics_container); ~FAILOVER_HANDLER(); SQLRETURN init_connection(); bool trigger_failover_if_needed(const char* error_code, const char*& new_error_code, const char*& error_msg); bool is_failover_enabled(); bool is_rds(); bool is_rds_proxy(); bool is_cluster_topology_available(); void invoke_start_time(); std::string cluster_id = DEFAULT_CLUSTER_ID; private: DBC* dbc = nullptr; DataSource* ds = nullptr; std::shared_ptr<TOPOLOGY_SERVICE> topology_service; std::shared_ptr<FAILOVER_READER_HANDLER> failover_reader_handler; std::shared_ptr<FAILOVER_WRITER_HANDLER> failover_writer_handler; std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology; std::shared_ptr<HOST_INFO> current_host = nullptr; std::shared_ptr<CONNECTION_HANDLER> connection_handler = nullptr; bool m_is_cluster_topology_available = false; bool m_is_multi_writer_cluster = false; bool m_is_rds_proxy = false; bool m_is_rds = false; bool m_is_rds_custom_cluster = false; bool is_cluster_info_initialized = false; void init_cluster_info(); bool should_connect_to_new_writer(); void initialize_topology(); bool is_read_only(); virtual std::string host_to_IP(std::string host); SQLRETURN reconnect(bool failover_enabled); static bool is_failover_mode(const char* expected_mode, DataSource* ds); bool failover_to_reader(const char*& new_error_code, const char*& error_msg); bool failover_to_writer(const char*& new_error_code, const char*& error_msg); void set_cluster_id(std::string host, int port); void set_cluster_id(std::string cluster_id); std::shared_ptr<CLUSTER_AWARE_METRICS_CONTAINER> metrics_container; std::chrono::steady_clock::time_point invoke_start_time_ms; std::chrono::steady_clock::time_point failover_start_time_ms; #ifdef UNIT_TEST_BUILD 
// Allows for testing private/protected methods friend class TEST_UTILS; #endif }; // ************************************************************************************************ // These are failover utilities/helpers. Perhaps belong to a separate header // file, but here for now // class FAILOVER { public: FAILOVER(std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, unsigned long dbc_id, bool enable_logging = false); virtual ~FAILOVER() = default; bool is_writer_connected(); protected: bool connect(std::shared_ptr<HOST_INFO> host_info); void sleep(int miliseconds); void release_new_connection(); std::shared_ptr<CONNECTION_HANDLER> connection_handler; std::shared_ptr<TOPOLOGY_SERVICE> topology_service; CONNECTION_PROXY* new_connection; std::shared_ptr<FILE> logger = nullptr; unsigned long dbc_id = 0; }; class CONNECT_TO_READER_HANDLER : public FAILOVER { public: CONNECT_TO_READER_HANDLER( std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, unsigned long dbc_id, bool enable_logging = false); ~CONNECT_TO_READER_HANDLER(); void operator()( int id, std::shared_ptr<HOST_INFO> reader, std::shared_ptr<FAILOVER_SYNC> f_sync, std::shared_ptr<READER_FAILOVER_RESULT> result); }; class RECONNECT_TO_WRITER_HANDLER : public FAILOVER { public: RECONNECT_TO_WRITER_HANDLER( std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, int connection_interval, unsigned long dbc_id, bool enable_logging = false); ~RECONNECT_TO_WRITER_HANDLER(); void operator()( int id, std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<FAILOVER_SYNC> f_sync, std::shared_ptr<WRITER_FAILOVER_RESULT> result); private: int reconnect_interval_ms; bool is_current_host_writer( std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<CLUSTER_TOPOLOGY_INFO> latest_topology); }; class WAIT_NEW_WRITER_HANDLER : public FAILOVER { public: 
WAIT_NEW_WRITER_HANDLER( std::shared_ptr<CONNECTION_HANDLER> connection_handler, std::shared_ptr<TOPOLOGY_SERVICE> topology_service, std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology, std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler, int connection_interval, unsigned long dbc_id, bool enable_logging = false); ~WAIT_NEW_WRITER_HANDLER(); void operator()( int id, std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<FAILOVER_SYNC> f_sync, std::shared_ptr<WRITER_FAILOVER_RESULT> result); private: // TODO - initialize in constructor and define constant for default value int read_topology_interval_ms = 5000; std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler; std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology; CONNECTION_PROXY* reader_connection = nullptr; // To retrieve latest topology std::shared_ptr<HOST_INFO> current_reader_host = nullptr; void refresh_topology_and_connect_to_new_writer( std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<FAILOVER_SYNC> f_sync); void connect_to_reader(std::shared_ptr<FAILOVER_SYNC> f_sync); bool connect_to_writer(std::shared_ptr<HOST_INFO> writer_candidate); void clean_up_reader_connection(); }; #endif /* __FAILOVER_H__ */