sql/slave_stats_daemon.cc (161 lines of code) (raw):

#include "my_global.h"
#include "mysql.h"
#include "mysql_com.h"
#include "sql_base.h"
#include "sql_show.h"
#include "sql_string.h"
#include "global_threads.h"
#include "srv_session.h"
#include "my_time.h"
#include "rpl_slave.h"
#include "rpl_mi.h"
#include "slave_stats_daemon.h"

#if OPENSSL_VERSION_NUMBER >= 0x10100000L
// Function removed after OpenSSL 1.1.0
#define ERR_remove_state(x)
#endif

/*
 * The Slave stats daemon thread is responsible for
 * continuously sending lag statistics from slaves to masters
 */

/* Thread id of the daemon; written by the daemon thread itself on start-up. */
pthread_t slave_stats_daemon_thread;
/* Mutex/condition pair used to wake the daemon (shutdown, frequency change). */
mysql_mutex_t LOCK_slave_stats_daemon;
mysql_cond_t COND_slave_stats_daemon;

/* connection/read timeout in seconds */
const int REPLICA_STATS_NET_TIMEOUT = 5;

#ifdef HAVE_REPLICATION
/*
  Shutdown request flag. It is the wait predicate of the daemon loop, so it
  is written while holding LOCK_slave_stats_daemon.
*/
static bool abort_slave_stats_daemon;
/* True while the daemon believes `mysql` holds a live master connection. */
static bool connected_to_master = false;

/**
  Create and initialize the mysql object, and connect to the master.

  Any handle passed in is closed first, so the function can be called
  repeatedly to re-establish a connection. On failure after mysql_init()
  the freshly created handle is left in `mysql`; it is closed by the next
  call or by the daemon on exit.

  @param[in,out] mysql  connection handle; replaced by a new one

  @retval true if connection successful
  @retval false otherwise.
*/
static bool safe_connect_slave_stats_thread_to_master(MYSQL * &mysql)
{
  if (mysql != NULL)
  {
    mysql_close(mysql);
  }

  mysql = mysql_init(NULL);
  if (!mysql)
  {
    return false;
  }

  // Bound both connect and read so a dead master cannot stall the daemon.
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT,
                (char *) &REPLICA_STATS_NET_TIMEOUT);
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT,
                (char *) &REPLICA_STATS_NET_TIMEOUT);
  configure_master_connection_options(mysql, active_mi);

  char pass[MAX_PASSWORD_LENGTH + 1];
  int password_size = sizeof(pass);
  if (active_mi->get_password(pass, &password_size))
  {
    return false;
  }

  // Test hook: pretend the connection attempt failed.
  DBUG_EXECUTE_IF("dbug.replica_stats_force_disconnet_primary",
                  {return false;});

  if (!mysql_real_connect(mysql, active_mi->host, active_mi->get_user(),
                          pass, 0, active_mi->port, 0, 0))
  {
    return false;
  }
  return true;
}

