sql/binlog.h (910 lines of code) (raw):

#ifndef BINLOG_H_INCLUDED /* Copyright (c) 2010, 2018, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #define BINLOG_H_INCLUDED #include "mysqld.h" /* opt_relay_logname */ #include "log_event.h" #include "log.h" #include "rpl_gtid.h" #include <atomic> #include <list> #include <unordered_map> extern ulong rpl_read_size; extern char *histogram_step_size_binlog_fsync; extern int opt_histogram_step_size_binlog_group_commit; extern char* opt_histogram_binlog_commit_time_step_size; extern latency_histogram histogram_binlog_fsync; extern latency_histogram histogram_raft_trx_wait; extern latency_histogram histogram_binlog_group_commit_trx; extern latency_histogram histogram_binlog_engine_commit_trx; extern counter_histogram histogram_binlog_group_commit; extern Slow_log_throttle log_throttle_sbr_unsafe_query; class Relay_log_info; class Master_info; class Format_description_log_event; struct RaftRotateInfo; /* The enum defining the server's action when a trx fails inside ordered commit * due to an error related to consensus (raft plugin) */ enum enum_commit_consensus_error_actions { /* Transactions that fail in ordered commit will be rolled back. * Currently all trxs in the group will be rolled back when the leader thread * of the group fails. 
An optimization to just rollback the failing trx is * left as a TODO */ ROLLBACK_TRXS_IN_GROUP= 0, /* Ignore consensus errors and proceed as usual. Might be useful as an * override in some cases like consensus plugin bugs, easier rollouts, full * region failures etc */ IGNORE_COMMIT_CONSENSUS_ERROR= 1, INVALID_COMMIT_CONSENSUS_ERROR_ACTION }; enum enum_raft_signal_async_dump_threads_options { AFTER_CONSENSUS= 0, AFTER_ENGINE_COMMIT= 1, INVALID_OPTION }; /** Class for maintaining the commit stages for binary log group commit. */ class Stage_manager { public: class Mutex_queue { friend class Stage_manager; public: Mutex_queue() : m_first(NULL), m_last(&m_first), group_prepared_engine(NULL) { } void init( #ifdef HAVE_PSI_INTERFACE PSI_mutex_key key_LOCK_queue #endif ) { mysql_mutex_init(key_LOCK_queue, &m_lock, MY_MUTEX_INIT_FAST); } void deinit() { mysql_mutex_destroy(&m_lock); if (group_prepared_engine) { delete group_prepared_engine; } } bool is_empty() const { return m_first == NULL; } /** Append a linked list of threads to the queue */ bool append(THD *first); /** Fetch the entire queue for a stage. This will fetch the entire queue in one go. */ THD *fetch_and_empty(); private: void lock() { mysql_mutex_lock(&m_lock); } void unlock() { mysql_mutex_unlock(&m_lock); } /** Pointer to the first thread in the queue, or NULL if the queue is empty. */ THD *m_first; /** Pointer to the location holding the end of the queue. This is either @c &first, or a pointer to the @c next_to_commit of the last thread that is enqueued. */ THD **m_last; /** Store the max prepared log for each engine that supports ha_flush_logs. We have to init group_prepared_engine after all plugins are inited. */ engine_lsn_map* group_prepared_engine; /** Lock for protecting the queue. */ mysql_mutex_t m_lock; } MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE))); public: Stage_manager() { } ~Stage_manager() { } /** Constants for queues for different stages. 
*/ enum StageID { FLUSH_STAGE, SYNC_STAGE, SEMISYNC_STAGE, COMMIT_STAGE, STAGE_COUNTER }; void init( #ifdef HAVE_PSI_INTERFACE PSI_mutex_key key_LOCK_flush_queue, PSI_mutex_key key_LOCK_sync_queue, PSI_mutex_key key_LOCK_semisync_queue, PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_done, PSI_cond_key key_COND_done #endif ) { mysql_mutex_init(key_LOCK_done, &m_lock_done, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_done, &m_cond_done, NULL); #ifndef DBUG_OFF /* reuse key_COND_done 'cos a new PSI object would be wasteful in DBUG_ON */ mysql_cond_init(key_COND_done, &m_cond_preempt, NULL); #endif m_queue[FLUSH_STAGE].init( #ifdef HAVE_PSI_INTERFACE key_LOCK_flush_queue #endif ); m_queue[SYNC_STAGE].init( #ifdef HAVE_PSI_INTERFACE key_LOCK_sync_queue #endif ); m_queue[SEMISYNC_STAGE].init( #ifdef HAVE_PSI_INTERFACE key_LOCK_semisync_queue #endif ); m_queue[COMMIT_STAGE].init( #ifdef HAVE_PSI_INTERFACE key_LOCK_commit_queue #endif ); } void deinit() { for (size_t i = 0 ; i < STAGE_COUNTER ; ++i) m_queue[i].deinit(); mysql_cond_destroy(&m_cond_done); mysql_mutex_destroy(&m_lock_done); } /** Enroll a set of sessions for a stage. This will queue the session thread for writing and flushing. If the thread being queued is assigned as stage leader, it will return immediately. If wait_if_follower is true the thread is not the stage leader, the thread will be wait for the queue to be processed by the leader before it returns. In DBUG-ON version the follower marks is preempt status as ready. @param stage Stage identifier for the queue to append to. @param first Queue to append. @param leave_mutex Pointer to the currently held stage mutex, or NULL if we're not in a stage. @param enter_mutex Pointer to the mutex for the stage being entered. @retval true Thread is stage leader. @retval false Thread was not stage leader and processing has been done. 
*/ bool enroll_for(StageID stage, THD *first, mysql_mutex_t *leave_mutex, mysql_mutex_t *enter_mutex); #ifndef DBUG_OFF /** The method ensures the follower's execution path can be preempted by the leader's thread. Preempt status of @c head follower is checked to engange the leader into waiting when set. @param head THD* of a follower thread */ void clear_preempt_status(THD *head); #endif /** Fetch the entire queue and empty it. @return Pointer to the first session of the queue. */ THD *fetch_queue_for(StageID stage) { DBUG_PRINT("debug", ("Fetching queue for stage %d", stage)); return m_queue[stage].fetch_and_empty(); } void signal_done(THD *queue) { mysql_mutex_lock(&m_lock_done); for (THD *thd= queue ; thd ; thd = thd->next_to_commit) thd->transaction.flags.pending= false; mysql_mutex_unlock(&m_lock_done); mysql_cond_broadcast(&m_cond_done); } private: /** Queues for sessions. We need two queues: - Waiting. Threads waiting to be processed - Committing. Threads waiting to be committed. */ Mutex_queue m_queue[STAGE_COUNTER]; /** Condition variable to indicate that the commit was processed */ mysql_cond_t m_cond_done; /** Mutex used for the condition variable above */ mysql_mutex_t m_lock_done; #ifndef DBUG_OFF /** Flag is set by Leader when it starts waiting for follower's all-clear */ bool leader_await_preempt_status; /** Condition variable to indicate a follower started waiting for commit */ mysql_cond_t m_cond_preempt; #endif }; /** * A class abstracting a hybrid logical clock. Some important aspects of this * clock: * 1. 64 bit unsigned integer is used to track nanosecond precision * ticks (this should suffice well into 2500 AD) * 2. There is no explicit 'Logical' component. The rationale is that it is not * common to generate 'events' frequently at a rate of greater than 1 per * nanosecond. If we do hit this ocassionally, then this clock simply * increments the current wall clock by 1 and sets that as the current * nanosecond time. 
The wall clock should eventually catch up with the * internal nanosecond clock. */ class HybridLogicalClock { public: explicit HybridLogicalClock(uint64_t start) { current_= start; } HybridLogicalClock() { current_= 0; } ~HybridLogicalClock() = default; /** * Get the next hlc value and update the internal clock to match the next hlc * value. By definition of this clock (see above), the next value is * max(current+1, wall-clock). This is called when we want to assign a HLC * timestamp to a event. * * @return The next HLC */ uint64_t get_next(); /** * Get the current value of the HLC * * @return The current value of internal nanosecond clock */ uint64_t get_current(); /** * Update the internal clock to a value greater than or equal to minimum_hlc * and return the updated value. This method is used to synchronize HLC * clocks across different instances OR to set a minimum bound on the next * hlc by an external entity. Returns the value of the updated internal hlc * * @param minimum_hlc - The minimum/lower bound on the HLC * * @return The hlc after setting the minimum bound */ uint64_t update(uint64_t minimum_hlc); /** * Update the applied HLC for specified databases * * @param databases - the databases for which hlc needs to be updated * @param applied_hlc - Applied HLC */ void update_database_hlc( const std::unordered_set<std::string>& databases, uint64_t applied_hlc); /** * Get the applied hlc for all the database that is being tracked by this * clock * * @param [out] A map of database->applied_hlc */ void get_database_hlc(std::unordered_map<std::string, uint64_t>& applied_hlc); /** * Get the applied hlc for the specified database * * @param [in] A database name to select HLC for * * @return The HLC for the specified database, or 0 if not found */ uint64_t get_selected_database_hlc(const std::string& database); /** * Clear database HLC map */ void clear_database_hlc(); /** * Block the THD if the query attribute specified HLC isn't * present in the engine according 
to database_applied_hlc_ */ bool wait_for_hlc_applied(THD *thd, TABLE_LIST *all_tables); /** * Check that lower HLC bound requirements are satisfied for * insert/update/delete queries. */ bool check_hlc_bound(THD *thd); /** * Verify if the given HLC value is 'valid', by which it isn't 0 or intmax * * @param [in] The HLC value to validate */ static bool is_valid_hlc(uint64_t hlc) { return hlc != 0 && hlc != ULLONG_MAX; } /** * Convert a string to an HLC value. * * @param hlc_str [in] The string representing an HLC value. * * @param hlc [out] a pointer to a numeric variable that will receive the * HLC value read from "hlc_str" * * @return "true" if the conversion was successfull and "hlc" parameter * was updated with the conversion result, "false" otherwise. */ static bool str_to_hlc(const char *hlc_str, uint64_t *hlc); /** * Check if upper and lower HLC bound attributes are set, and store * their values in the transaction context object. */ bool capture_hlc_bound(THD *thd); private: // nanosecond precision internal clock std::atomic<uint64_t> current_; /** * A map of applied HLC for each database. The key is the name of the database * and the value is the applied_hlc for that database. 
Applied HLC is the HLC * of the last known trx that was applied (committed) to the engine */ std::unordered_map<std::string, uint64_t> database_applied_hlc_; std::mutex database_applied_hlc_lock_; // Per-database entry to track the list of waiting queries class DatabaseEntry { public: DatabaseEntry() { #ifdef HAVE_PSI_INTERFACE mysql_mutex_init(key_hlc_wait_mutex, &mutex_, MY_MUTEX_INIT_FAST); mysql_cond_init(key_hlc_wait_cond, &cond_, nullptr); #else mysql_mutex_init(0, &mutex_, MY_MUTEX_INIT_FAST); mysql_cond_init(0, &cond_, nullptr); #endif } ~DatabaseEntry() { mysql_mutex_destroy(&mutex_); mysql_cond_destroy(&cond_); } void update_hlc(uint64_t applied_hlc); bool wait_for_hlc(THD *thd, uint64_t requested_hlc, uint64_t timeout_ms); uint64_t max_applied_hlc() const { return max_applied_hlc_; } private: mysql_mutex_t mutex_; mysql_cond_t cond_; std::atomic<uint64_t> max_applied_hlc_{0}; }; std::shared_ptr<DatabaseEntry> getEntry(const std::string& database) { std::shared_ptr<DatabaseEntry> entry = nullptr; auto entryIt = database_map_.find(database); if (entryIt == database_map_.end()) { entry = std::make_shared<DatabaseEntry>(); database_map_.emplace(database, entry); } else { entry = entryIt->second; } return entry; } std::unordered_map<std::string, std::shared_ptr<DatabaseEntry>> database_map_; mutable std::mutex database_map_lock_; }; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { friend class Dump_log; public: enum enum_read_gtids_from_binlog_status { GOT_GTIDS, GOT_PREVIOUS_GTIDS, NO_GTIDS, ERROR, TRUNCATED }; private: #ifdef HAVE_PSI_INTERFACE /** The instrumentation key to use for @ LOCK_index. */ PSI_mutex_key m_key_LOCK_index; PSI_mutex_key m_key_COND_done; PSI_mutex_key m_key_LOCK_commit_queue; PSI_mutex_key m_key_LOCK_semisync_queue; PSI_mutex_key m_key_LOCK_done; PSI_mutex_key m_key_LOCK_flush_queue; PSI_mutex_key m_key_LOCK_sync_queue; /** The instrumentation key to use for @ LOCK_commit. 
*/ PSI_mutex_key m_key_LOCK_commit; /** The instrumentation key to use for @ LOCK_semisync. */ PSI_mutex_key m_key_LOCK_semisync; /** The instrumentation key to use for @ LOCK_sync. */ PSI_mutex_key m_key_LOCK_sync; /** The instrumentation key to use for @ LOCK_xids. */ PSI_mutex_key m_key_LOCK_xids; PSI_mutex_key m_key_LOCK_non_xid_trxs; /** The instrumentation key to use for @ LOCK_binlog. */ PSI_mutex_key m_key_LOCK_binlog_end_pos; /** The instrumentation key to use for @ update_cond. */ PSI_cond_key m_key_update_cond; /** The instrumentation key to use for @ prep_xids_cond. */ PSI_cond_key m_key_prep_xids_cond; /** The instrumentation key to use for @ non_xid_trxs_cond. */ PSI_cond_key m_key_non_xid_trxs_cond; /** The instrumentation key to use for opening the log file. */ PSI_file_key m_key_file_log; /** The instrumentation key to use for opening the log index file. */ PSI_file_key m_key_file_log_index; #endif /* POSIX thread objects are inited by init_pthread_objects() */ mysql_mutex_t LOCK_index; mysql_mutex_t LOCK_commit; mysql_mutex_t LOCK_semisync; mysql_mutex_t LOCK_sync; mysql_mutex_t LOCK_xids; mysql_mutex_t LOCK_non_xid_trxs; mysql_mutex_t LOCK_binlog_end_pos; mysql_cond_t update_cond; ulonglong bytes_written; IO_CACHE index_file; char index_file_name[FN_REFLEN]; /* Mapping from binlog file name to the previous gtid set in encoded form which is found at the top of the binlog as Previous_gtids_log_event. This structure is protected by LOCK_index mutex. A new mapping is added in add_log_to_index() function, and this is totally rebuilt in init_gtid_sets() function. */ Gtid_set_map previous_gtid_set_map; /* crash_safe_index_file is temp file used for guaranteeing index file crash safe when master server restarts. */ IO_CACHE crash_safe_index_file; char crash_safe_index_file_name[FN_REFLEN]; /* purge_file is a temp file used in purge_logs so that the index file can be updated before deleting files from disk, yielding better crash recovery. 
It is created on demand the first time purge_logs is called and then reused for subsequent calls. It is cleaned up in cleanup(). */ IO_CACHE purge_index_file; char purge_index_file_name[FN_REFLEN]; /* The max size before rotation (usable only if log_type == LOG_BIN: binary logs and relay logs). For a binlog, max_size should be max_binlog_size. For a relay log, it should be max_relay_log_size if this is non-zero, max_binlog_size otherwise. max_size is set in init(), and dynamically changed (when one does SET GLOBAL MAX_BINLOG_SIZE|MAX_RELAY_LOG_SIZE) by fix_max_binlog_size and fix_max_relay_log_size). */ ulong max_size; // current file sequence number for load data infile binary logging uint file_id; uint open_count; // For replication int readers_count; /* pointer to the sync period variable, for binlog this will be sync_binlog_period, for relay log this will be sync_relay_log_period */ uint *sync_period_ptr; uint sync_counter; my_atomic_rwlock_t m_prep_xids_lock; mysql_cond_t m_prep_xids_cond; volatile int32 m_prep_xids; my_off_t binlog_end_pos; // binlog_file_name is updated under LOCK_binlog_end_pos mutex // to match the latest log_file_name contents. This variable is used // in the execution of commands SHOW MASTER STATUS / SHOW BINARY LOGS // to avoid taking LOCK_log mutex. // // binlog_file_name is protected by LOCK_binlog_end_pos mutex where as // log_file_name is protected by LOCK_log mutex. char binlog_file_name[FN_REFLEN]; /** Increment the prepared XID counter. */ void inc_prep_xids(THD *thd) { DBUG_ENTER("MYSQL_BIN_LOG::inc_prep_xids"); my_atomic_rwlock_wrlock(&m_prep_xids_lock); #ifndef DBUG_OFF int result= my_atomic_add32(&m_prep_xids, 1); #else (void) my_atomic_add32(&m_prep_xids, 1); #endif DBUG_PRINT("debug", ("m_prep_xids: %d", result + 1)); my_atomic_rwlock_wrunlock(&m_prep_xids_lock); thd->transaction.flags.xid_written= true; DBUG_VOID_RETURN; } /** Decrement the prepared XID counter. Signal m_prep_xids_cond if the counter reaches zero. 
*/ void dec_prep_xids(THD *thd) { DBUG_ENTER("MYSQL_BIN_LOG::dec_prep_xids"); my_atomic_rwlock_wrlock(&m_prep_xids_lock); int32 result= my_atomic_add32(&m_prep_xids, -1); DBUG_PRINT("debug", ("m_prep_xids: %d", result - 1)); my_atomic_rwlock_wrunlock(&m_prep_xids_lock); thd->transaction.flags.xid_written= false; /* If the old value was 1, it is zero now. */ if (result == 1) { mysql_mutex_lock(&LOCK_xids); mysql_cond_signal(&m_prep_xids_cond); mysql_mutex_unlock(&LOCK_xids); } DBUG_VOID_RETURN; } int32 get_prep_xids() { my_atomic_rwlock_rdlock(&m_prep_xids_lock); int32 result= my_atomic_load32(&m_prep_xids); my_atomic_rwlock_rdunlock(&m_prep_xids_lock); return result; } inline uint get_sync_period() { return *sync_period_ptr; } int write_to_file(IO_CACHE *cache); /* This is used to start writing to a new log file. The difference from new_file() is locking. new_file_without_locking() does not acquire LOCK_log. */ int new_file_without_locking(Format_description_log_event *extra_description_event); /** Manage the stages in ordered_commit. 
*/ Stage_manager stage_manager; void do_flush(THD *thd); uint32_t non_xid_trxs; mysql_cond_t non_xid_trxs_cond; void inc_non_xid_trxs(THD *thd) { DBUG_ENTER("MYSQL_BIN_LOG::inc_non_xid_trxs"); mysql_mutex_lock(&LOCK_non_xid_trxs); ++non_xid_trxs; thd->non_xid_trx= true; mysql_mutex_unlock(&LOCK_non_xid_trxs); DBUG_VOID_RETURN; } void dec_non_xid_trxs(THD *thd) { DBUG_ENTER("MYSQL_BIN_LOG::dec_non_xid_trxs"); mysql_mutex_lock(&LOCK_non_xid_trxs); DBUG_ASSERT(non_xid_trxs > 0); if (non_xid_trxs > 0) --non_xid_trxs; thd->non_xid_trx= false; DBUG_PRINT("debug", ("non_xid_trxs: %d", non_xid_trxs)); /* Signal the threads that could be blocked in binlog rotation if the * non_xid_trxs is zero*/ if (non_xid_trxs == 0) mysql_cond_signal(&non_xid_trxs_cond); mysql_mutex_unlock(&LOCK_non_xid_trxs); DBUG_VOID_RETURN; } int32 get_non_xid_trxs() { mysql_mutex_assert_owner(&LOCK_non_xid_trxs); return non_xid_trxs; } public: using MYSQL_LOG::generate_name; using MYSQL_LOG::is_open; // Set to true if 'open' binlog was found during the trx log recovery bool open_binlog_found= false; // The starting position of the first gtid event in the trx log file my_off_t first_gtid_start_pos= 0; /* True if this binlog is an apply-log (in raft mode apply logs are the binlog * used as trx log on follower instances) * * This is set to true, at the end of converting a Raft FOLLOWER to a * MySQL Slave. It is set to false, when the Raft Candidate transitions * to LEADER, and converts the MySQL Slave to a MySQL Master as a part * of Election Decision Callback. 
* * @ref - rpl_handler.cc / point_binlog_to_binlog * rpl_handler.cc / point_binlog_to_apply */ bool is_apply_log= false; // Number of files that are created and maintained in index files std::atomic<uint64_t> apply_file_count; /* This is relay log */ bool is_relay_log; ulong signal_cnt; // update of the counter is checked by heartbeat uint8 checksum_alg_reset; // to contain a new value when binlog is rotated /* Holds the last seen in Relay-Log FD's checksum alg value. The initial value comes from the slave's local FD that heads the very first Relay-Log file. In the following the value may change with each received master's FD_m. Besides to be used in verification events that IO thread receives (except the 1st fake Rotate, see @c Master_info:: checksum_alg_before_fd), the value specifies if/how to compute checksum for slave's local events and the first fake Rotate (R_f^1) coming from the master. R_f^1 needs logging checksum-compatibly with the RL's heading FD_s. Legends for the checksum related comments: FD - Format-Description event, R - Rotate event R_f - the fake Rotate event E - an arbirary event The underscore indexes for any event `_s' indicates the event is generated by Slave `_m' - by Master Two special underscore indexes of FD: FD_q - Format Description event for queuing (relay-logging) FD_e - Format Description event for executing (relay-logging) Upper indexes: E^n - n:th event is a sequence RL - Relay Log (A) - checksum algorithm descriptor value FD.(A) - the value of (A) in FD */ uint8 relay_log_checksum_alg; MYSQL_BIN_LOG(uint *sync_period); /* note that there's no destructor ~MYSQL_BIN_LOG() ! 
The reason is that we don't want it to be automatically called on exit() - but only during the correct shutdown process */ char engine_binlog_file[FN_REFLEN + 1]; my_off_t engine_binlog_pos; Gtid engine_binlog_max_gtid; // copy of Relay_log_info::last_master_timestamp std::atomic<time_t> last_master_timestamp; #ifdef HAVE_PSI_INTERFACE void set_psi_keys(PSI_mutex_key key_LOCK_index, PSI_mutex_key key_LOCK_commit, PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_semisync, PSI_mutex_key key_LOCK_semisync_queue, PSI_mutex_key key_LOCK_done, PSI_mutex_key key_LOCK_flush_queue, PSI_mutex_key key_LOCK_log, PSI_mutex_key key_LOCK_sync, PSI_mutex_key key_LOCK_sync_queue, PSI_mutex_key key_LOCK_xids, PSI_mutex_key key_LOCK_non_xid_trxs, PSI_mutex_key key_LOCK_binlog_end_pos, PSI_cond_key key_COND_done, PSI_cond_key key_update_cond, PSI_cond_key key_prep_xids_cond, PSI_cond_key key_non_xid_trxs_cond, PSI_file_key key_file_log, PSI_file_key key_file_log_index) { m_key_COND_done= key_COND_done; m_key_LOCK_commit_queue= key_LOCK_commit_queue; m_key_LOCK_semisync_queue = key_LOCK_semisync_queue; m_key_LOCK_done= key_LOCK_done; m_key_LOCK_flush_queue= key_LOCK_flush_queue; m_key_LOCK_sync_queue= key_LOCK_sync_queue; m_key_LOCK_index= key_LOCK_index; m_key_LOCK_log= key_LOCK_log; m_key_LOCK_commit= key_LOCK_commit; m_key_LOCK_semisync = key_LOCK_semisync; m_key_LOCK_sync= key_LOCK_sync; m_key_LOCK_xids= key_LOCK_xids; m_key_LOCK_non_xid_trxs= key_LOCK_non_xid_trxs; m_key_LOCK_binlog_end_pos = key_LOCK_binlog_end_pos; m_key_update_cond= key_update_cond; m_key_prep_xids_cond= key_prep_xids_cond; m_key_non_xid_trxs_cond= key_non_xid_trxs_cond; m_key_file_log= key_file_log; m_key_file_log_index= key_file_log_index; } #endif mysql_mutex_t* get_lock_index() { return &LOCK_index; } /** Find the oldest binary log that contains any GTID that is not in the given gtid set. This is done by scanning the map structure previous_gtid_set_map in reverse order. 
@param[out] binlog_file_name, the file name of oldest binary log found @param[in] gtid_set, the given gtid set @param[out] first_gtid, the first GTID information from the binary log file returned at binlog_file_name @param[out] errmsg, the error message outputted, which is left untouched if the function returns false @return false on success, true on error. */ bool find_first_log_not_in_gtid_set(char *binlog_file_name, const Gtid_set *gtid_set, Gtid *first_gtid, const char **errmsg); /** Builds the set of all GTIDs in the binary log, and the set of all lost GTIDs in the binary log, and stores each set in respective argument. This scans the index file from the beginning and builds previous_gtid_set_map. Since index file contains the previous gtid set in binary string format, this function doesn't open every binary log file. @param gtid_set Will be filled with all GTIDs in this binary log. @param lost_groups Will be filled with all GTIDs in the Previous_gtids_log_event of the first binary log that has a Previous_gtids_log_event. @param last_gtid Will be filled with the last availble GTID information in the binary/relay log files. @param verify_checksum If true, checksums will be checked. @param need_lock If true, LOCK_log, LOCK_index, and global_sid_lock->wrlock are acquired; otherwise they are asserted to be taken already. @param max_prev_hlc [out] The max prev HLC stored in the binlog @param startup True if the server is starting up. @return false on success, true on error. 
*/ bool init_gtid_sets(Gtid_set *gtid_set, Gtid_set *lost_groups, Gtid *last_gtid, bool verify_checksum, bool need_lock, uint64_t *max_prev_hlc= NULL, bool startup= false); /** * This function is used by binlog_change_to_apply to update * the previous gtid set map, since we don't call init_gtid_sets * which would have initialized it from disk */ bool init_prev_gtid_sets_map(); void get_lost_gtids(Gtid_set *gtids) { gtids->clear(); mysql_mutex_lock(&LOCK_index); auto it = previous_gtid_set_map.begin(); if (it != previous_gtid_set_map.end() && !it->second.empty()) gtids->add_gtid_encoding( (const uchar*)it->second.c_str(), it->second.length()); mysql_mutex_unlock(&LOCK_index); } enum_read_gtids_from_binlog_status read_gtids_from_binlog(const char *filename, Gtid_set *all_gtids, Gtid_set *prev_gtids, Gtid *first_gtid, Gtid *last_gtid, Sid_map *sid_map, bool verify_checksum, my_off_t max_pos= ULONGLONG_MAX, uint64_t *max_prev_hlc= NULL); void set_previous_gtid_set(Gtid_set *previous_gtid_set_param) { previous_gtid_set= previous_gtid_set_param; } // Extract HLC time (either prev_hlc or regular hlc) from Metadata_log_event uint64_t extract_hlc(Metadata_log_event *metadata_ev); /* Return the HLC timestamp for the caller (next txn) */ uint64_t get_next_hlc() { return hlc.get_next(); } uint64_t get_current_hlc() { return hlc.get_current(); } /* Update the minimum HLC value. This is used to set the lower bound on the HLC for this instance. This is achieved by exposing a global system var 'minimum hlc' - updates on which will call this function. 
This can also be used to synchronize HLC across different communicating instances */ uint64_t update_hlc(uint64_t minimum_hlc) { return hlc.update(minimum_hlc); } /* get the applied HLC for all known databases in this instance */ void get_database_hlc( std::unordered_map<std::string, uint64_t>& database_hlc) { return hlc.get_database_hlc(database_hlc); } uint64_t get_selected_database_hlc(const std::string& database) { return hlc.get_selected_database_hlc(database); } void clear_database_hlc() { return hlc.clear_database_hlc(); } bool wait_for_hlc_applied(THD *thd, TABLE_LIST *all_tables) { return hlc.wait_for_hlc_applied(thd, all_tables); } bool check_hlc_bound(THD *thd) { return hlc.check_hlc_bound(thd); } bool capture_hlc_bound(THD *thd) { return hlc.capture_hlc_bound(thd); } /* * @param raft_rotate_info * Rotate related information passed in by listener callbacks. * Caters today to relay log rotates, no-op rotates and config * change rotates. */ int new_file_impl(bool need_lock, Format_description_log_event *extra_description_event, RaftRotateInfo *raft_rotate_info= nullptr); private: Gtid_set* previous_gtid_set; /* Hybrid logical clock for this instance. 1. The logical clock is tracked per instance today. It should be relatively easy to convert this to per-shard by having a map of such clocks */ HybridLogicalClock hlc; /* This is set when we have registered log entities with raft plugin during ordered commit, after we have become master on step up. Protected by LOCK_log. To prevent repeated re-registrations. 
*/ bool setup_flush_done; int open(const char *opt_name) { return open_binlog(opt_name); } bool change_stage(THD *thd, Stage_manager::StageID stage, THD* queue, mysql_mutex_t *leave, mysql_mutex_t *enter); std::pair<int,my_off_t> flush_thread_caches(THD *thd, bool async); int flush_cache_to_file(my_off_t *flush_end_pos); int finish_commit(THD *thd, bool async); std::pair<bool, bool> sync_binlog_file(bool force, bool async); void process_semisync_stage_queue(THD *queue_head); void process_commit_stage_queue(THD *thd, THD *queue, bool async); void set_commit_consensus_error(THD *queue_head); void handle_commit_consensus_error(THD *thd, bool async); void process_after_commit_stage_queue(THD *thd, THD *first, bool async); int process_flush_stage_queue(my_off_t *total_bytes_var, bool *rotate_var, THD **out_queue_var, bool async); int ordered_commit(THD *thd, bool all, bool skip_commit = false, bool async=false); void handle_binlog_flush_or_sync_error(THD *thd, bool need_lock_log); public: int open_binlog(const char *opt_name); void close(); enum_result commit(THD *thd, bool all, bool async); int rollback(THD *thd, bool all); int prepare(THD *thd, bool all, bool async); int recover(IO_CACHE *log, Format_description_log_event *fdle, my_off_t *valid_pos, const std::string& cur_binlog_file); int recover(IO_CACHE *log, Format_description_log_event *fdle); int recover_raft_log(); int set_valid_pos( my_off_t* valid_pos, const std::string& cur_binlog_file, my_off_t first_gtid_start); std::pair<std::string, uint> extract_file_index(const std::string& file_name); #if !defined(MYSQL_CLIENT) void update_thd_next_event_pos(THD *thd); int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, bool is_transactional); #endif /* !defined(MYSQL_CLIENT) */ void add_bytes_written(ulonglong inc) { bytes_written += inc; } void reset_bytes_written() { bytes_written = 0; } void harvest_bytes_written(Relay_log_info *rli, bool need_log_space_lock); void set_max_size(ulong 
max_size_arg); void signal_update(); void update_binlog_end_pos(bool need_lock= true); int wait_for_update_relay_log(THD* thd, const struct timespec * timeout); int wait_for_update_bin_log(THD* thd, const struct timespec * timeout); public: /** register binlog/relay (its IO_CACHE) and mutexes to plugin. Sharing the pointers with the plugin enables the plugin to flush transactions to the appropriate file when the Raft engine calls back the log Wrapper. @param context - 0 for initial time, 1 for each time When we pass in 1 for re-registration, we also validate on the plugin side that the cached pointers have not shifted. @param is_relay_log register the relay log if true, otherwise binlog Different observers are used for different logs */ int register_log_entities(THD *thd, int context, bool need_lock, bool is_relay_log); void check_and_register_log_entities(THD *thd); void init_pthread_objects(); void cleanup(); /** Create a new binary log. @param log_name Name of binlog @param new_name Name of binlog, too. todo: what's the difference between new_name and log_name? @param io_cache_type_arg Specifies how the IO cache is opened: read-only or read-write. @param max_size The size at which this binlog will be rotated. @param null_created If false, and a Format_description_log_event is written, then the Format_description_log_event will have the timestamp 0. Otherwise, it the timestamp will be the time when the event was written to the log. @param need_lock_index If true, LOCK_index is acquired; otherwise LOCK_index must be taken by the caller. @param need_sid_lock If true, the read lock on global_sid_lock will be acquired. Otherwise, the caller must hold the read lock on global_sid_lock. 
*/ bool open_binlog(const char *log_name, const char *new_name, enum cache_type io_cache_type_arg, ulong max_size, bool null_created, bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event, RaftRotateInfo *raft_rotate_info= nullptr, bool need_end_log_pos_lock= true); /** Open an existing binlog/relaylog file @param log_name Name of binlog @param io_cache_type_arg Specifies how the IO cache is opened: read-only or read-write. @param max_size The size at which this binlog will be rotated. */ bool open_existing_binlog(const char *log_name, enum cache_type io_cache_type_arg, ulong max_size_arg, bool need_end_log_pos_lock= true); bool open_index_file(const char *index_file_name_arg, const char *log_name, bool need_lock_index); /* * Opens the index file for the transaction log. If a binlog apply index file * is found, then it opens the apply index file. Otherwise it opens the binlog * index file * * @return 0 on success, 1 on error */ int init_index_file(); /* Use this to start writing a new log file. * @param raft_rotate_info - Used by raft to optionally control how file rotation happens. Caters to relay log rotates, no-op rotates and config change rotates. */ int new_file(Format_description_log_event *extra_description_event, RaftRotateInfo *raft_rotate_info= nullptr); bool write_event(Log_event* event_info, int force_cache_type = Log_event::EVENT_INVALID_CACHE, bool write_meta_data_event= false); bool write_cache(THD *thd, class binlog_cache_data *binlog_cache_data, bool async); int do_write_cache(IO_CACHE *cache); /** * Called after a THD's iocache is written to binlog (i.e binlog's cache) * during ordered commit. 
Updates all internal state maintained by mysql
   * including gtid_executed, binlog_bytes_written and any flush stage error
   * handling
   *
   * @param thd        Thread variable
   * @param cache_data The cache which was written to binlog
   * @param error      will be 1 on errors which need to be handled in
   *                   post_write
   *
   * @returns true on error, false on success
   */
  bool post_write(THD *thd, binlog_cache_data *cache_data, int error);

  /**
   * Handles an error that occurred when flushing the cache to binlog file
   *
   * @param thd        Thread variable
   * @param cache_data The cache which resulted in an error when writing to
   *                   binlog
   */
  void handle_write_error(THD *thd, binlog_cache_data *cache_data);

  void set_write_error(THD *thd, bool is_transactional);
  bool check_write_error(THD *thd);
  bool write_incident(THD *thd, bool need_lock_log,
                      bool do_flush_and_sync= true);
  bool write_incident(Incident_log_event *ev, bool need_lock_log,
                      bool do_flush_and_sync= true);

  void start_union_events(THD *thd, query_id_t query_id_param);
  void stop_union_events(THD *thd);
  bool is_query_in_union(THD *thd, query_id_t query_id_param);

  /* Relay-log append entry points; only declared in replication builds. */
#ifdef HAVE_REPLICATION
  bool append_buffer(const char* buf, uint len, Master_info *mi);
  bool append_event(Log_event* ev, Master_info *mi);
private:
  bool after_append_to_relay_log(Master_info *mi);
#endif // ifdef HAVE_REPLICATION
public:

  void make_log_name(char* buf, const char* log_ident);
  bool is_active(const char* log_file_name);
  int remove_logs_from_index(LOG_INFO* linfo, bool need_update_threads);
  int remove_deleted_logs_from_index(bool need_lock_index,
                                     bool need_update_threads);
  int rotate(bool force_rotate, bool* check_purge);
  void purge();
  int rotate_and_purge(THD* thd, bool force_rotate);

  /**
   * Take the config change payload and create a before_flush call into the
   * plugin after taking LOCK_log.
   * We do a rotation immediately after config change, because it enables
   * us to keep the invariant that we don't have free floating metadata
   * events due to Raft, except the rotate event at the end of a file.
* @param thd           - MySQL THD doing the config change rotation
   * @param config_change - a description of the config change understood by
   *                        Raft (move semantics)
   */
  int config_change_rotate(THD* thd, std::string config_change);

  /*
   * Reads the current index file and returns a list of all file names found in
   * the index file
   *
   * @param need_lock - Should LOCK_index be taken?
   *
   * @return A pair containing a list of string and an error status. Error
   * status is 1 if it fails to read from the index file. Else error status is 0
   * and the list contains all the filenames present in the index file
   */
  std::pair<std::vector<std::string>, int>
  get_lognames_from_index(bool need_lock);

  /**
    Flush binlog cache and synchronize to disk.

    This function flushes events in binlog cache to binary log file,
    it will do synchronizing according to the setting of system
    variable 'sync_binlog'. If file is synchronized, @c synced will
    be set to 1, otherwise 0.

    NOTE(review): the declaration that follows takes (bool async,
    const bool force) and has no `synced` out-parameter, so the
    description of `synced` here looks stale and `async` is
    undocumented -- confirm against the definition.

    @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
    @param[in] force if TRUE, ignores the 'sync_binlog' and synchronizes the file.
@retval 0 Success
    @retval other Failure

    NOTE(review): the declared return type is bool, so "0"/"other"
    presumably map to false/true -- confirm.
  */
  bool flush_and_sync(bool async, const bool force);
  void purge_apply_logs();
  int purge_logs(const char *to_log, bool included,
                 bool need_lock_index, bool need_update_threads,
                 std::atomic_ullong *decrease_log_space, bool auto_purge,
                 const char* max_log= nullptr);
  int purge_logs_before_date(
      time_t purge_time, bool auto_purge, bool stop_purge= 0,
      bool need_lock_index= true, const char* max_log= nullptr);
  int purge_first_log(Relay_log_info* rli, bool included);

  /* Crash-safe index file handling. */
  int set_crash_safe_index_file_name(const char *base_file_name);
  int open_crash_safe_index_file();
  int close_crash_safe_index_file();
  int add_log_to_index(uchar* log_file_name, int name_len,
                       bool need_lock_index, bool need_sid_lock);
  int move_crash_safe_index_file_to_index_file(bool need_lock_index);

  /* Purge index file handling. */
  int set_purge_index_file_name(const char *base_file_name);
  int open_purge_index_file(bool destroy);
  bool is_inited_purge_index_file();
  int close_purge_index_file();
  int clean_purge_index_file();
  int sync_purge_index_file();
  int register_purge_index_entry(const char* entry);
  int register_create_index_entry(const char* entry);
  int purge_index_entry(THD *thd, std::atomic_ullong *decrease_log_space,
                        bool need_lock_index);
  int purge_logs_in_list(std::list<std::string>& delete_list, THD *thd,
                         std::atomic_ullong *decrease_log_space,
                         bool need_lock_index);
  bool reset_logs(THD* thd);
  void close(uint exiting);

  // iterating through the log index file
  int find_log_pos(LOG_INFO* linfo, const char* log_name,
                   bool need_lock_index);
  int find_next_log(LOG_INFO* linfo, bool need_lock_index);

  /**
   * Get the total number of log file entries in the index file
   *
   * @param need_lock_index     - should we acquire LOCK_index
   * @param num_log_files (out) - number of log file entries in the index file
   *
   * @return 0 on success, non-zero on failure
   *
   */
  int get_total_log_files(bool need_lock_index, uint64_t* num_log_files);

  int get_current_log(LOG_INFO* linfo, bool need_lock_log= true);
  /*
    This is called to
find out the most recent binlog file coordinates without
    LOCK_log protection but with LOCK_binlog_end_pos protection.
    get_current_log() is called to find out the most recent binlog file
    coordinates with LOCK_log protection.
    raw_get_current_log() is a helper function to get_current_log().
  */
  void get_current_log_without_lock_log(LOG_INFO* linfo);
  int raw_get_current_log(LOG_INFO* linfo);
  uint next_file_id();
  void lock_commits(snapshot_info_st *ss_info);
  void unlock_commits(const snapshot_info_st *ss_info);

  /*
    Trivial accessors. Each returns a member (or its address) directly;
    none of them takes a lock itself.
  */
  inline char* get_index_fname() { return index_file_name;}
  inline char* get_log_fname() { return log_file_name; }
  inline char* get_name() { return name; }
  inline mysql_mutex_t* get_log_lock() { return &LOCK_log; }
  inline mysql_cond_t* get_log_cond() { return &update_cond; }
  inline IO_CACHE* get_log_file() { return &log_file; }

  /* Acquire / release LOCK_index on behalf of the caller. */
  inline void lock_index() { mysql_mutex_lock(&LOCK_index);}
  inline void unlock_index() { mysql_mutex_unlock(&LOCK_index);}
  inline IO_CACHE *get_index_file() { return &index_file;}

  inline const Gtid_set_map *get_previous_gtid_set_map() const
  {
    return &previous_gtid_set_map;
  }

  inline uint32 get_open_count() { return open_count; }

  /*
    It is called by the threads(e.g. dump thread) which want to read
    hot log without LOCK_log protection.
*/ my_off_t get_binlog_end_pos() { mysql_mutex_assert_not_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_binlog_end_pos); return binlog_end_pos; } my_off_t get_binlog_end_pos_without_lock() { mysql_mutex_assert_not_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos); return binlog_end_pos; } mysql_mutex_t* get_binlog_end_pos_lock() { return &LOCK_binlog_end_pos; } void lock_binlog_end_pos() { mysql_mutex_lock(&LOCK_binlog_end_pos); } void unlock_binlog_end_pos() { mysql_mutex_unlock(&LOCK_binlog_end_pos); } inline void update_binlog_group_commit_step() { mysql_mutex_lock(&LOCK_log); counter_histogram_init(&histogram_binlog_group_commit, opt_histogram_step_size_binlog_group_commit); mysql_mutex_unlock(&LOCK_log); } void update_binlog_group_and_engine_commit_step_size() { mysql_mutex_lock(&LOCK_log); if (opt_histogram_binlog_commit_time_step_size) { latency_histogram_init(&histogram_binlog_engine_commit_trx, opt_histogram_binlog_commit_time_step_size); latency_histogram_init(&histogram_binlog_group_commit_trx, opt_histogram_binlog_commit_time_step_size); } mysql_mutex_unlock(&LOCK_log); } static const int MAX_RETRIES_FOR_DELETE_RENAME_FAILURE = 5; }; typedef struct st_load_file_info { THD* thd; my_off_t last_pos_in_file; bool wrote_create_file, log_delayed; } LOAD_FILE_INFO; typedef struct st_xid_to_gtid { my_xid x; Gtid gtid; st_xid_to_gtid() { x= 0; gtid.clear(); } } XID_TO_GTID; extern my_bool opt_process_can_disable_bin_log; extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; /** * Encapsulation over binlog or relay log for dumping raft logs during * COM_BINLOG_DUMP and COM_BINLOG_DUMP_GTID (see @mysql_binlog_send) */ class Dump_log { public: // RAII class to handle locking for Dump_log class Locker { public: Locker(Dump_log* dump_log) { dump_log_= dump_log; should_lock_= dump_log_->lock(); } ~Locker() { if (should_lock_) dump_log_->unlock(should_lock_); } private: bool should_lock_= false; Dump_log* dump_log_= nullptr; }; Dump_log(); void 
switch_log(bool relay_log, bool should_lock= true); MYSQL_BIN_LOG* get_log(bool should_lock= true) { bool is_locked= false; if (should_lock) is_locked= lock(); auto ret= log_; if (should_lock) unlock(is_locked); return ret; } bool is_relay_log() { Locker lock(this); return !(log_ == &mysql_bin_log); } bool is_open() { Locker lock(this); return log_->is_open(); } bool is_active(const char* log_file_name) { Locker lock(this); return log_->is_active(log_file_name); } my_off_t get_binlog_end_pos_without_lock() { Locker lock(this); return log_->get_binlog_end_pos_without_lock(); } void make_log_name(char* buf, const char* log_ident) { Locker lock(this); log_->make_log_name(buf, log_ident); } bool find_first_log_not_in_gtid_set(char *binlog_file_name, const Gtid_set *gtid_set, Gtid *first_gtid, const char **errmsg) { Locker lock(this); return log_->find_first_log_not_in_gtid_set(binlog_file_name, gtid_set, first_gtid, errmsg); } int find_log_pos(LOG_INFO* linfo, const char* log_name, bool need_lock_index) { Locker lock(this); return log_->find_log_pos(linfo, log_name, need_lock_index); } int find_next_log(LOG_INFO* linfo, bool need_lock_index) { Locker lock(this); return log_->find_next_log(linfo, need_lock_index); } void get_lost_gtids(Gtid_set *gtids) { Locker lock(this); log_->get_lost_gtids(gtids); } // Avoid using this and try to use Dump_log::Locker class instead bool lock() { // NOTE: we lock only when we're in raft mode. That's why we're returning a // bool to indicate wheather we locked or not. We pass this bool to unlock // method to unlock only then the mutex was actually locked. 
const bool should_lock= enable_raft_plugin; if (should_lock) log_mutex_.lock(); return should_lock; } // Avoid using this and try to use Dump_log::Locker class instead void unlock(bool is_locked) { if (is_locked) log_mutex_.unlock(); } private: MYSQL_BIN_LOG *log_; std::mutex log_mutex_; }; extern MYSQL_PLUGIN_IMPORT Dump_log dump_log; bool is_binlog_cache_empty(const THD* thd); bool trans_has_updated_trans_table(const THD* thd); bool stmt_has_updated_trans_table(Ha_trx_info* ha_list); bool ending_trans(THD* thd, const bool all); bool ending_single_stmt_trans(THD* thd, const bool all); bool trans_cannot_safely_rollback(const THD* thd); bool stmt_cannot_safely_rollback(const THD* thd); int log_loaded_block(IO_CACHE* file); /** Open a single binary log file for reading. */ File open_binlog_file(IO_CACHE *log, const char *log_file_name, const char **errmsg); int check_binlog_magic(IO_CACHE* log, const char** errmsg); bool purge_master_logs(THD* thd, const char* to_log); bool purge_master_logs_before_date(THD* thd, time_t purge_time); bool purge_raft_logs(THD* thd, const char* to_log); bool purge_raft_logs_before_date(THD* thd, time_t purge_time); bool update_relay_log_cordinates(Relay_log_info* rli); bool show_raft_logs(THD* thd, bool with_gtid = false); bool show_raft_status(THD* thd); bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log); bool show_binlog_cache(THD *thd, MYSQL_BIN_LOG *binary_log); bool mysql_show_binlog_events(THD* thd); bool mysql_show_binlog_cache(THD* thd); bool show_gtid_executed(THD *thd); void check_binlog_cache_size(THD *thd); void check_binlog_stmt_cache_size(THD *thd); void update_binlog_hlc(); bool binlog_enabled(); void register_binlog_handler(THD *thd, bool trx); int gtid_empty_group_log_and_cleanup(THD *thd); extern const char *log_bin_index; extern const char *log_bin_basename; extern bool opt_binlog_order_commits; extern bool opt_gtid_precommit; /** Turns a relative log binary log path into a full path, based on the 
opt_bin_logname or opt_relay_logname. @param from The log name we want to make into an absolute path. @param to The buffer where to put the results of the normalization. @param is_relay_log Switch that makes is used inside to choose which option (opt_bin_logname or opt_relay_logname) to use when calculating the base path. @returns true if a problem occurs, false otherwise. */ inline bool normalize_binlog_name(char *to, const char *from, bool is_relay_log) { DBUG_ENTER("normalize_binlog_name"); bool error= false; char buff[FN_REFLEN]; char *ptr= (char*) from; char *opt_name= is_relay_log ? opt_relay_logname : opt_bin_logname; DBUG_ASSERT(from); /* opt_name is not null and not empty and from is a relative path */ if (opt_name && opt_name[0] && from && !test_if_hard_path(from)) { // take the path from opt_name // take the filename from from char log_dirpart[FN_REFLEN], log_dirname[FN_REFLEN]; size_t log_dirpart_len, log_dirname_len; dirname_part(log_dirpart, opt_name, &log_dirpart_len); dirname_part(log_dirname, from, &log_dirname_len); /* log may be empty => relay-log or log-bin did not hold paths, just filename pattern */ if (log_dirpart_len > 0) { /* create the new path name */ if(fn_format(buff, from+log_dirname_len, log_dirpart, "", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH)) == NULL) { error= true; goto end; } ptr= buff; } } DBUG_ASSERT(ptr); if (ptr) strmake(to, ptr, strlen(ptr)); end: DBUG_RETURN(error); } /* Splits the first argument into two parts using the delimiter ' '. The second part is converted into an integer and the space is modified to '\0' in the first argument. @param file_name_and_gtid_set_length binlog file_name and gtid_set length in binary form separated by ' '. @return previous gtid_set length by converting the second string in to an integer. 
*/ uint split_file_name_and_gtid_set_length(char *file_name_and_gtid_set_length); #ifndef MYSQL_CLIENT struct st_filenum_pos { uint file_num= 0; uint pos= 0; static const uint max_pos = std::numeric_limits<uint>::max(); st_filenum_pos() = default; st_filenum_pos(uint file_num, uint pos) { this->file_num= file_num; this->pos= pos; } int cmp(const st_filenum_pos& other) const { if (file_num == other.file_num && pos == other.pos) return 0; if (file_num == other.file_num) return pos < other.pos ? -1 : 1; return file_num < other.file_num ? -1 : 1; } bool operator==(const st_filenum_pos& other) const { return cmp(other) == 0; } bool operator<(const st_filenum_pos& other) const { return cmp(other) < 0; } bool operator>(const st_filenum_pos& other) const { return cmp(other) > 0; } bool operator<=(const st_filenum_pos& other) const { return cmp(other) <= 0; } bool operator>=(const st_filenum_pos& other) const { return cmp(other) >= 0; } }; extern std::atomic<st_filenum_pos> last_acked; extern mysql_mutex_t LOCK_last_acked; extern mysql_cond_t COND_last_acked; #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_last_acked; extern PSI_cond_key key_COND_last_acked; #endif extern bool semi_sync_last_ack_inited; // defined in plugin/semisync/semisync_master.cc extern char rpl_semi_sync_master_enabled; extern "C" void signal_semi_sync_ack(const std::string &file_num, uint file_pos); bool block_all_dump_threads(); void unblock_all_dump_threads(); void init_semi_sync_last_acked(); void destroy_semi_sync_last_acked(); bool wait_for_semi_sync_ack(const LOG_POS_COORD *const coord, NET *net, ulonglong wait_timeout_nsec); void signal_semi_sync_ack(const LOG_POS_COORD* const); void reset_semi_sync_last_acked(); int trim_logged_gtid(const std::vector<std::string>& trimmed_gtids); int get_committed_gtids(const std::vector<std::string>& gtids, std::vector<std::string> *committed_gtids); int get_executed_gtids(std::string* const gtids); /** * Start a raft configuration change on the 
binlog with the provided
 * config change payload
 *
 * @param thd           - MySQL THD doing the config change rotation
 * @param config_change - Has the committed Config and New Config
 */
int raft_config_change(THD *thd, std::string config_change);

/**
 * Block/unblock dump threads
 *
 * NOTE(review): presumably block == true blocks the dump threads and
 * block == false unblocks them -- confirm against the definition.
 */
int handle_dump_threads(bool block);

/**
 * Updates slave_list datastructure with raft follower information
 */
int raft_update_follower_info(
    const std::unordered_map<std::string, std::string> &follower_info,
    bool is_leader, bool is_shutdown);

int rotate_binlog_file(THD *thd);

/*
  This is used to change the mysql_bin_log global MYSQL_BIN_LOG file
  to point to the apply binlog/reopen new one. Apply binlogs are binlog
  files used by FOLLOWERS/SLAVES in Raft. They are only on the State
  Machine side
*/
int binlog_change_to_apply();

/*
  This is used to change the mysql_bin_log global MYSQL_BIN_LOG file
  to point to latest binlog-330*.# (Raft LOG). This has to be done
  before a Raft LEADER can become a MySQL Master and start proposing
  transactions via ORDERED COMMIT
*/
int binlog_change_to_binlog();

#ifdef HAVE_REPLICATION
int rotate_relay_log_for_raft(RaftRotateInfo *raft_rotate_info);
#endif /* closes #ifdef HAVE_REPLICATION */

#endif /* closes the earlier #ifndef MYSQL_CLIENT */

#endif /* BINLOG_H_INCLUDED */