/**
  Body of the Slave Stats Daemon thread.

  Each cycle waits on COND_slave_stats_daemon for write_stats_frequency
  seconds (or indefinitely when the frequency is 0). When the wait times
  out, the thread (re)connects to the master if needed and sends the
  replica lag statistics. The loop ends when abort_slave_stats_daemon is
  set; the thread then releases its connection and THD, decrements
  slave_stats_daemon_thread_counter and exits.

  @param arg  unused
  @return never returns normally (the thread calls pthread_exit)
*/
pthread_handler_t handle_slave_stats_daemon(void *arg MY_ATTRIBUTE((unused)))
{
  THD *thd= NULL;
  int error = 0;
  struct timespec abstime;

  DBUG_ENTER("handle_slave_stats_daemon");

  pthread_detach_this_thread();

  slave_stats_daemon_thread = pthread_self();
  MYSQL *mysql = nullptr;

  while (true)
  {
    /*
      Start every cycle with a clean wait status. Without this, a wait
      error other than ETIMEDOUT/EINTR would stay in `error`, the inner
      loop below would never be entered again and the thread would spin
      on lock/unlock until shutdown.
    */
    error = 0;

    mysql_mutex_lock(&LOCK_slave_stats_daemon);
    set_timespec(abstime, write_stats_frequency);
    while ((!error || error == EINTR) && !abort_slave_stats_daemon)
    {
      /*
        write_stats_frequency is set to 0. Do not send stats to master.
        Wait until a signal is received either for aborting the thread or
        for updating write_stats_frequency.
      */
      if (write_stats_frequency == 0)
      {
        error = mysql_cond_wait(&COND_slave_stats_daemon,
                                &LOCK_slave_stats_daemon);
      }
      else
      {
        /*
          wait for write_stats_frequency seconds before sending next set
          of slave lag statistics
        */
        error = mysql_cond_timedwait(&COND_slave_stats_daemon,
                                     &LOCK_slave_stats_daemon, &abstime);
      }
    }
    // Snapshot the shutdown flag while still holding the mutex.
    const bool abort_requested = abort_slave_stats_daemon;
    mysql_mutex_unlock(&LOCK_slave_stats_daemon);

    if (abort_requested)
      break;

    // With raft enabled, statistics are only sent when explicitly allowed.
    if (enable_raft_plugin && !raft_send_replica_statistics)
    {
      continue;
    }

    if (error == ETIMEDOUT)
    {
      // Initialize connection thd, if not already done.
      if (thd == NULL)
      {
        my_thread_init();
        thd= new THD;
        THD_CHECK_SENTRY(thd);
        thd->thread_stack= (char*) &thd;
        my_net_init(thd->get_net(), 0);
        thd->store_globals();
      }

      // Test hook: force a reconnect attempt on this cycle.
      DBUG_EXECUTE_IF("dbug.replica_stats_force_disconnet_primary",
                      {connected_to_master = false;});

      // If not connected to current master, try connection. If not
      // successful, try again in next cycle
      if (!connected_to_master)
      {
        connected_to_master = safe_connect_slave_stats_thread_to_master(mysql);
        if (connected_to_master)
        {
          DBUG_PRINT("info",
                     ("Slave Stats Daemon: connected to master '%s@%s:%d'",
                      active_mi->get_user(), active_mi->host,
                      active_mi->port));
        }
        else
        {
          DBUG_PRINT("info",
                     ("Slave Stats Daemon: Couldn't connect to master '%s@%s:%d', "
                      "will try again during next cycle, (Error: %s)",
                      active_mi->get_user(), active_mi->host,
                      active_mi->port, mysql_error(mysql)));
        }
      }

      if (connected_to_master &&
          (enable_raft_plugin ||
           active_mi->slave_running == MYSQL_SLAVE_RUN_CONNECT))
      {
        if (send_replica_statistics_to_master(mysql, active_mi))
        {
          DBUG_PRINT("info",
                     ("Slave Stats Daemon: Failed to send lag "
                      "statistics, resetting connection, (Error: %s)",
                      mysql_error(mysql)));
          // Drop the connection state so the next cycle reconnects.
          connected_to_master = false;
        }
      }
    }
  }

  mysql_close(mysql);
  mysql = nullptr;
  connected_to_master = false;
  if (thd != NULL)
  {
    net_end(thd->get_net());
    thd->release_resources();
    delete (thd);
  }
  // DBUG_LEAVE; // Can't use DBUG_RETURN after my_thread_end
  my_thread_end();
  ERR_remove_state(0);
  DBUG_ASSERT(slave_stats_daemon_thread_counter > 0);
  // stop_handle_slave_stats_daemon() polls this counter to detect exit.
  slave_stats_daemon_thread_counter--;
  pthread_exit(0);
  return (NULL);
}

/**
  Start handle Slave Stats Daemon thread.

  Increments slave_stats_daemon_thread_counter before creating the thread
  and rolls the increment back if creation fails.

  @retval true   thread created
  @retval false  thread creation failed (a warning is logged)
*/
bool start_handle_slave_stats_daemon()
{
  DBUG_ENTER("start_handle_slave_stats_daemon");

  pthread_t hThread;
  int error;

  slave_stats_daemon_thread_counter++;
  error = mysql_thread_create(key_thread_handle_slave_stats_daemon,
                              &hThread, &connection_attrib,
                              handle_slave_stats_daemon, 0);
  if (error)
  {
    sql_print_warning(
        "Can't create Slave Stats Daemon thread (errno= %d)", error);
    DBUG_ASSERT(slave_stats_daemon_thread_counter > 0);
    slave_stats_daemon_thread_counter--;
    DBUG_RETURN(false);
  }

  /*
    Log the id returned by mysql_thread_create(). The global
    slave_stats_daemon_thread is assigned by the new thread itself and may
    not have been written yet when this line runs.
  */
  sql_print_information("Successfully created Slave Stats Daemon thread: 0x%lx",
                        (ulong)hThread);
  DBUG_RETURN(true);
}

/**
  Initiate shutdown of handle Slave Stats Daemon thread and wait for it.

  Sets the abort flag and signals the daemon under LOCK_slave_stats_daemon,
  then polls slave_stats_daemon_thread_counter until the daemon has exited.
*/
void stop_handle_slave_stats_daemon()
{
  DBUG_ENTER("stop_handle_slave_stats_daemon");

  mysql_mutex_lock(&LOCK_slave_stats_daemon);
  // The flag is the daemon's wait predicate, so change it under the mutex.
  abort_slave_stats_daemon = true;
  sql_print_information("Shutting down Slave Stats Daemon thread: 0x%lx",
                        (ulong)slave_stats_daemon_thread);
  mysql_cond_signal(&COND_slave_stats_daemon);
  mysql_mutex_unlock(&LOCK_slave_stats_daemon);

  while (slave_stats_daemon_thread_counter > 0)
  {
    // wait for the thread to finish, sleep for 10ms
    my_sleep(10000);
  }

  // Reset abort_slave_stats_daemon so slave_stats_daemon can be spawned in
  // future
  abort_slave_stats_daemon = false;
  DBUG_VOID_RETURN;
}
#endif