// sql/rpl_master.cc (2,735 lines of code) (raw):

/* Copyright (c) 2010, 2016, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ #include "sql_priv.h" #include "unireg.h" #include "sql_parse.h" // check_access #include "global_threads.h" #ifdef HAVE_REPLICATION #include "sql_acl.h" // SUPER_ACL #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> #include "rpl_handler.h" #include "rpl_master.h" #include "debug_sync.h" #include <errno.h> #include <sys/socket.h> #include "binlog.h" #include "rpl_slave.h" // stop_handle_slave_stats_daemon, start_handle_slave_stats_daemon #include "slave_stats_daemon.h" #include <queue> #include "sql_show.h" // schema_table_store_record #include "tztime.h" // struct Time_zone #include <boost/algorithm/string.hpp> #ifdef HAVE_JUNCTION #pragma GCC diagnostic ignored "-Wunused-value" #include <junction/ConcurrentMap_Grampa.h> #pragma GCC diagnostic error "-Wunused-value" #endif /* REPLICA_STATISTICS Associates a Slave ID to its replication statistics for last N(=MAX_LAG_DATAPOINT_PER_SLAVE) datapoints. These statistics are sent by slaves to the master at regular intervals. 
*/
/* Column layout for INFORMATION_SCHEMA.REPLICA_STATISTICS
   (filled in by fill_replica_statistics()); all-zero entry terminates. */
ST_FIELD_INFO replica_statistics_fields_info[] =
{
  {"SERVER_ID", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG, 0, 0, 0,
   SKIP_OPEN_TABLE},
  {"TIMESTAMP", MY_INT32_NUM_DECIMAL_DIGITS, MYSQL_TYPE_DATETIME, 0, 0, 0,
   SKIP_OPEN_TABLE},
  {"MILLI_SEC_BEHIND_MASTER", MY_INT64_NUM_DECIMAL_DIGITS, MYSQL_TYPE_LONGLONG,
   0, 0, 0, SKIP_OPEN_TABLE},
  {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE}
};

/*
  Deserialize one slave statistics datapoint from a COM packet.
  Wire layout (little-endian), 12 bytes total:
    4 bytes server id, 4 bytes timestamp, 4 bytes milli_sec_behind_master.
*/
SLAVE_STATS::st_slave_stats(uchar* packet)
{
  /* 4 bytes for the server id */
  /* 4 bytes timestamp */
  /* 4 bytes for milli_second_behind_master */
  server_id = uint4korr(packet);
  packet += 4;
  timestamp = uint4korr(packet);
  packet += 4;
  milli_sec_behind_master = uint4korr(packet);
}

int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
ulong rpl_event_buffer_size;
uint rpl_send_buffer_size = 0;

/* Cache for compressed events and associated structures */
struct binlog_comp_event
{
  std::shared_ptr<uchar> buff;  // header + compressed payload
  size_t len;                   // total length of buff in bytes
  binlog_comp_event() : len(0) { }
  binlog_comp_event(std::shared_ptr<uchar> buff, size_t len):
    buff(buff), len(len) { }
  /* Deletion callback; used as a QSBR deferred-destroy hook under junction. */
  void destroy() { delete this; }
};
#ifdef HAVE_JUNCTION
typedef junction::ConcurrentMap_Grampa<ulonglong, binlog_comp_event*>
  comp_event_cache;
#define COMP_EVENT_CACHE_NUM_SHARDS 32
static mysql_mutex_t LOCK_comp_event_cache[COMP_EVENT_CACHE_NUM_SHARDS];
#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_LOCK_comp_event_cache[COMP_EVENT_CACHE_NUM_SHARDS];
#endif
#else // !HAVE_JUNCTION
typedef std::unordered_map<ulonglong, binlog_comp_event*> comp_event_cache;
#define COMP_EVENT_CACHE_NUM_SHARDS 32
static mysql_rwlock_t LOCK_comp_event_cache[COMP_EVENT_CACHE_NUM_SHARDS];
#ifdef HAVE_PSI_INTERFACE
static PSI_rwlock_key key_LOCK_comp_event_cache[COMP_EVENT_CACHE_NUM_SHARDS];
#endif
#endif
typedef std::queue<std::pair<ulonglong, std::size_t>> comp_event_queue;
static comp_event_cache comp_event_cache_list[COMP_EVENT_CACHE_NUM_SHARDS];
static bool comp_event_cache_inited= false;
/* Cache stats, reset every minute */
static std::atomic<ulonglong> cache_hit_count;
static std::atomic<ulonglong> cache_miss_count;
static std::atomic<time_t> cache_stats_timer;
// used to record the order of insertions in the cache for eviction
static comp_event_queue comp_event_queue_list[COMP_EVENT_CACHE_NUM_SHARDS];
// size of value part of the compressed event cache in MB
static std::atomic<size_t> comp_event_cache_size_list[COMP_EVENT_CACHE_NUM_SHARDS];
std::atomic<bool> block_dump_threads{false};
#ifndef DBUG_OFF
static int binlog_dump_count = 0;
#endif

#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
HASH slave_list;
extern TYPELIB binlog_checksum_typelib;

/*
  Read one length-prefixed string from the registration packet into obj.
  On a truncated packet: reports ER_MALFORMED_PACKET, frees si, returns 1.
  If the value does not fit in obj: sets errmsg and jumps to err.
  Requires locals p, p_end, si, errmsg and label err in the caller.
*/
#define get_object(p, obj, msg) \
{\
  uint len; \
  if (p >= p_end) \
  { \
    my_error(ER_MALFORMED_PACKET, MYF(0)); \
    my_free(si); \
    return 1; \
  } \
  len= (uint)*p++; \
  if (p + len > p_end || len >= sizeof(obj)) \
  {\
    errmsg= msg;\
    goto err; \
  }\
  strmake(obj,(char*) p,len); \
  p+= len; \
}\

/* Hash callback: a registered slave is keyed by its 4-byte server_id. */
extern "C" uint32
*slave_list_key(SLAVE_INFO* si, size_t *len,
                my_bool not_used MY_ATTRIBUTE((unused)))
{
  *len = 4;
  return &si->server_id;
}

/* Hash callback: free a SLAVE_INFO and its (optional) stats list. */
extern "C" void slave_info_free(void *s)
{
  if (s != nullptr && ((SLAVE_INFO *)s)->slave_stats != nullptr)
    delete ((SLAVE_INFO *)s)->slave_stats;
  my_free(s);
}

#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_LOCK_slave_list;

static PSI_mutex_info all_slave_list_mutexes[]=
{
  { &key_LOCK_slave_list, "LOCK_slave_list", PSI_FLAG_GLOBAL}
};

/* Register the slave-list mutex with the performance schema. */
static void init_all_slave_list_mutexes(void)
{
  int count;

  count= array_elements(all_slave_list_mutexes);
  mysql_mutex_register("sql", all_slave_list_mutexes, count);
}
#endif /* HAVE_PSI_INTERFACE */

/* Initialize the registered-slaves hash and its mutex (server startup). */
void init_slave_list()
{
#ifdef HAVE_PSI_INTERFACE
  init_all_slave_list_mutexes();
#endif

  my_hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0,
               (my_hash_get_key) slave_list_key,
               (my_hash_free_key) slave_info_free, 0);
  mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
}

/* Free the registered-slaves hash and destroy its mutex. */
void end_slave_list()
{
  /* No protection by a mutex needed as we are only called at shutdown */
  if (my_hash_inited(&slave_list))
  {
    my_hash_free(&slave_list);
    mysql_mutex_destroy(&LOCK_slave_list);
  }
}

static void evict_compressed_events(ulonglong shard, ulonglong max_cache_size);

/* Initialize per-shard locks and counters of the compressed-event cache. */
void init_compressed_event_cache()
{
  for (int i = 0; i < COMP_EVENT_CACHE_NUM_SHARDS; ++i)
  {
#ifdef HAVE_JUNCTION
    mysql_mutex_init(key_LOCK_comp_event_cache[i],
                     &LOCK_comp_event_cache[i], MY_MUTEX_INIT_FAST);
#else
    mysql_rwlock_init(key_LOCK_comp_event_cache[i],
                      &LOCK_comp_event_cache[i]);
#endif
    comp_event_cache_size_list[i]= 0;
  }
  comp_event_cache_inited= true;
  cache_hit_count= cache_miss_count= 0;
  cache_stats_timer= my_time(0);
}

/* Evict every cached event in every shard and reset the hit-ratio stats. */
void clear_compressed_event_cache()
{
  for (int i = 0; i < COMP_EVENT_CACHE_NUM_SHARDS; ++i)
  {
#ifdef HAVE_JUNCTION
    mysql_mutex_lock(&LOCK_comp_event_cache[i]);
#else
    mysql_rwlock_wrlock(&LOCK_comp_event_cache[i]);
#endif
    /* a max size of 0 forces eviction of everything in the shard */
    evict_compressed_events(i, 0);
    DBUG_ASSERT(comp_event_queue_list[i].empty());
#ifdef HAVE_JUNCTION
    mysql_mutex_unlock(&LOCK_comp_event_cache[i]);
#else
    mysql_rwlock_unlock(&LOCK_comp_event_cache[i]);
#endif
    DBUG_ASSERT(comp_event_cache_size_list[i] == 0);
  }
  cache_hit_count= cache_miss_count= 0;
  cache_stats_timer= my_time(0);
  comp_event_cache_hit_ratio= 0;
}

/* Tear down the compressed-event cache at server shutdown. */
void free_compressed_event_cache()
{
  /* No need to protect @comp_event_cache_inited because it's changed only at
     start and end of the server when the mutex is inited/destroyed */
  if (comp_event_cache_inited)
  {
    clear_compressed_event_cache();
    for (int i = 0; i < COMP_EVENT_CACHE_NUM_SHARDS; ++i)
    {
#ifdef HAVE_JUNCTION
      junction::DefaultQSBR.flush();
      mysql_mutex_destroy(&LOCK_comp_event_cache[i]);
#else
      mysql_rwlock_destroy(&LOCK_comp_event_cache[i]);
#endif
    }
    comp_event_cache_inited= false;
  }
}

/**
  Populates slave statistics data-point into the slave_lists hash table.
  These stats are sent by slaves to master at regular intervals.

  @return 0 ok
  @return 1 Error.
          Error message sent to client */
int store_replica_stats(THD *thd, uchar *packet, uint packet_length)
{
  if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
    return 1;

  // NOTE(review): the wire format is 3 x 4 bytes (see SLAVE_STATS ctor);
  // comparing against sizeof(SLAVE_STATS) assumes the struct carries no
  // padding or extra members beyond the wire fields -- confirm.
  if (sizeof(SLAVE_STATS) > packet_length)
  {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    return 1;
  }
  SLAVE_STATS stats(packet);
  int server_id = stats.server_id;

  mysql_mutex_lock(&LOCK_slave_list);
  SLAVE_INFO *si =
      (SLAVE_INFO *)my_hash_search(&slave_list, (uchar *)&server_id, 4);
  if (si != nullptr)
  {
    /* lazily create the per-slave stats list on first datapoint */
    if (si->slave_stats == nullptr)
    {
      si->slave_stats = new std::list<SLAVE_STATS>();
    }
    // We are over the configured size. Erase older entries first.
    while (!si->slave_stats->empty() &&
           si->slave_stats->size() >= write_stats_count)
    {
      si->slave_stats->pop_back();
    }
    /* newest datapoint lives at the front of the list */
    if (write_stats_count > 0)
      si->slave_stats->push_front(std::move(stats));
  }
  mysql_mutex_unlock(&LOCK_slave_list);
  return 0;
}

/**
  Scans through the replication lag reported by individual secondaries and
  returns replication lag for the entire topology. Replication lag for the
  topology is defined as kth largest replication lag reported by individual
  secondaries where k = write_throttle_lag_min_secondaries.
  This method is used for auto throttling write workload to avoid
  replication lag

  @retval replication_lag
*/
int get_current_replication_lag()
{
  /* feature disabled => never report lag */
  if (write_throttle_lag_pct_min_secondaries == 0)
    return 0;
  // find the lag
  std::vector<int> replica_lags;
  mysql_mutex_lock(&LOCK_slave_list);
  for (uint slaveIter = 0; slaveIter < slave_list.records; ++slaveIter)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, slaveIter);
    if (si->slave_stats != nullptr && !si->slave_stats->empty())
    {
      // collect the most recent(front) lag by this secondary
      replica_lags.push_back(si->slave_stats->front().milli_sec_behind_master);
    }
  }
  mysql_mutex_unlock(&LOCK_slave_list);

  /* debug hooks used by throttling tests to force specific lag values */
  DBUG_EXECUTE_IF("dbug.simulate_lag_above_start_throttle_threshold",
                  {return write_start_throttle_lag_milliseconds + 1;});
  DBUG_EXECUTE_IF("dbug.simulate_lag_below_end_throttle_threshold",
                  {return write_stop_throttle_lag_milliseconds - 1;});
  DBUG_EXECUTE_IF("dbug.simulate_lag_between_start_end_throttle_threshold",
                  {return (write_start_throttle_lag_milliseconds +
                           write_stop_throttle_lag_milliseconds)/2;});

  int min_secondaries_to_lag =
      ceil(replica_lags.size() *
           (double)write_throttle_lag_pct_min_secondaries / 100);
  if (min_secondaries_to_lag == 0)
  {
    // not enough secondaries(registered so far) in replication topology to
    // qualify for lag
    return 0;
  }
  // return the kth largest lag value where k = min_secondaries_to_lag
  std::sort(replica_lags.begin(), replica_lags.end(), std::greater<int>());
  return replica_lags[min_secondaries_to_lag - 1];
}

/**
  Fills the INFORMATION_SCHEMA.REPLICA_STATISTICS table

  @return 0 ok
  @return 1 Error
*/
int fill_replica_statistics(THD *thd, TABLE_LIST *tables, Item *cond)
{
  DBUG_ENTER("fill_replica_statistics");
  TABLE* table= tables->table;

  mysql_mutex_lock(&LOCK_slave_list);
  for (uint slaveIter = 0; slaveIter < slave_list.records; ++slaveIter)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, slaveIter);
    if (si->slave_stats == nullptr)
    {
      // No stats collected for this slave so far, continue
      continue;
    }
    /* one output row per retained datapoint, newest first */
    for (const SLAVE_STATS &stats : *(si->slave_stats))
    {
      int fieldPos= 0;
      MYSQL_TIME time;
      // slave id
      table->field[fieldPos++]->store(si->server_id, TRUE);
      // timestamp
      if (stats.timestamp == 0)
      {
        table->field[fieldPos++]->set_null();
      }
      else
      {
        thd->variables.time_zone->gmt_sec_to_TIME(
            &time, (my_time_t)stats.timestamp);
        table->field[fieldPos]->set_notnull();
        table->field[fieldPos++]->store_time(&time);
      }
      // milli_second_behind_master
      table->field[fieldPos++]->store(stats.milli_sec_behind_master, TRUE);
      // NOTE(review): return value of schema_table_store_record() is
      // ignored; an error while storing the row is silently dropped --
      // confirm this is intended.
      schema_table_store_record(thd, table);
    }
  }
  mysql_mutex_unlock(&LOCK_slave_list);
  DBUG_RETURN(0);
}

/**
  Register slave in 'slave_list' hash table.

  @return
    0	ok
  @return
    1	Error.   Error message sent to client
*/
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
  int res;
  SLAVE_INFO *si;
  uchar *p= packet, *p_end= packet + packet_length;
  const char *errmsg= "Wrong parameters to function register_slave";

  if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
    return 1;
  if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
    goto err2;

  /* 4 bytes for the server id */
  if (p + 4 > p_end)
  {
    my_error(ER_MALFORMED_PACKET, MYF(0));
    my_free(si);
    return 1;
  }
  thd->server_id= si->server_id= uint4korr(p);
  p+= 4;
  /* get_object frees si itself on a truncated packet, or jumps to err */
  get_object(p,si->host, "Failed to register slave: too long 'report-host'");
  get_object(p,si->user, "Failed to register slave: too long 'report-user'");
  get_object(p,si->password, "Failed to register slave; too long 'report-password'");
  /* need 2 (port) + 4 (obsolete rpl_recovery_rank) + 4 (master_id) bytes */
  if (p+10 > p_end)
    goto err;
  si->port= uint2korr(p);
  p += 2;
  /*
     We need to by pass the bytes used in the fake rpl_recovery_rank
     variable. It was removed in patch for BUG#13963. But this would make
     a server with that patch unable to connect to an old master.
     See: BUG#49259
  */
  p += 4;
  /* a master_id of 0 means "me" */
  if (!(si->master_id= uint4korr(p)))
    si->master_id= server_id;
  si->slave_stats= nullptr;
  si->thd= thd;
  si->is_raft= false;

  mysql_mutex_lock(&LOCK_slave_list);
  /* replace any previous registration for this server_id */
  unregister_slave(thd, false, false/*need_lock_slave_list=false*/);
  // NOTE(review): if my_hash_insert() fails, si is not freed here --
  // looks like a leak on the (rare) OOM path; confirm.
  res= my_hash_insert(&slave_list, (uchar*) si);
  mysql_mutex_unlock(&LOCK_slave_list);
  return res;

err:
  my_free(si);
  my_message(ER_UNKNOWN_ERROR, errmsg, MYF(0)); /* purecov: inspected */
err2:
  return 1;
}

/*
  Remove this thread's registration (or any registration with the same
  server_id when only_mine is false) from slave_list. The caller either
  asks us to take LOCK_slave_list or must already hold it.
*/
void unregister_slave(THD* thd, bool only_mine, bool need_lock_slave_list)
{
  if (thd->server_id)
  {
    if (need_lock_slave_list)
      mysql_mutex_lock(&LOCK_slave_list);
    else
      mysql_mutex_assert_owner(&LOCK_slave_list);

    SLAVE_INFO* old_si;
    if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
                                              (uchar*)&thd->server_id, 4)) &&
        (!only_mine || old_si->thd == thd))
      my_hash_delete(&slave_list, (uchar*)old_si);

    if (need_lock_slave_list)
      mysql_mutex_unlock(&LOCK_slave_list);
  }
}

/**
 * Register raft followers in the slave_list data-structure
 *
 * @param follower_info This is a map from uuid -> host:port:server_id
 * @param is_leader Are we the leader?
 * @param is_shutdown Are we shutting down?
 */
int register_raft_followers(
    const std::unordered_map<std::string, std::string> &follower_info,
    bool is_leader, bool is_shutdown)
{
  int error= 0;
  mysql_mutex_lock(&LOCK_slave_list);

  // remove all raft entries, we'll repopulate again
  std::unordered_set<SLAVE_INFO*> raft_slave_infos;
  for (ulong i= 0; i < slave_list.records; ++i)
  {
    SLAVE_INFO* info= (SLAVE_INFO*) my_hash_element(&slave_list, i);
    if (info->is_raft)
      raft_slave_infos.insert(info);
  }
  for (const auto& info : raft_slave_infos)
  {
    DBUG_ASSERT(info->is_raft);
    my_hash_delete(&slave_list, (uchar*) info);
  }

  /* insert one SLAVE_INFO per follower; value format is host:port:server_id */
  for (const std::pair<std::string, std::string> info : follower_info)
  {
    SLAVE_INFO *si= nullptr;
    if (!(si= (SLAVE_INFO *)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
    {
      error= 1;
      break;
    }
    std::vector<std::string> splits;
    boost::split(splits, info.second, boost::is_any_of(":"));
    if (splits.size() != 3)
    {
      error= 1;
      my_free(si);
      break;
    }
    const std::string &uuid= info.first;
    const std::string &host= splits[0];
    const std::string &port_str= splits[1];
    const std::string &server_id_str= splits[2];
    /* stoull throws on malformed numbers; treat that as a bad entry */
    try
    {
      si->server_id= std::stoull(server_id_str);
      si->port= std::stoull(port_str);
    }
    catch (...)
    {
      error= 1;
      my_free(si);
      break;
    }
    // NOTE(review): unbounded strcpy into the fixed-size host/server_uuid
    // members of SLAVE_INFO; an oversized host string from follower_info
    // would overflow -- confirm upstream guarantees the lengths.
    strcpy(si->host, host.c_str());
    strcpy(si->user, "");
    strcpy(si->password, "");
    si->master_id= server_id;
    si->slave_stats= nullptr;
    si->thd= nullptr;
    si->is_raft= true;
    strcpy(si->server_uuid, uuid.c_str());
    error= my_hash_insert(&slave_list, (uchar *)si);
    if (error)
    {
      my_free(si);
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_slave_list);

  // clean up - stop previous run of slave_stats_daemon, if any.
  stop_handle_slave_stats_daemon();
  if (!is_shutdown && !is_leader)
  {
    // Spawn slave_stats_daemon thread only for followers and we are not
    // shutting down raft.
    start_handle_slave_stats_daemon();
  }
  return error;
}

/*
  Returns true iff the connection's session has set the user variable
  @rpl_semi_sync_slave to a non-zero value (the semi-sync handshake marker).
*/
bool is_semi_sync_slave(THD *thd)
{
  uchar name[] = "rpl_semi_sync_slave";
  my_bool null_value;
  user_var_entry *entry =
      (user_var_entry*) my_hash_search(&thd->user_vars, name,
                                       sizeof(name) - 1);
  return entry ? entry->val_int(&null_value) : 0;
}

/**
  Execute a SHOW SLAVE HOSTS statement.

  @param thd Pointer to THD object for the client thread executing the
  statement.

  @retval FALSE success
  @retval TRUE failure
*/
bool show_slave_hosts(THD* thd, bool with_raft)
{
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_slave_hosts");

  field_list.push_back(new Item_return_int("Server_id", 10, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Host", 20));
  if (opt_show_slave_auth_info)
  {
    field_list.push_back(new Item_empty_string("User",20));
    field_list.push_back(new Item_empty_string("Password",20));
  }
  field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Master_id", 10, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Slave_UUID", UUID_LENGTH));
  field_list.push_back(new Item_return_int("Is_semi_sync_slave", 7,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Replication_status", 20));

  if (protocol->send_result_set_metadata(&field_list,
                                         Protocol::SEND_NUM_ROWS |
                                         Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  mysql_mutex_lock(&LOCK_slave_list);

  for (uint i = 0; i < slave_list.records; ++i)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, i);
    /* raft followers are shown only when explicitly requested */
    if (si->is_raft && !with_raft)
      continue;
    protocol->prepare_for_resend();
    protocol->store((uint32) si->server_id);
    protocol->store(si->host, &my_charset_bin);
    if (opt_show_slave_auth_info)
    {
      protocol->store(si->user, &my_charset_bin);
      protocol->store(si->password, &my_charset_bin);
    }
    protocol->store((uint32) si->port);
    protocol->store((uint32) si->master_id);

    /* get slave's UUID */
    // NOTE(review): when si is not raft and si->thd is NULL (or has no
    // UUID), neither branch stores a Slave_UUID value for this row --
    // confirm the row still matches the declared column count.
    String slave_uuid;
    if (si->is_raft)
      protocol->store(si->server_uuid, &my_charset_bin);
    else if (si->thd && get_slave_uuid(si->thd, &slave_uuid))
      protocol->store(slave_uuid.c_ptr_safe(), &my_charset_bin);
    if (si->is_raft)
      protocol->store(false);
    else
      protocol->store(is_semi_sync_slave(si->thd));
    const char *replication_status =
        si->is_raft ? "RAFT" : si->thd->query();
    if (replication_status)
      protocol->store(replication_status, &my_charset_bin);
    else
      protocol->store("", &my_charset_bin);
    protocol->update_checksum();
    if (protocol->write())
    {
      mysql_mutex_unlock(&LOCK_slave_list);
      DBUG_RETURN(TRUE);
    }
  }
  mysql_mutex_unlock(&LOCK_slave_list);
  my_eof(thd);
  DBUG_RETURN(FALSE);
}

/**
  Copy all slave hosts into a std::map for later access without a lock

  @param thd Pointer to THD object for the client thread executing the
  statement.
  @param slaves Pointer to std::map object for receiving all slaves
*/
void copy_slave_map(THD *thd, std::map<THD *, SLAVE_INFO> *slaves)
{
  mutex_assert_owner_all_shards(SHARDED(&LOCK_thd_remove));

  mysql_mutex_lock(&LOCK_slave_list);
  for (uint i = 0; i < slave_list.records; ++i)
  {
    SLAVE_INFO* si = (SLAVE_INFO*) my_hash_element(&slave_list, i);
    slaves->emplace(si->thd, *si);
  }
  mysql_mutex_unlock(&LOCK_slave_list);
}

/**
  Internal to mysql_binlog_send() routine that recalculates checksum for a
  FD event (asserted) that needs additional arrangement prior sending to
  slave.
*/
inline void fix_checksum(String *packet, ulong ev_offset)
{
  /* recalculate the crc for this event */
  uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
  ha_checksum crc= my_checksum(0L, NULL, 0);
  /* only a Format_description event of exactly this shape is expected */
  DBUG_ASSERT(data_len ==
              LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
              BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
  crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset,
                   data_len - BINLOG_CHECKSUM_LEN);
  int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
}

/* Look up the @master_binlog_checksum user variable set by the slave
   during the checksum handshake; NULL if the handshake never happened. */
static user_var_entry * get_binlog_checksum_uservar(THD * thd)
{
  LEX_STRING name=  { C_STRING_WITH_LEN("master_binlog_checksum")};
  user_var_entry *entry=
    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
                                     name.length);
  return entry;
}

/**
  Function for calling in mysql_binlog_send
  to check if slave initiated checksum-handshake.

  @param[in]    thd  THD to access a user variable

  @return        TRUE if handshake took place, FALSE otherwise
*/
static bool is_slave_checksum_aware(THD * thd)
{
  DBUG_ENTER("is_slave_checksum_aware");
  user_var_entry *entry= get_binlog_checksum_uservar(thd);
  DBUG_RETURN(entry? true : false);
}

/**
  Function for calling in mysql_binlog_send
  to get the value of @@binlog_checksum of the master at
  time of checksum-handshake.

  The value tells the master whether to compute or not, and the slave
  to verify or not the first artificial Rotate event's checksum.

  @param[in]    thd  THD to access a user variable

  @return       value of @@binlog_checksum alg according to
                @c enum enum_binlog_checksum_alg
*/
static uint8 get_binlog_checksum_value_at_connect(THD * thd)
{
  uint8 ret;

  DBUG_ENTER("get_binlog_checksum_value_at_connect");
  user_var_entry *entry= get_binlog_checksum_uservar(thd);
  if (!entry)
  {
    ret= BINLOG_CHECKSUM_ALG_UNDEF;
  }
  else
  {
    DBUG_ASSERT(entry->type() == STRING_RESULT);
    String str;
    uint dummy_errors;
    str.copy(entry->ptr(), entry->length(), &my_charset_bin, &my_charset_bin,
             &dummy_errors);
    /* map the textual algorithm name to its enum value (0-based) */
    ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
    DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
  }
  DBUG_RETURN(ret);
}

/*
  Compress one event packet for the slave-compressed-event protocol.
  The result buffer is COMP_EVENT_HEADER_SIZE bytes of header (magic byte +
  3-byte uncompressed length; 0 if compression failed and the payload is
  raw) followed by the (possibly compressed) event. Returns an event with a
  null buff on malloc failure.
*/
static binlog_comp_event compress_event(NET *net, const String *packet)
{
  uchar* event= (uchar*) packet->ptr();
  size_t event_len= packet->length();
  size_t comp_event_len= packet->length();
  /* log the OOM warning only once per server lifetime */
  static bool error_logged= false;

  std::shared_ptr<uchar> comp_event((uchar*) my_malloc(event_len +
                                                       COMP_EVENT_HEADER_SIZE,
                                                       MYF(MY_WME)),
                                    my_free);
  // case: malloc failed
  if (unlikely(!comp_event))
    return binlog_comp_event();

  // copy the original event in the buffer, compression will happen in place
  memcpy(comp_event.get() + COMP_EVENT_HEADER_SIZE, event, event_len);

  // compress the event!
  if (my_compress(net, comp_event.get() + COMP_EVENT_HEADER_SIZE,
                  &comp_event_len, &event_len, net_compression_level))
  {
    if (!error_logged)
    {
      sql_print_error("Event compression failed in the dump thread, sending "
                      "uncompressed event. Out of memory. Event compression "
                      "can be disabled by turning off "
                      "slave_compressed_event_protocol. This error will not be "
                      "displayed anymore.");
      error_logged= true;
    }
    // error so no compression happened, see below
    event_len= 0;
  }

  // store the magic number in the first byte to indicate a compressed event
  // packet
  *comp_event= COMP_EVENT_MAGIC_NUMBER;
  // store the length of the uncompressed event, we store 0 if no compression
  // takes place
  int3store(comp_event.get() + 1, event_len);
  comp_event_len+= COMP_EVENT_HEADER_SIZE;

  return binlog_comp_event(comp_event, comp_event_len);
}

/*
  Evict oldest entries from one cache shard until its byte size drops below
  opt_compressed_event_cache_evict_threshold percent of
  max_cache_shard_size. Caller must hold the shard's lock; a max size of 0
  empties the shard.
*/
static void evict_compressed_events(ulonglong shard,
                                    ulonglong max_cache_shard_size)
{
  auto& comp_cache= comp_event_cache_list[shard];
  auto& comp_queue= comp_event_queue_list[shard];
  auto& comp_event_cache_size= comp_event_cache_size_list[shard];

  // case: the cache is not full yet short-circuit
  if (likely(comp_event_cache_size <= max_cache_shard_size)) return;

  auto threshold= max_cache_shard_size *
    ((double) opt_compressed_event_cache_evict_threshold / 100);

  // old event eviction
  while ((comp_event_cache_size > threshold) && !comp_queue.empty())
  {
    ulonglong key;
    size_t size;
    std::tie(key, size)= comp_queue.front();
    comp_queue.pop();
#ifdef HAVE_JUNCTION
    auto value= comp_cache.erase(key);
    // NOTE(review): value is dereferenced by the first assert before the
    // NULL-check assert below -- the two checks look swapped; confirm.
    DBUG_ASSERT(value->len == size);
    DBUG_ASSERT(value != NULL);
    /* defer actual deletion until concurrent readers are quiescent */
    junction::DefaultQSBR.enqueue(&binlog_comp_event::destroy, value);
#else
    DBUG_ASSERT(comp_cache.count(key));
    auto value= comp_cache.at(key);
    DBUG_ASSERT(value->len == size);
    DBUG_ASSERT(value != NULL);
    comp_cache.erase(key);
    delete value;
#endif
    DBUG_ASSERT(comp_event_cache_size >= size);
    comp_event_cache_size-= size;
  }
}

/* Recompute comp_event_cache_hit_ratio once a minute from the hit/miss
   counters, then reset them. */
static void update_comp_event_cache_counters()
{
  // case: one minute is up since the last stats update, re-calculate
  if (unlikely(difftime(my_time(0), cache_stats_timer) >= 60))
  {
    auto local_hit_count= cache_hit_count.load();
    auto local_miss_count= cache_miss_count.load();
    if (unlikely(local_hit_count + local_miss_count == 0))
    {
      comp_event_cache_hit_ratio= 0;
    }
    else
    {
      comp_event_cache_hit_ratio= (double) local_hit_count /
        (local_hit_count + local_miss_count);
    }
    cache_hit_count= cache_miss_count= 0;
    cache_stats_timer= my_time(0);
  }
}

#ifdef HAVE_JUNCTION
/*
  Return the compressed form of @packet, serving it from the per-shard
  cache when possible (junction concurrent-map variant: lock-free reads,
  mutex-guarded writes, QSBR-deferred frees).
*/
static binlog_comp_event get_compressed_event(NET *net,
                                              const LOG_POS_COORD *coord,
                                              const String *packet,
                                              bool is_semi_sync_slave,
                                              bool cache)
{
  // this shared ptr is used to call update() before we leave this function, it
  // doesn't maintain any actual pointer
  std::shared_ptr<uchar> quiescent(NULL, [net] (void *ptr) {
    DBUG_ASSERT(net->qsbr_context != NULL);
    junction::DefaultQSBR.update(
        *static_cast<junction::QSBR::Context*>(net->qsbr_context));
  });

  if (unlikely(!cache)) return compress_event(net, packet);

  /* numeric suffix of the binlog file name, e.g. binlog.000042 -> 42 */
  ulong file_num= strtoul(strrchr(coord->file_name, '.') + 1, NULL, 10);

  // case: file num can't fit in 20 bits or pos can't fit in 41 bits or comp lib
  // can't fit in 2 bits, so we cannot create a 64 bit integer key for this
  // event
  // NOTE(review): these guards permit 21/42/3 bits respectively while the
  // key layout below reserves only 20/41/2 -- looks off by one bit each;
  // oversized values would silently alias another event's key. Confirm and
  // tighten.
  if (unlikely(file_num >= ((ulonglong) 1 << 21) ||
               coord->pos >= ((ulonglong) 1 << 42) ||
               (ulonglong)net->comp_lib >= ((ulonglong) 1 << 3)))
  {
    sql_print_warning("Not caching binlog event %s:%llu because the cache key "
                      "is out of bounds", coord->file_name, coord->pos);
    return compress_event(net, packet);
  }

  // format of the key from MSB to LSB: file_num (20), pos (41), semi sync (1),
  // comp_lib (2), total 64 bits
  ulonglong ev_key= net->comp_lib;
  ev_key |= ((ulonglong) is_semi_sync_slave << 2);
  ev_key |= ((ulonglong) coord->pos << (2 + 1));
  ev_key |= ((ulonglong) file_num << (2 + 1 + 41));

  binlog_comp_event comp_event;

  // get cache, queue and corresponding lock for our shard
  auto shard= coord->pos % COMP_EVENT_CACHE_NUM_SHARDS;
  auto& comp_cache= comp_event_cache_list[shard];
  auto& comp_queue= comp_event_queue_list[shard];
  auto& comp_event_cache_size= comp_event_cache_size_list[shard];
  auto lock= &LOCK_comp_event_cache[shard];

  // see if the event already exists in the cache
  if (auto elem= comp_cache.get(ev_key))
  {
    ++cache_hit_count;
    auto ret= *elem;
    update_comp_event_cache_counters();
    return ret;
  }

  //
event not found in the cache, take write lock but check for existance again // before writing because we might not be the first one to get this write lock mysql_mutex_lock(lock); auto mutator= comp_cache.insertOrFind(ev_key); auto elem= mutator.getValue(); // case: event not found once again, now we have a write lock so we can write // to the cache if (elem == NULL) { comp_event= compress_event(net, packet); if (unlikely(!comp_event.buff)) goto err; const ulonglong max_cache_size_bytes= (1 << 20) * opt_max_compressed_event_cache_size; const ulonglong max_cache_shard_size= max_cache_size_bytes / COMP_EVENT_CACHE_NUM_SHARDS; if (likely(comp_event.len < max_cache_shard_size)) { comp_event_cache_size+= comp_event.len; evict_compressed_events(shard, max_cache_shard_size); auto comp_event_ptr= new binlog_comp_event(comp_event.buff, comp_event.len); mutator.assignValue(comp_event_ptr); comp_queue.push(std::make_pair(ev_key, comp_event.len)); } ++cache_miss_count; } else { comp_event= *elem; ++cache_hit_count; } update_comp_event_cache_counters(); err: mysql_mutex_unlock(lock); return comp_event; } #else // if !HAVE_JUNCTION static binlog_comp_event get_compressed_event(NET *net, const LOG_POS_COORD *coord, const String *packet, bool is_semi_sync_slave, bool cache) { if (unlikely(!cache)) return compress_event(net, packet); ulong file_num= strtoul(strrchr(coord->file_name, '.') + 1, NULL, 10); // case: file num can't fit in 20 bits or pos can't fit in 41 bits or comp lib // can't fit in 2 bits, so we cannot create a 64 bit integer key for this // event if (unlikely(file_num >= ((ulonglong) 1 << 21) || coord->pos >= ((ulonglong) 1 << 42) || net->comp_lib >= ((ulonglong) 1 << 3))) { sql_print_warning("Not caching binlog event %s:%llu because the cache key " "is out of bounds", coord->file_name, coord->pos); return compress_event(net, packet); } // format of the key from MSB to LSB: file_num (20), pos (41), semi sync (1), // comp_lib (2), total 64 bits ulonglong ev_key= 
net->comp_lib; ev_key |= ((ulonglong) is_semi_sync_slave << 2); ev_key |= ((ulonglong) coord->pos << (2 + 1)); ev_key |= ((ulonglong) file_num << (2 + 1 + 41)); binlog_comp_event comp_event; // get cache, queue and corresponding lock for our shard auto shard= coord->pos % COMP_EVENT_CACHE_NUM_SHARDS; auto& comp_cache= comp_event_cache_list[shard]; auto& comp_queue= comp_event_queue_list[shard]; auto& comp_event_cache_size= comp_event_cache_size_list[shard]; auto lock= &LOCK_comp_event_cache[coord->pos % COMP_EVENT_CACHE_NUM_SHARDS]; // see if the event already exists in the cache mysql_rwlock_rdlock(lock); auto elem= comp_cache.find(ev_key); // case: found if (elem != comp_cache.end()) { ++cache_hit_count; auto retval= *elem->second; mysql_rwlock_unlock(lock); update_comp_event_cache_counters(); return retval; } mysql_rwlock_unlock(lock); // event not found in the cache, take write lock but check for existance again // before writing because we might not be the first one to get this write lock mysql_rwlock_wrlock(lock); bool inserted= false; comp_event_cache::iterator kv; // iterator to existing elem or inserted elem std::tie(kv, inserted)= comp_cache.insert({ev_key, new binlog_comp_event()}); // case: event not found once again, now we have a write lock so we can write // to the cache if (inserted) { comp_event= compress_event(net, packet); if (unlikely(!comp_event.buff)) goto err; const ulonglong max_cache_size_bytes= (1 << 20) * opt_max_compressed_event_cache_size; const ulonglong max_cache_shard_size= max_cache_size_bytes / COMP_EVENT_CACHE_NUM_SHARDS; if (likely(comp_event.len < max_cache_shard_size)) { comp_event_cache_size+= comp_event.len; evict_compressed_events(shard, max_cache_shard_size); // this inserts the event into the cache kv->second->buff= comp_event.buff; kv->second->len= comp_event.len; comp_queue.push(std::make_pair(ev_key, comp_event.len)); } ++cache_miss_count; } else { comp_event= *kv->second; ++cache_hit_count; } 
update_comp_event_cache_counters(); err: mysql_rwlock_unlock(lock); return comp_event; } #endif static int my_net_write_event(NET *net, const LOG_POS_COORD *coord, const String *packet, const char** errmsg, int* errnum, bool is_semi_sync_slave, bool cache, bool wait_for_ack, ulonglong hb_period) { uchar* buff= (uchar*) packet->ptr(); size_t buff_len= packet->length(); binlog_comp_event comp_event; // case: wait for ack from semi-sync acker before sending the event if (unlikely(wait_for_ack && !is_semi_sync_slave && !wait_for_semi_sync_ack(coord, net, hb_period))) { if (errmsg) *errmsg = "Error while waiting for semi-sync ACK"; if (errnum) *errnum= ER_UNKNOWN_ERROR; sql_print_error("Error while waiting for semi-sync ACK on dump thread"); return 1; } DBUG_ASSERT(!(net->compress_event && net->compress)); if (net->compress_event) { comp_event= get_compressed_event(net, coord, packet, is_semi_sync_slave, cache); if (unlikely(!comp_event.buff)) { if (errmsg) *errmsg = "Couldn't compress binlog event, out of memory"; if (errnum) *errnum= ER_OUT_OF_RESOURCES; return 1; } buff= comp_event.buff.get(); buff_len= comp_event.len; } if (unlikely(my_net_write(net, buff, buff_len))) { if (errmsg) *errmsg = "Failed on my_net_write()"; if (errnum) *errnum= ER_UNKNOWN_ERROR; return 1; } return 0; } /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to send to the slave (because the slave may not know it if it just asked for MASTER_LOG_FILE='', MASTER_LOG_POS=4). < 4.0.14, fake_rotate_event() was called only if the requested pos was 4. 
  After this version we always call it, so that a 3.23.58 slave can rely on
  it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
  zeros in the good positions which, by chance, make it possible for the 3.23
  slave to detect that this event is unexpected) (this is luck which happens
  because the master and slave disagree on the size of the header of
  Log_event).

  Relying on the event length of the Rotate event instead of these
  well-placed zeros was not possible as Rotate events have a variable-length
  part.
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
                             ulonglong position, const char** errmsg,
                             bool is_semi_sync_slave,
                             uint8 checksum_alg_arg, ulonglong hb_period)
{
  DBUG_ENTER("fake_rotate_event");
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];

  /*
    this Rotate is to be sent with checksum if and only if
    slave's get_master_version_and_clock time handshake value
    of master's @@global.binlog_checksum was TRUE
  */
  my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
    checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;

  /*
    'when' (the timestamp) is set to 0 so that slave could distinguish between
    real and fake Rotate events (if necessary)
  */
  memset(header, 0, 4);
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;

  /* event body is the bare binlog file name (no directory) + position */
  char* p = log_file_name+dirname_length(log_file_name);
  uint ident_len = (uint) strlen(p);
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
    (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
  int4store(header + SERVER_ID_OFFSET, server_id);
  int4store(header + EVENT_LEN_OFFSET, event_len);
  int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);

  // TODO: check what problems this may cause and fix them
  int4store(header + LOG_POS_OFFSET, 0);

  packet->append(header, sizeof(header));
  int8store(buf+R_POS_OFFSET,position);
  packet->append(buf, ROTATE_HEADER_LEN);
  packet->append(p, ident_len);
  if (do_checksum)
  {
    /* checksum covers header + fixed rotate header + file name */
    char b[BINLOG_CHECKSUM_LEN];
    ha_checksum crc= my_checksum(0L, NULL, 0);
    crc= my_checksum(crc, (uchar*)header, sizeof(header));
    crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
    crc= my_checksum(crc, (uchar*)p, ident_len);
    int4store(b, crc);
    packet->append(b, sizeof(b));
  }
  LOG_POS_COORD coord { log_file_name, 0 };
  if (my_net_write_event(net, &coord, packet, errmsg, NULL,
                         is_semi_sync_slave, false, false, hb_period))
  {
    DBUG_RETURN(-1);
  }
  DBUG_RETURN(0);
}

/*
  Reset a transmit packet buffer for event sending.

  This function uses a pre-allocated buffer for the transmit packet. This
  function allocates header bytes for event transmission, and should be
  called before store the event data to the packet buffer.
*/
static int reset_transmit_packet(THD *thd, ushort flags, ulong *ev_offset,
                                 const char **errmsg,
                                 bool observe_transmission,
                                 String *packet, char *packet_buffer,
                                 ulong packet_buffer_size,
                                 bool semi_sync_slave)
{
  int ret= 0;

  /* reserve and set default header */
  /*
    use the pre-allocated buffer. Realloc in string class don't free
    external buffer (packet_buffer), it allocs its own buffer first (see
    String::realloc()) before realloc.
*/
  packet->set(packet_buffer, (uint32) packet_buffer_size, &my_charset_bin);
  packet->length(0);
  /* Default one-byte header; hooks below may reserve more */
  packet->append("\0", 1);

  // semi-sync is called only when raft is disabled
  if (!enable_raft_plugin &&
      observe_transmission && semi_sync_slave &&
      RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
  {
    *errmsg= "Failed to run hook 'reserve_header'";
    my_errno= ER_UNKNOWN_ERROR;
    ret= 1;
  }
  /* Event data will be appended after this offset */
  *ev_offset= packet->length();
  DBUG_PRINT("info", ("rpl_master.cc:reset_transmit_packet returns %d", ret));
  return ret;
}

/*
  Stream a file requested by the connected client: reads the file name from
  the net, then sends the file contents back, terminated by an empty packet.

  @param thd  connection thread; its NET is used for the transfer

  @return 0 on success, 1 on failure (failure is also logged)
*/
static int send_file(THD *thd)
{
  NET* net = thd->get_net();
  int fd = -1, error = 1;
  size_t bytes;
  char fname[FN_REFLEN+1];
  const char *errmsg = 0;
  timeout_t old_timeout;
  unsigned long packet_len;
  uchar buf[IO_SIZE]; // It's safe to alloc this
  DBUG_ENTER("send_file");

  /*
    The client might be slow loading the data, give him wait_timeout to do
    the job
  */
  old_timeout= net->read_timeout;
  my_net_set_read_timeout(
      net, timeout_from_seconds(thd->variables.net_wait_timeout_seconds));

  /*
    We need net_flush here because the client will not know it needs to send
    us the file name until it has processed the load event entry
  */
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
  {
    errmsg = "while reading file name";
    goto err;
  }

  // terminate with \0 for fn_format
  // NOTE(review): this writes one byte past the packet_len bytes read --
  // presumably the NET read buffer guarantees that extra byte; confirm.
  *((char*)net->read_pos + packet_len) = 0;
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
  // this is needed to make replicate-ignore-db
  if (!strcmp(fname,"/dev/null"))
    goto end;

  if ((fd= mysql_file_open(key_file_send_file, fname, O_RDONLY, MYF(0))) < 0)
  {
    errmsg = "on open of file";
    goto err;
  }

  while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
  {
    if (my_net_write(net, buf, bytes))
    {
      errmsg = "while writing data to client";
      goto err;
    }
  }

 end:
  /* Empty packet signals end of file; then wait for the client's reply */
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
      (my_net_read(net) == packet_error))
  {
    errmsg = "while negotiating file transfer close";
    goto err;
  }
  error = 0;

 err:
  /* Always restore the original read timeout and close the file */
  my_net_set_read_timeout(net, old_timeout);
  if (fd >= 0)
    mysql_file_close(fd, MYF(0));
  if (errmsg)
  {
    sql_print_error("Failed in send_file() %s", errmsg);
    DBUG_PRINT("error", ("%s", errmsg));
  }
  DBUG_RETURN(error);
}

/*
  Map a log-read error code to a message for the slave.

  EOF-like results (LOG_READ_EOF, LOG_READ_BINLOG_LAST_VALID_POS) are not
  errors and yield 0.  For any real error, my_errno is set to
  ER_MASTER_FATAL_ERROR_READING_BINLOG, *errmsg is pointed at a static
  description and the original error code is returned.
*/
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
{
  if (error == LOG_READ_EOF || error == LOG_READ_BINLOG_LAST_VALID_POS)
    return 0;
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
  switch (error) {
  case LOG_READ_BOGUS:
    *errmsg = "bogus data in log event";
    break;
  case LOG_READ_TOO_LARGE:
    *errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
    break;
  case LOG_READ_IO:
    *errmsg = "I/O error reading log event";
    break;
  case LOG_READ_MEM:
    *errmsg = "memory allocation failed reading log event";
    break;
  case LOG_READ_TRUNC:
    *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
    break;
  case LOG_READ_CHECKSUM_FAILURE:
    *errmsg = "event read from binlog did not pass crc check";
    break;
  default:
    *errmsg = "unknown error reading log event on the master";
    break;
  }
  return error;
}

/**
  An auxiliary function for calling in mysql_binlog_send
  to initialize the heartbeat timeout in waiting for a binlogged event.

  @param[in]    thd  THD to access a user variable

  @return        heartbeat period an ulonglong of nanoseconds
                 or zero if heartbeat was not demanded by slave
*/
static ulonglong get_heartbeat_period(THD * thd)
{
  my_bool null_value;
  LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
  user_var_entry *entry=
    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
                                     name.length);
  return entry? entry->val_int(&null_value) : 0;
}

/*
  Read the slave-supplied @dump_thread_wait_sleep_usec user variable;
  returns 0 when the slave did not set it.
*/
static ulonglong get_dump_thread_wait_sleep(THD * thd)
{
  my_bool null_value;
  LEX_STRING name= { C_STRING_WITH_LEN("dump_thread_wait_sleep_usec")};
  user_var_entry *entry=
    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
                                     name.length);
  return entry? entry->val_int(&null_value) : 0;
}

/*
  Function prepares and sends replication heartbeat event.
@param net net object of THD @param packet buffer to store the heartbeat instance @param event_coordinates binlog file name and position of the last real event master sent from binlog @param send_timestamp flag enables sending the HB event with the current timestamp: time(). @note Among three essential pieces of heartbeat data Log_event::when is computed locally. The error to send is serious and should force terminating the dump thread. */ static int send_heartbeat_event(NET* net, String* packet, const struct event_coordinates *coord, bool is_semi_sync_slave, uint8 checksum_alg_arg, bool send_timestamp, ulonglong hb_period) { DBUG_ENTER("send_heartbeat_event"); char header[LOG_EVENT_HEADER_LEN]; my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; time_t ts= 0; if (send_timestamp) { // NOTE: if @last_master_timestamp is provided we're a slave and we use this // value in the HB event otherwise we use now() ts= mysql_bin_log.last_master_timestamp.load(); if (!ts) ts= time(0); } memcpy(header, &ts, 4); header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT; char* p= coord->file_name + dirname_length(coord->file_name); uint ident_len = strlen(p); ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + (do_checksum ? 
BINLOG_CHECKSUM_LEN : 0); int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos packet->append(header, sizeof(header)); packet->append(p, ident_len); // log_file_name if (do_checksum) { char b[BINLOG_CHECKSUM_LEN]; ha_checksum crc= my_checksum(0L, NULL, 0); crc= my_checksum(crc, (uchar*) header, sizeof(header)); crc= my_checksum(crc, (uchar*) p, ident_len); int4store(b, crc); packet->append(b, sizeof(b)); } if (my_net_write_event(net, coord, packet, NULL, NULL, is_semi_sync_slave, false, false, hb_period) || net_flush(net)) { DBUG_RETURN(-1); } DBUG_EXECUTE_IF("delay_dump_thread_after_hb", static int counter = 0; if (!counter++) my_sleep(1000 * 1000 * 10); ); DBUG_RETURN(0); } /** Reset and send a heartbeat event to the slave. This function is *only* used to send heartbeat events which carry binary log position of the *last skipped transaction*. Since thd->packet is used to send events to the slave and the packet currently holds an event, packet state is stored first. A heartbeat event is sent to the slave and the state is restored later. Note that, the caller has to send the last skipped coordinates to this function. Note that this function will not send HeartBeat events that carry the current timestamp. This this because this function is ONLY used to send HB update when the previous transaction has been skipped. Note that HB events' timestamps will be used by the slave to calculate the seconds behind master. The only place that makes sense for the event to carry a timestamp is when the master is waiting on the update and keeping sending HB events to keep the connection. @param[in] net This master-slave network handler @param[in] packet packet that is to be sent to the slave. 
@param[in]      last_skip_coord   coordinates for last skipped transaction
  @param[in]      ev_offset         event offset
  @param[in]      checksum_alg_arg  checksum alg used
  @param[in,out]  errmsg            generated error message

  @retval         0                 ok
  @retval         -1                error
*/
static int send_last_skip_group_heartbeat(THD *thd, NET* net, String *packet,
                                          const struct event_coordinates
                                              *last_skip_coord,
                                          ulong *ev_offset,
                                          uint8 checksum_alg_arg,
                                          const char **errmsg,
                                          bool observe_transmission,
                                          bool semi_sync_slave,
                                          char *packet_buffer,
                                          ulong packet_buffer_size,
                                          ulonglong hb_period)
{
  DBUG_ENTER("send_last_skip_group_heartbeat");
  String save_packet;
  int save_offset= *ev_offset;

  /* Save the current read packet */
  save_packet.swap(*packet);

  // NOTE(review): on this early-error return the caller's packet stays
  // swapped into the local save_packet and is destroyed with it --
  // presumably the caller tears down the connection on failure; confirm.
  if (reset_transmit_packet(thd, 0, ev_offset, errmsg, observe_transmission,
                            packet, packet_buffer, packet_buffer_size,
                            semi_sync_slave))
    DBUG_RETURN(-1);

  /**
   * Send heart beat event to the slave to update slave threads coordinates
   * Note that we will not send timestamp in these heart beat events
   */
  if (send_heartbeat_event(net, packet, last_skip_coord, semi_sync_slave,
                           checksum_alg_arg, false, hb_period))
  {
    *errmsg= "Failed on my_net_write()";
    my_errno= ER_UNKNOWN_ERROR;
    DBUG_RETURN(-1);
  }

  /* Restore the packet and event offset */
  packet->swap(save_packet);
  *ev_offset= save_offset;

  DBUG_PRINT("info", ("rpl_master.cc:send_last_skip_group_heartbeat returns 0"));
  DBUG_RETURN(0);
}

/**
  If there are less than BYTES bytes left to read in the packet,
  report error.
*/
#define CHECK_PACKET_SIZE(BYTES)                                        \
  do {                                                                  \
    if (packet_bytes_todo < BYTES)                                      \
      goto error_malformed_packet;                                      \
  } while (0)

/**
  Auxiliary macro used to define READ_INT and READ_STRING.

  Check that there are at least BYTES more bytes to read, then read
  the bytes using the given DECODER, then advance the reading position.
*/
#define READ(DECODE, BYTES)                                             \
  do {                                                                  \
    CHECK_PACKET_SIZE(BYTES);                                           \
    DECODE;                                                             \
    packet_position+= BYTES;                                            \
    packet_bytes_todo-= BYTES;                                          \
  } while (0)

/* Consume BYTES bytes of the packet without decoding them. */
#define SKIP(BYTES) READ((void)(0), BYTES)

/**
  Check that there are at least BYTES more bytes to read, then read
  the bytes and decode them into the given integer VAR, then advance
  the reading position.
*/
#define READ_INT(VAR, BYTES)                                            \
  READ(VAR= uint ## BYTES ## korr(packet_position), BYTES)

/**
  Check that there are at least BYTES more bytes to read and that
  BYTES+1 is not greater than BUFFER_SIZE, then read the bytes into
  the given variable VAR, then advance the reading position.
*/
#define READ_STRING(VAR, BYTES, BUFFER_SIZE)                            \
  do {                                                                  \
    if (BUFFER_SIZE <= BYTES)                                           \
      goto error_malformed_packet;                                      \
    READ(memcpy(VAR, packet_position, BYTES), BYTES);                   \
    VAR[BYTES]= '\0';                                                   \
  } while (0)

/*
  Handle the COM_BINLOG_DUMP command: parse the (pre-GTID) dump request
  from the slave and stream binlog events via mysql_binlog_send().

  Returns true when the connection must terminate afterwards ("fake
  COM_QUIT"); false when the request was rejected by the ACL check.
*/
bool com_binlog_dump(THD *thd, char *packet, uint packet_length)
{
  DBUG_ENTER("com_binlog_dump");
  ulong pos;
  ushort flags= 0;
  const uchar* packet_position= (uchar *) packet;
  uint packet_bytes_todo= packet_length;
  NET* net = thd->get_net();

  status_var_increment(thd->status_var.com_other);
  thd->enable_slow_log= opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL))
    DBUG_RETURN(false);

  /*
    4 bytes is too little, but changing the protocol would break
    compatibility.  This has been fixed in the new protocol. @see
    com_binlog_dump_gtid().
*/
  READ_INT(pos, 4);
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);

  DBUG_PRINT("info", ("pos=%lu flags=%d server_id=%d",
                      pos, flags, thd->server_id));

  /* Only one dump thread per slave server_id; kill any stale one */
  kill_zombie_dump_threads(thd);

  /* The log name starts at offset 10 of the request packet */
  general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %ld",
                    packet + 10, (long) pos);
  my_atomic_add32(&thread_binlog_client, 1);
  if (net->compress_event)
    my_atomic_add32(&thread_binlog_comp_event_client, 1);
  dec_thread_running();

#ifdef HAVE_JUNCTION
  /* Per-thread QSBR context for the junction compressed-event cache */
  if (net->compress_event)
  {
    net->qsbr_context= new junction::QSBR::Context(
        junction::DefaultQSBR.createContext());
  }
#endif

  mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, NULL,
                    flags);

  inc_thread_running();
  my_atomic_add32(&thread_binlog_client, -1);
  if (net->compress_event)
    my_atomic_add32(&thread_binlog_comp_event_client, -1);

#ifdef HAVE_JUNCTION
  if (net->compress_event)
  {
    auto qsbr_context=
      static_cast<junction::QSBR::Context*>(net->qsbr_context);
    DBUG_ASSERT(qsbr_context != NULL);
    junction::DefaultQSBR.destroyContext(*qsbr_context);
    delete qsbr_context;
  }
#endif

  unregister_slave(thd, true, true/*need_lock_slave_list=true*/);

  /* fake COM_QUIT -- if we get here, the thread needs to terminate */
  DBUG_RETURN(true);

error_malformed_packet:
  my_error(ER_MALFORMED_PACKET, MYF(0));
  DBUG_RETURN(true);
}

/*
  Handle the COM_BINLOG_DUMP_GTID command (GTID-aware dump request).
  Returns true when the connection must terminate afterwards.
*/
bool com_binlog_dump_gtid(THD *thd, char *packet, uint packet_length)
{
  DBUG_ENTER("com_binlog_dump_gtid");
  /*
    Before going GA, we need to make this protocol extensible without
    breaking compatibility. /Alfranio.
*/ ushort flags= 0; uint32 data_size= 0; uint64 pos= 0; char name[FN_REFLEN + 1]; uint32 name_size= 0; char* gtid_string= NULL; const uchar* packet_position= (uchar *) packet; uint packet_bytes_todo= packet_length; Sid_map sid_map(NULL/*no sid_lock because this is a completely local object*/); Gtid_set slave_gtid_executed(&sid_map); uint error; NET* net = thd->get_net(); status_var_increment(thd->status_var.com_other); thd->enable_slow_log= opt_log_slow_admin_statements; if (check_global_access(thd, REPL_SLAVE_ACL)) DBUG_RETURN(false); READ_INT(flags, 2); READ_INT(thd->server_id, 4); READ_INT(name_size, 4); READ_STRING(name, name_size, sizeof(name)); READ_INT(pos, 8); DBUG_PRINT("info", ("pos=%llu flags=%d server_id=%d", pos, flags, thd->server_id)); READ_INT(data_size, 4); CHECK_PACKET_SIZE(data_size); if (slave_gtid_executed.add_gtid_encoding(packet_position, data_size) != RETURN_STATUS_OK) DBUG_RETURN(true); gtid_string= slave_gtid_executed.to_string(); DBUG_PRINT("info", ("Slave %d requested to read %s at position %llu gtid set " "'%s'.", thd->server_id, name, pos, gtid_string)); kill_zombie_dump_threads(thd); general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %llu GTIDs: '%s'", name, pos, gtid_string); my_atomic_add32(&thread_binlog_client, 1); if (net->compress_event) my_atomic_add32(&thread_binlog_comp_event_client, 1); dec_thread_running(); #ifdef HAVE_JUNCTION if (net->compress_event) { net->qsbr_context= new junction::QSBR::Context( junction::DefaultQSBR.createContext()); } #endif if ((flags & USING_START_GTID_PROTOCOL)) { if ((error = find_gtid_position_helper(gtid_string, name, pos))) { my_error(error, MYF(0)); my_free(gtid_string); DBUG_RETURN(true); } mysql_binlog_send(thd, name, (my_off_t) pos, NULL, flags); } else mysql_binlog_send(thd, name, (my_off_t) pos, &slave_gtid_executed, flags); inc_thread_running(); my_atomic_add32(&thread_binlog_client, -1); if (net->compress_event) my_atomic_add32(&thread_binlog_comp_event_client, -1); #ifdef 
HAVE_JUNCTION if (net->compress_event) { auto qsbr_context= static_cast<junction::QSBR::Context*>(net->qsbr_context); DBUG_ASSERT(qsbr_context != NULL); junction::DefaultQSBR.destroyContext(*qsbr_context); delete qsbr_context; } #endif my_free(gtid_string); unregister_slave(thd, true, true/*need_lock_slave_list=true*/); /* fake COM_QUIT -- if we get here, the thread needs to terminate */ DBUG_RETURN(true); error_malformed_packet: my_error(ER_MALFORMED_PACKET, MYF(0)); DBUG_RETURN(true); } /* Show processlist command dump the binlog state. Input: output_info - (OUT) the output proc_info output_len - (IN) output proc_info's length thd - (IN) the thread input_msg - (IN) the input proc_info log_file_name - (IN) binlog file name log_pos - (IN) binlog position skip_slave_update - (IN/OUT) determines whether update is skipped */ static void processlist_slave_offset(char *output_info, int output_len, THD *thd, const char *log_file_name, my_off_t log_pos, int *skip_state_update, bool semi_sync_slave) { int len; DBUG_ENTER("processlist_show_binlog_state"); DBUG_ASSERT(*skip_state_update >= 0); if (skip_state_update) { if (*skip_state_update) { --(*skip_state_update); DBUG_VOID_RETURN; } else { *skip_state_update= 10; } } len= snprintf(output_info, output_len, "%s slave offset: %s %lld", semi_sync_slave ? "Semisync" : "Async", log_file_name + dirname_length(log_file_name), (long long int)log_pos); if (len > 0) thd->set_query(output_info, len); DBUG_VOID_RETURN; } static void repl_cleanup(String *packet, char *packet_buffer) { if (packet_buffer != NULL) { /* Make sure it does not reference packet_buffer */ packet->free(); /* Free the fixed packet buffer. */ my_free(packet_buffer); } } /* We need to count how many disk reads are performed by mysql_binlog_send() and export it to user_stats. 
When mysql_binlog_send() calls Log_event::read_log_event(), and thus
  read_log_event() calls my_b_read() to read binlog file, my_b_read()
  determines whether the data is ready in memory; otherwise, it calls
  read_function in IO_CACHE to read data from disk. We want to count how
  many times the read_function is called. So we added
  counting_read_function() which increases the counter and calls the
  original read_function and let my_b_read() call counting_read_function
  instead. counting_read_function() expects an IO_CACHE_EX instance as the
  input which has the original read_function pointer and THD pointer for
  updating user_stats.
*/
static int counting_read_function(IO_CACHE *info, uchar *buffer, size_t count);

/*
  IO_CACHE extended with the original read callback and the owning THD, so
  counting_read_function() can update the THD's user_stats and then
  delegate to the real reader.
*/
struct IO_CACHE_EX : public IO_CACHE
{
  int (*original_read_function)(struct st_io_cache *,uchar *,size_t);
  THD* thd;

  /* Install counting_read_function as this cache's reader (idempotent). */
  void extend(THD *thd)
  {
    this->thd = thd;
    /*
      Must check if extend() has been done. Otherwise we may go into
      endless recursion!
    */
    if (this->read_function != counting_read_function)
    {
      this->original_read_function = this->read_function;
      this->read_function = counting_read_function;
    }
  }
};

/* Count one binlog disk read, then delegate to the original reader. */
static int counting_read_function(IO_CACHE *info, uchar *buffer, size_t count)
{
  IO_CACHE_EX *info_ex = static_cast<IO_CACHE_EX*>(info);
  USER_STATS *us = thd_get_user_stats(info_ex->thd);
  us->binlog_disk_reads.inc();
  return info_ex->original_read_function(info, buffer, count);
}

/*
  Determine the DSCP value (0..63) to use for this connection's socket.
  Sources, in priority order: the dscp_on_socket session variable, then
  the "dscp_on_socket" connection attribute.  Returns true and stores the
  value in ret_val when one is configured and valid; false otherwise.
*/
static bool get_dscp_value(THD *thd, int& ret_val)
{
  ret_val = 0;
  auto dscp_it= thd->connection_attrs_map.find("dscp_on_socket");
  if (thd->variables.dscp_on_socket == 0 &&
      dscp_it == thd->connection_attrs_map.end())
    return false;

  int dscp_val;
  if ((dscp_val= thd->variables.dscp_on_socket) != 0)
  {
    if (dscp_val >= 64 || dscp_val < 0)
    {
      // NO_LINT_DEBUG
      sql_print_warning("Invalid DSCP_QOS value in session var: %d", dscp_val);
      return false;
    }
    ret_val= dscp_val;
    return true;
  }

  DBUG_ASSERT(dscp_it != thd->connection_attrs_map.end());

  /* Parse the attribute value (at most 3 digits are considered) */
  const char *dscp_str= dscp_it->second.c_str();
  char *tmp=
    (char*)(dscp_str + MY_MIN(3, dscp_it->second.length()));
  int res= 0;
  dscp_val= (int) my_strtoll10(dscp_str, &tmp, &res);
  if (res != 0 || dscp_val < 0 || dscp_val >= 64)
  {
    // NO_LINT_DEBUG
    sql_print_warning("Invalid DSCP_QOS value in conn attribs: %s", dscp_str);
    return false;
  }
  ret_val= dscp_val;
  return true;
}

// set the DSCP parameters on the binlog socket.
// Returns false only when a DSCP value was configured but could not be
// applied; returns true both on success and when no DSCP is configured.
static bool set_dscp(THD *thd)
{
  const NET* net = thd->get_net();
  int dscp_val= 0;
  bool dscp_set= get_dscp_value(thd, dscp_val);
  if (!dscp_set)
    return true;

  // DSCP occupies the upper 6 bits of the IP TOS / IPv6 traffic-class byte
  int tos= dscp_val << 2;

  // figure out what domain is the socket in
  uint16_t test_family;
  socklen_t len= sizeof(test_family);
  int res= mysql_socket_getsockopt(net->vio->mysql_socket,
                                   SOL_SOCKET, SO_DOMAIN,
                                   (void*)&test_family, &len);

  // Lets fail, if we can't determine IPV6 vs IPV4
  if (res != 0)
  {
    // NO_LINT_DEBUG
    sql_print_warning("Failed to get socket domain "
                      "while adjusting DSCP_QOS (error: %s)",
                      strerror(errno));
    return false;
  }

#ifdef HAVE_IPV6
  if (test_family == AF_INET6)
  {
    res= mysql_socket_setsockopt(net->vio->mysql_socket,
                                 IPPROTO_IPV6, IPV6_TCLASS,
                                 &tos, sizeof(tos));
  }
  else
#endif
  if (test_family == AF_INET)
  {
    res= mysql_socket_setsockopt(net->vio->mysql_socket,
                                 IPPROTO_IP, IP_TOS,
                                 &tos, sizeof(tos));
  }
  else
  {
    // NO_LINT_DEBUG
    sql_print_warning("Failed to get socket family %d", test_family);
    return false;
  }

  if (res != 0)
  {
    // NO_LINT_DEBUG
    sql_print_warning("Failed to set TOS/TCLASS "
                      "with (error: %s) DSCP: %d.",
                      strerror(errno), tos);
    return false;
  }
  return true;
}

void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
                       const Gtid_set* slave_gtid_executed, int flags)
{
  /**
    @todo: Clean up loop so that, among other things, we only have one
    call to send_file(). This is WL#5721.
*/ #define GOTO_ERR \ do { \ DBUG_PRINT("info", ("mysql_binlog_send fails; goto err from line %d", \ __LINE__)); \ goto err; \ } while (0) LOG_INFO linfo(dump_log.is_relay_log(), /* is_used_by_dump_thd = */ true); char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; ulong ev_offset; bool using_gtid_protocol= slave_gtid_executed != NULL; bool searching_first_gtid= using_gtid_protocol; bool skip_group= false; bool binlog_has_previous_gtids_log_event= false; bool has_transmit_started= false; bool gtid_event_logged = false; Sid_map *sid_map= slave_gtid_executed ? slave_gtid_executed->get_sid_map() : NULL; USER_STATS *us= thd_get_user_stats(thd); ulonglong cur_timer = my_timer_now(); IO_CACHE_EX log; File file = -1; /* Use a local string here to avoid disturbing the contents of thd->packet. Note that calling thd->packet->free() here will make code elsewhere crash */ String packet_str; String* packet = &packet_str; struct timespec last_event_sent_ts; set_timespec_nsec(last_event_sent_ts, 0); bool time_for_hb_event= false; int error= 0; const char *errmsg = "Unknown error"; char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() NET* net = thd->get_net(); if (rpl_send_buffer_size && (setsockopt(net->vio->mysql_socket.fd, SOL_SOCKET, SO_SNDBUF, &rpl_send_buffer_size, sizeof(rpl_send_buffer_size)) == -1 #ifdef UNIV_LINUX || setsockopt(net->vio->mysql_socket.fd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &rpl_send_buffer_size, sizeof(rpl_send_buffer_size)) == -1 #endif )) sql_print_warning("Failed to set SO_SNDBUF with (error: %s).", strerror(errno)); (void)set_dscp(thd); mysql_cond_t *log_cond; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; Format_description_log_event fdle(BINLOG_VERSION), *p_fdle= &fdle; Gtid first_gtid; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; #endif int old_max_allowed_packet= thd->variables.max_allowed_packet; /* This buffer should be enough for "slave offset: file_name file_pos". 
set_query(state_msg, ...) might be called so set_query("", 0) must be called at function end to avoid bogus memory references. */ char state_msg[FN_REFLEN + 100]; int state_msg_len = FN_REFLEN + 100; char* orig_query= thd->query(); uint32 orig_query_length= thd->query_length(); /* The number of times to skip calls to processlist_slave_offset */ int skip_state_update; /* Preallocate fixed buffer for event packets. If an event is more than the size, String class will re-allocate memory and we will reset the packet memory for the next packet creation command. This reduces calls to malloc and free. */ const ulong packet_buffer_size = rpl_event_buffer_size; char *packet_buffer = NULL; const ulong heartbeat_packet_buffer_size = rpl_event_buffer_size; char *heartbeat_packet_buffer = NULL; /* Dump thread sends ER_MASTER_FATAL_ERROR_READING_BINLOG instead of the real errors happend on master to slave when erorr is encountered. So set a temporary Diagnostics_area to thd. The low level error is always set into the temporary Diagnostics_area and be ingored. The original Diagnostics_area will be restored at the end of this function. ER_MASTER_FATAL_ERROR_READING_BINLOG will be set to the original Diagnostics_area. */ Diagnostics_area temp_da; Diagnostics_area *saved_da= thd->get_stmt_da(); thd->set_stmt_da(&temp_da); bool was_killed_by_duplicate_slave_id= false; DBUG_ENTER("mysql_binlog_send"); DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); memset(&log, 0, sizeof(log)); /* heartbeat_period from @master_heartbeat_period user variable */ ulonglong heartbeat_period= get_heartbeat_period(thd); struct timespec heartbeat_buf; struct timespec *heartbeat_ts= NULL; const LOG_POS_COORD start_coord= { log_ident, pos }, *p_start_coord= &start_coord; LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE }, *p_coord= &coord_buf; /* We use the following variables to send a HB event when groups are skipped during a GTID protocol. 
*/ /* Flag to check if last transaction was skipped */ bool last_skip_group= skip_group; /* File name where last skip group is present */ char last_skip_log_name[FN_REFLEN+1]; /* Coordinates of the last skip group */ LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE}, *p_last_skip_coord= &last_skip_coord_buf; bool observe_transmission= false; bool semi_sync_slave = false; if (heartbeat_period != LL(0)) { heartbeat_ts= &heartbeat_buf; set_timespec_nsec(*heartbeat_ts, 0); } ulonglong dump_thread_wait_sleep_usec= get_dump_thread_wait_sleep(thd); if (block_dump_threads) { sql_print_error("Binlog dump threads are blocked!"); GOTO_ERR; } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) { errmsg = "Master fails in COM_BINLOG_DUMP because of --opt-sporadic-binlog-dump-fail"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } #endif if (!dump_log.is_open()) { errmsg = "Binary log is not open"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } if (!server_id_supplied) { errmsg = "Misconfigured master - server_id was not set"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } packet_buffer = (char*) my_malloc(packet_buffer_size, MYF(MY_WME)); if (packet_buffer == NULL) { errmsg = "Master failed pre-allocate event fixed buffer"; my_errno= ER_OUTOFMEMORY; GOTO_ERR; } heartbeat_packet_buffer = (char*) my_malloc(heartbeat_packet_buffer_size, MYF(MY_WME)); if (heartbeat_packet_buffer == NULL) { errmsg = "Master failed pre-allocate heartbeat event fixed buffer"; my_errno= ER_OUTOFMEMORY; GOTO_ERR; } name= search_file_name; if (log_ident[0]) dump_log.make_log_name(search_file_name, log_ident); else { if (using_gtid_protocol) { /* In normal scenarios, it is not possible that Slave will contain more gtids than Master with resepctive to Master's UUID. But it could be possible case if Master's binary log is truncated(due to raid failure) or Master's binary log is deleted but GTID_PURGED was not set properly. 
That scenario needs to be validated, i.e., it should *always* be the case that Slave's gtid executed set (+retrieved set) is a subset of Master's gtid executed set with respective to Master's UUID. If it happens, dump thread will be stopped during the handshake with Slave (thus the Slave's I/O thread will be stopped with the error. Otherwise, it can lead to data inconsistency between Master and Slave. */ Sid_map* slave_sid_map= slave_gtid_executed->get_sid_map(); DBUG_ASSERT(slave_sid_map); global_sid_lock->wrlock(); const rpl_sid &server_sid= gtid_state->get_server_sid(); rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid); if (!slave_gtid_executed->is_subset_for_sid(gtid_state->get_logged_gtids(), gtid_state->get_server_sidno(), subset_sidno)) { errmsg= ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER); my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; global_sid_lock->unlock(); GOTO_ERR; } /* Setting GTID_PURGED (when GTID_EXECUTED set is empty i.e., when previous_gtids are also empty) will make binlog rotate. That leaves first binary log with empty previous_gtids and second binary log's previous_gtids with the value of gtid_purged. In find_first_log_not_in_gtid_set() while we search for a binary log whose previous_gtid_set is subset of slave_gtid_executed, in this particular case, server will always find the first binary log with empty previous_gtids which is subset of any given slave_gtid_executed. Thus Master thinks that it found the first binary log which is actually not correct and unable to catch this error situation. Hence adding below extra if condition to check the situation. Slave should know about Master's purged GTIDs. If Slave's GTID executed + retrieved set does not contain Master's complete purged GTID list, that means Slave is requesting(expecting) GTIDs which were purged by Master. We should let Slave know about the situation. i.e., throw error if slave's GTID executed set is not a superset of Master's purged GTID set. 
The other case, where user deleted binary logs manually (without using 'PURGE BINARY LOGS' command) but gtid_purged is not set by the user, the following if condition cannot catch it. But that is not a problem because in find_first_log_not_in_gtid_set() while checking for subset previous_gtids binary log, the logic will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS is thrown from there. */ bool lost_gtids_is_subset= false; if (!enable_raft_plugin) { const Gtid_set *lost_gtids= gtid_state->get_lost_gtids(); lost_gtids_is_subset= lost_gtids->is_subset(slave_gtid_executed); global_sid_lock->unlock(); } else { global_sid_lock->unlock(); /** In raft mode we calculate lost gtids from the binlog/relaylog index * file instead of using the global state that is always based on the * apply side binlogs. Apply logs are purged on election so global state * is currently incorrect wrt raft logs. * * TODO: Remove this hack after global gtid state is fixed wrt to raft * logs */ Sid_map gtids_lost_sid_map(nullptr); Gtid_set gtids_lost(&gtids_lost_sid_map); dump_log.get_lost_gtids(&gtids_lost); lost_gtids_is_subset= gtids_lost.is_subset(slave_gtid_executed); } if (!lost_gtids_is_subset) { errmsg= ER(ER_MASTER_HAS_PURGED_REQUIRED_GTIDS); my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } first_gtid.clear(); if (dump_log.find_first_log_not_in_gtid_set(name, slave_gtid_executed, &first_gtid, &errmsg)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } } else name= 0; // Find first log } linfo.index_file_offset= 0; if (dump_log.find_log_pos(&linfo, name, 1)) { errmsg = "Could not find first log file name in binary log index file"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } mutex_lock_shard(SHARDED(&LOCK_thread_count), thd); thd->current_linfo = &linfo; mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd); if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } /* 
open_binlog_file() calls init_io_cache() to initialize the read_function in IO_CACHE. We need to replace the read_function with our counting_read_function() in order to count how many times the read_function is called. */ log.extend(thd); if (pos < BIN_LOG_HEADER_SIZE) { errmsg= "Client requested master to start replication from position < 4"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } if (pos > my_b_filelength(&log)) { errmsg= "Client requested master to start replication from position > file size"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } if (log_warnings > 1) sql_print_information("Start binlog_dump to master_thread_id(%u) slave_server(%u), pos(%s, %lu)", thd->thread_id(), thd->server_id, log_ident, (ulong)pos); semi_sync_slave = is_semi_sync_slave(thd); // call into semi-sync only when raft is disabled if (semi_sync_slave && !enable_raft_plugin && RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos, &observe_transmission))) { errmsg= "Failed to run hook 'transmit_start'"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } has_transmit_started= true; /* reset transmit packet for the fake rotate event below */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; /* Tell the client about the log name with a fake Rotate event; this is needed even if we also send a Format_description_log_event just after, because that event does not contain the binlog's name. Note that as this Rotate event is sent before Format_description_log_event, the slave cannot have any info to understand this event's format, so the header len of Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter than other events except FORMAT_DESCRIPTION_EVENT). Before 4.0.14 we called fake_rotate_event below only if (pos == BIN_LOG_HEADER_SIZE), because if this is false then the slave already knows the binlog's name. 
Since, we always call fake_rotate_event; if the slave already knew the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does not harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events to see if the master is 4.0 (then they choose to stop because they can't replicate 4.0); by always calling fake_rotate_event we are sure that 3.23.58 and newer will detect the problem as soon as replication starts (BUG#198). Always calling fake_rotate_event makes sending of normal (=from-binlog) Rotate events a priori unneeded, but it is not so simple: the 2 Rotate events are not equivalent, the normal one is before the Stop event, the fake one is after. If we don't send the normal one, then the Stop event will be interpreted (by existing 4.0 slaves) as "the master stopped", which is wrong. So for safety, given that we want minimum modification of 4.0, we send the normal and fake Rotates. */ if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg, semi_sync_slave, get_binlog_checksum_value_at_connect(current_thd), heartbeat_period)) { /* This error code is not perfect, as fake_rotate_event() does not read anything from the binlog; if it fails it's because of an error in my_net_write(), fortunately it will say so in errmsg. */ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent from client to master. 
*/ thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; p_coord->pos= pos; // the first hb matches the slave's last seen value if (pos > BIN_LOG_HEADER_SIZE) { /* reset transmit packet for the event read from binary log file */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; /* Try to find a Format_description_log_event at the beginning of the binlog */ if (!(error = Log_event::read_log_event(&log, packet, 0, log_file_name))) { DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__)); /* The packet has offsets equal to the normal offsets in a binlog event + ev_offset (the first ev_offset characters are the header (default \0)). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %s", Log_event::get_type_str((Log_event_type)(*packet)[EVENT_TYPE_OFFSET + ev_offset]))); if ((*packet)[EVENT_TYPE_OFFSET + ev_offset] == FORMAT_DESCRIPTION_EVENT) { current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " "that master is configured to log"; sql_print_warning("Master is configured to log replication events " "with checksum, but will not send such events to " "slaves that cannot process them"); GOTO_ERR; } (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* 
if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* fix the checksum due to latest changes in header */ if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) fix_checksum(packet, ev_offset); if (my_net_write_event(net, p_coord, packet, &errmsg, &my_errno, semi_sync_slave, false, false, heartbeat_period)) { GOTO_ERR; } /* No need to save this event. We are only doing simple reads (no real parsing of the events) so we don't need it. And so we don't need the artificial Format_description_log_event of 3.23&4.x. */ } } else { if (test_for_non_eof_log_read_errors(error, &errmsg)) GOTO_ERR; /* It's EOF, nothing to do, go on reading next events, the Format_description_log_event will be found naturally if it is written. */ } } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { /* The Format_description_log_event event will be found naturally. 
*/ } /* seek to the requested position, to start the requested dump */ my_b_seek(&log, pos); // Seek will done on next read skip_state_update= 0; while (!net->error && net->vio != 0 && !thd->killed) { Log_event_type event_type= UNKNOWN_EVENT; bool goto_next_binlog= false; /* reset the transmit packet for the event read from binary log file */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; bool is_active_binlog= false; while (!thd->killed && !(error= Log_event::read_log_event(&log, packet, current_checksum_alg, log_file_name, &is_active_binlog))) { DBUG_EXECUTE_IF("simulate_dump_thread_kill", { thd->killed= THD::KILL_CONNECTION; }); DBUG_EXECUTE_IF("hold_dump_thread_inside_inner_loop", { const char act[]= "now " "signal signal_inside_inner_loop " "wait_for signal_continue"; DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); DBUG_ASSERT(thd->killed); };); if (us) { ulonglong n_timer = my_timer_now(); double micro_secs = my_timer_to_microseconds(n_timer - cur_timer); us->microseconds_wall.inc(micro_secs); cur_timer = n_timer; } time_t created; DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__)); #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { net_flush(net); errmsg = "Debugging binlog dump abort"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } #endif /* log's filename does not change while it's active */ p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { if (event_type == XID_EVENT) { net_flush(net); const char act[]= "now " "wait_for signal.continue"; DBUG_ASSERT(opt_debug_sync_timeout > 0); DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); } }); processlist_slave_offset(state_msg, state_msg_len, thd, log_file_name, my_b_tell(&log), &skip_state_update, 
semi_sync_slave); switch (event_type) { case FORMAT_DESCRIPTION_EVENT: skip_group= false; current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " "that master is configured to log"; sql_print_warning("Master is configured to log replication events " "with checksum, but will not send such events to " "slaves that cannot process them"); GOTO_ERR; } (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; created= uint4korr(packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ ST_CREATED_OFFSET+ev_offset); if (using_gtid_protocol && created > 0) { if (first_gtid.sidno >= 1 && first_gtid.gno >= 1 && slave_gtid_executed->contains_gtid(first_gtid.sidno, first_gtid.gno)) { /* As we are skipping at least the first transaction of the binlog, we must clear the "created" field of the FD event (set it to 0) to avoid cleaning up temp tables on slave. */ DBUG_PRINT("info",("setting 'created' to 0 before sending FD event")); int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* Fix the checksum due to latest changes in header */ if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) fix_checksum(packet, ev_offset); first_gtid.clear(); } } /* Fixes the information on the checksum algorithm when a new format description is read. Notice that this only necessary when we need to filter out some transactions which were already processed. 
*/ p_fdle->checksum_alg= current_checksum_alg; break; case ANONYMOUS_GTID_LOG_EVENT: /* do nothing */ break; case GTID_LOG_EVENT: if (using_gtid_protocol) { /* The current implementation checks if the GTID was not processed by the slave. This means that everytime a GTID is read, one needs to check if it was already processed by the slave. If this is the case, the group is not sent. Otherwise, it must be sent. I think we can do better than that. /Alfranio. */ ulonglong checksum_size= ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? BINLOG_CHECKSUM_LEN + ev_offset : ev_offset); /** @todo: use a local sid_map to avoid the lookup in the global one here /Sven */ Gtid_log_event gtid_ev(packet->ptr() + ev_offset, packet->length() - checksum_size, p_fdle); skip_group= slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map), gtid_ev.get_gno()); searching_first_gtid= skip_group; DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) skip group(%d) " "searching gtid(%d).", gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(), skip_group, searching_first_gtid)); gtid_event_logged = true; } break; case STOP_EVENT: case INCIDENT_EVENT: skip_group= searching_first_gtid; break; case PREVIOUS_GTIDS_LOG_EVENT: binlog_has_previous_gtids_log_event= true; /* FALLTHROUGH */ case ROTATE_EVENT: skip_group= false; break; default: if (!binlog_has_previous_gtids_log_event && using_gtid_protocol) /* If we come here, it means we are seeing a 'normal' DML/DDL event (e.g. query_log_event) without having seen any Previous_gtid_log_event. That means we are in an old binlog (no previous_gtids_log_event). When using the GTID protocol, that means we must skip the entire binary log and jump to the next one. */ goto_next_binlog= true; if (!gtid_event_logged && using_gtid_protocol) { /* Skip groups in the binlogs which don't have any gtid event logged before them. 
When read_only is ON, the server doesn't generate GTID and so no gtid_event is logged before binlog events. But when read_only is OFF, the server starts writing gtid_events in the middle of active binlog. When slave connects with gtid_protocol, master needs to skip binlog events which don't have corresponding gtid_event. */ skip_group = true; } break; } if (event_type == XID_EVENT) { gtid_event_logged = false; } if (goto_next_binlog) // stop reading from this binlog break; DBUG_PRINT("info", ("EVENT_TYPE %d SEARCHING %d SKIP_GROUP %d file %s pos %lld\n", event_type, searching_first_gtid, skip_group, log_file_name, my_b_tell(&log))); pos = my_b_tell(&log); // semi-sync is called only when raft is disabled if (!enable_raft_plugin && observe_transmission && semi_sync_slave && RUN_HOOK(binlog_transmit, before_send_event, (thd, flags, packet, log_file_name, pos))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "run 'before_send_event' hook failed"; GOTO_ERR; } /* The present event was skipped, so store the event coordinates */ if (skip_group) { p_last_skip_coord->pos= p_coord->pos; strcpy(p_last_skip_coord->file_name, p_coord->file_name); /* If we have not send any event from past 'heartbeat_period' time period, then it is time to send a packet before skipping this group. */ DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event", { my_sleep(2000000); }); struct timespec cur_clock; set_timespec_nsec(cur_clock, 0); DBUG_ASSERT(cmp_timespec(cur_clock, last_event_sent_ts) >= 0); // Both diff_timespec() and heartbeat_period are in nano seconds. 
time_for_hb_event= (diff_timespec(cur_clock, last_event_sent_ts) >= heartbeat_period); DBUG_EXECUTE_IF("send_zero_hb_event", { time_for_hb_event= true; }); } if ((!skip_group && last_skip_group && event_type != FORMAT_DESCRIPTION_EVENT) || time_for_hb_event) { /* Dump thread is ready to send it's first transaction after one or more skipped transactions or dump thread did not send any event from past 'heartbeat_period' time frame (busy skipping gtid groups). Send a heart beat event to update slave IO thread coordinates before that happens. Notice that for a new binary log file, FORMAT_DESCRIPTION_EVENT is the first event to be sent to the slave. In this case, it is no need to send a HB event (which might have coordinates of previous binlog file). This heartbeat event sending only happens when the master is in the middle or has just completed skipping all the trxs. Therefore, HB shouldn't carry Timestamp. */ if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord, &ev_offset, current_checksum_alg, &errmsg, observe_transmission, semi_sync_slave, heartbeat_packet_buffer, heartbeat_packet_buffer_size, heartbeat_period)) { GOTO_ERR; } set_timespec_nsec(last_event_sent_ts, 0); last_skip_group= time_for_hb_event= false; } else { last_skip_group= skip_group; } if (skip_group == false) { if (my_net_write_event(net, p_coord, packet, &errmsg, &my_errno, semi_sync_slave, event_type != FORMAT_DESCRIPTION_EVENT && event_type != ROTATE_EVENT, (rpl_semi_sync_master_enabled || enable_raft_plugin) && rpl_wait_for_semi_sync_ack, heartbeat_period)) { GOTO_ERR; } set_timespec_nsec(last_event_sent_ts, 0); } DBUG_EXECUTE_IF("dump_thread_wait_after_send_write_rows", { if (event_type == WRITE_ROWS_EVENT) { net_flush(net); const char act[]= "now " "wait_for signal.continue"; DBUG_ASSERT(opt_debug_sync_timeout > 0); DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); } }); DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { if (event_type == XID_EVENT) { 
net_flush(net); } }); DBUG_PRINT("info", ("log event code %d", event_type)); if (skip_group == false && event_type == LOAD_EVENT) { if (send_file(thd)) { errmsg = "failed in send_file()"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } } if (!enable_raft_plugin && observe_transmission && semi_sync_slave && RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet, log_file_name, skip_group ? pos : 0))) { errmsg= "Failed to run hook 'after_send_event'"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } // case: this was a skipped group (i.e. the semi-sync slave already has // this transaction); update last acked coordinates if (semi_sync_slave && rpl_wait_for_semi_sync_ack && searching_first_gtid) { signal_semi_sync_ack(p_coord); } /* reset transmit packet for next loop */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; } /* If the above while is killed due to thd->killed flag and not due to read_log_event error, then do nothing.*/ if (thd->killed) goto end; DBUG_EXECUTE_IF("wait_after_binlog_EOF", { const char act[]= "now wait_for signal.rotate_finished no_clear_event"; DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); /* TODO: now that we are logging the offset, check to make sure the recorded offset and the actual match. Guilhem 2003-06: this is not true if this master is a slave <4.0.15 running with --log-slave-updates, because then log_pos may be the offset in the-master-of-this-master's binlog. */ if (test_for_non_eof_log_read_errors(error, &errmsg)) GOTO_ERR; if (!is_active_binlog) goto_next_binlog= true; /* When read_log_event in the above loop returns LOG_READ_BINLOG_LAST_ VALID_POS instead of normal EOF, we cannot open next binlog file which may result in skipping of the events in current file. Instead check for error value and try to read an event inside this if statement. LOG_READ_EOF confirms that we reached the end of current file. 
*/ if (error != LOG_READ_EOF && !goto_next_binlog) { /* Block until there is more data in the log */ if (net_flush(net)) { errmsg = "failed on net_flush()"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } /* We may have missed the update broadcast from the log that has just happened, let's try to catch it if it did. If we did not miss anything, we just wait for other threads to signal us. */ { log.error=0; bool read_packet = 0; #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { errmsg = "Debugging binlog dump abort"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } #endif /* reset the transmit packet for the event read from binary log file */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; /* No one will update the log while we are reading now, but we'll be quick and just read one record TODO: Add an counter that is incremented for each time we update the binary log. We can avoid the following read if the counter has not been updated since last read. */ switch (error= Log_event::read_log_event(&log, packet, current_checksum_alg, log_file_name)) { case 0: DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__)); /* we read successfully, so we'll need to send it to the slave */ read_packet = 1; p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); DBUG_ASSERT(event_type != FORMAT_DESCRIPTION_EVENT); break; case LOG_READ_EOF: goto_next_binlog = true; break; case LOG_READ_BINLOG_LAST_VALID_POS: { /* Take lock_binlog_pos to ensure that we read everything in binlog file. If there is nothing left to read in binlog file, wait until we get a signal from other threads that binlog is updated. 
*/ // case: sleep before locking and waiting for new data if (unlikely(dump_thread_wait_sleep_usec != 0)) { DBUG_EXECUTE_IF("reached_dump_thread_wait_sleep", { const char act[]= "now " "signal reached " "wait_for continue"; DBUG_ASSERT(opt_debug_sync_timeout > 0); DBUG_ASSERT( !debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); usleep(dump_thread_wait_sleep_usec); } // Note: This part is tricky and should be touched if you really know // what you're doing. We're locking dump log to get the raw log // pointer, then we're locking end log pos before unlocking the dump // log. We're taking the lock in the same sequence as when log is // switched in binlog_change_to_binlog() and binlog_change_to_apply() // to avoid deadlocks. This locking pattern ensures that we're working // with the correct raw log and that there is no race between getting // the raw log and log switching. Log switching will be blocked until // we release the binlog end pos lock before waiting for signal in // wait_for_update_bin_log(). const bool is_dump_log_locked= dump_log.lock(); MYSQL_BIN_LOG* raw_log= dump_log.get_log(false); raw_log->lock_binlog_end_pos(); dump_log.unlock(is_dump_log_locked); /* No need to wait if the the current log is not active or we haven't reached binlog_end_pos. Note that is_active may be false positive, but binlog_end_pos is valid here. If rotate thread is about to rotate the log, we will get a singal_update() in open_binlog() which will eventually unblock us and checking is_active() later in read_log_event() will give the valid value. */ if (!raw_log->is_active(log_file_name) || my_b_tell(&log) < raw_log->get_binlog_end_pos()) { raw_log->unlock_binlog_end_pos(); break; } /* There are two ways to tell the server to not block: - Set the BINLOG_DUMP_NON_BLOCK flag. This is official, documented, not used by any mysql client, but used by some external users. - Set server_id=0. 
This is unofficial, undocumented, and used by mysqlbinlog -R since the beginning of time. When mysqlbinlog --stop-never is used, it sets a 'fake' server_id that defaults to 1 but can be set to anything else using stop-never-slave-server-id. This has the drawback that if the server_id conflicts with any other running slave, or with any other instance of mysqlbinlog --stop-never, then that other instance will be killed. It is also an unnecessary burden on the user to have to specify a server_id different from all other server_ids just to avoid conflicts. As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets the BINLOG_DUMP_NONBLOCK flag when one or both of the following holds: - the --stop-never option is *not* specified In a far future, this means we can remove the unofficial functionality that server_id=0 implies nonblocking behavior. That will allow mysqlbinlog to use server_id=0 always. That has the advantage that mysqlbinlog --stop-never cannot cause any running dump threads to be killed. */ if (thd->server_id == 0 || ((flags & BINLOG_DUMP_NON_BLOCK) != 0)) { DBUG_PRINT("info", ("stopping dump thread because server_id==0 or the BINLOG_DUMP_NON_BLOCK flag is set: server_id=%u flags=%d", thd->server_id, flags)); raw_log->unlock_binlog_end_pos(); goto end; } int ret; ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); #ifndef DBUG_OFF ulong hb_info_counter= 0; #endif PSI_stage_info old_stage; signal_cnt= raw_log->signal_cnt; do { if (heartbeat_period != 0) { DBUG_ASSERT(heartbeat_ts); set_timespec_nsec(*heartbeat_ts, heartbeat_period); } log_cond= raw_log->get_log_cond(); thd->ENTER_COND(log_cond, raw_log->get_binlog_end_pos_lock(), &stage_master_has_sent_all_binlog_to_slave, &old_stage); /* When using GTIDs, if the dump thread has reached the end of the binary log and the last transaction is skipped, send one heartbeat event even when the heartbeat is off. 
If the heartbeat is on, it is better to send a heartbeat event as the time_out of certain functions (Ex: master_pos_wait() or semi sync ack timeout) might be less than heartbeat period. This can happen when the slave has exact same GTID as the master. In this case, we still send the first HB with 0 timestamp to be safe since the follow up HB with carry the right timestamp */ if (skip_group) { /* TODO: Number of HB events sent from here can be reduced by checking whehter it is time to send a HB event or not. (i.e., using the flag time_for_hb_event) */ if (send_last_skip_group_heartbeat(thd, net, packet, p_coord, &ev_offset, current_checksum_alg, &errmsg, observe_transmission, semi_sync_slave, heartbeat_packet_buffer, heartbeat_packet_buffer_size, heartbeat_period)) { thd->EXIT_COND(&old_stage); GOTO_ERR; } last_skip_group= false; /*A HB for this pos has been sent. */ } ret= raw_log->wait_for_update_bin_log(thd, heartbeat_ts); DBUG_ASSERT(ret == 0 || (heartbeat_period != 0)); if (ret == ETIMEDOUT || ret == ETIME) { #ifndef DBUG_OFF if (hb_info_counter < 3) { sql_print_information("master sends heartbeat message"); hb_info_counter++; if (hb_info_counter == 3) sql_print_information("the rest of heartbeat info skipped ..."); } #endif /* reset transmit packet for the heartbeat event */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) { thd->EXIT_COND(&old_stage); GOTO_ERR; } /** * HB events sent here are essentially when the master is waiting * on updates and send HB to keep the connection connected. 
* * This is the only place that sending HB with Timestamp makes * sense and won't break monotonic TS calculated SBM */ if (send_heartbeat_event(net, packet, p_coord, semi_sync_slave, current_checksum_alg, true, heartbeat_period)) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; thd->EXIT_COND(&old_stage); GOTO_ERR; } } else { DBUG_PRINT("wait",("binary log received update or a broadcast signal caught")); } } while (signal_cnt == raw_log->signal_cnt && !thd->killed); thd->EXIT_COND(&old_stage); } break; default: test_for_non_eof_log_read_errors(error, &errmsg); GOTO_ERR; } if (read_packet) { processlist_slave_offset(state_msg, state_msg_len, thd, log_file_name, my_b_tell(&log), &skip_state_update, semi_sync_slave); switch (event_type) { case ANONYMOUS_GTID_LOG_EVENT: /* do nothing */ break; case GTID_LOG_EVENT: if (using_gtid_protocol) { ulonglong checksum_size= ((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? BINLOG_CHECKSUM_LEN + ev_offset : ev_offset); Gtid_log_event gtid_ev(packet->ptr() + ev_offset, packet->length() - checksum_size, p_fdle); skip_group= slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map), gtid_ev.get_gno()); searching_first_gtid= skip_group; DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) " "skip group(%d) searching gtid(%d).", gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(), skip_group, searching_first_gtid)); gtid_event_logged = true; } break; case STOP_EVENT: case INCIDENT_EVENT: skip_group= searching_first_gtid; break; case PREVIOUS_GTIDS_LOG_EVENT: binlog_has_previous_gtids_log_event= true; /* FALLTHROUGH */ case ROTATE_EVENT: skip_group= false; break; default: if (!binlog_has_previous_gtids_log_event && using_gtid_protocol) /* If we come here, it means we are seeing a 'normal' DML/DDL event (e.g. query_log_event) without having seen any Previous_gtid_log_event. That means we are in an old binlog (no previous_gtids_log_event). 
When using the GTID protocol, that means we must skip the entire binary log and jump to the next one. */ goto_next_binlog= true; if (!gtid_event_logged && using_gtid_protocol) { /* Skip groups in the binlogs which don't have any gtid event logged before them. */ skip_group = true; } break; } if (event_type == XID_EVENT) { gtid_event_logged = false; } /* The present event was skipped in a GTID protocol, store the coordinates */ if (skip_group) { p_last_skip_coord->pos= p_coord->pos; strcpy(p_last_skip_coord->file_name, p_coord->file_name); } if (!skip_group && !goto_next_binlog) { /** * If the last group was skipped, send a HB event, * similarly, HB sent here should not carry TS since the last * trx is skipped and we are not sure if we are waiting on update */ if (last_skip_group && send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord, &ev_offset, current_checksum_alg, &errmsg, observe_transmission, semi_sync_slave, heartbeat_packet_buffer, heartbeat_packet_buffer_size, heartbeat_period)) { GOTO_ERR; } THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave); pos = my_b_tell(&log); // semi-sync is called only when raft is disabled if (!enable_raft_plugin && observe_transmission && semi_sync_slave && RUN_HOOK(binlog_transmit, before_send_event, (thd, flags, packet, log_file_name, pos))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "run 'before_send_event' hook failed"; GOTO_ERR; } if (my_net_write_event(net, p_coord, packet, &errmsg, &my_errno, semi_sync_slave, (event_type != FORMAT_DESCRIPTION_EVENT && event_type != ROTATE_EVENT), (rpl_semi_sync_master_enabled || enable_raft_plugin) && rpl_wait_for_semi_sync_ack, heartbeat_period)) { GOTO_ERR; } set_timespec_nsec(last_event_sent_ts, 0); if (event_type == LOAD_EVENT) { if (send_file(thd)) { errmsg = "failed in send_file()"; my_errno= ER_UNKNOWN_ERROR; GOTO_ERR; } } } if(!goto_next_binlog) { // semi-sync is called only when raft is disabled if (!enable_raft_plugin && observe_transmission && semi_sync_slave && 
RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet, log_file_name, skip_group ? pos : 0))) { my_errno= ER_UNKNOWN_ERROR; errmsg= "Failed to run hook 'after_send_event'"; GOTO_ERR; } // case: this was a skipped group (i.e. the semi-sync slave already // has this transaction); update last acked coordinates if (semi_sync_slave && rpl_wait_for_semi_sync_ack && searching_first_gtid) { signal_semi_sync_ack(p_coord); } } /* Save the skip group for next iteration */ last_skip_group= skip_group; } log.error=0; } } if (goto_next_binlog) { skip_state_update = 0; // clear flag because we open a new binlog binlog_has_previous_gtids_log_event= false; THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog); DBUG_EXECUTE_IF("dump_wait_before_find_next_log", { const char act[]= "now signal signal.reached wait_for signal.done"; DBUG_ASSERT(opt_debug_sync_timeout > 0); DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act))); };); switch (dump_log.find_next_log(&linfo, 1)) { case 0: break; default: errmsg = "could not find next log"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } end_io_cache(&log); mysql_file_close(file, MYF(MY_WME)); /* reset transmit packet for the possible fake rotate event */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg, observe_transmission, packet, packet_buffer, packet_buffer_size, semi_sync_slave)) GOTO_ERR; /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event. There are at least two cases when this can happen: - The previous binary log was the last one before the master was shutdown and restarted. - The previous binary log was GTID-free (did not contain a Previous_gtids_log_event) and the slave is connecting using the GTID protocol. This way we tell the slave about the new log's name and position. If the binlog is 5.0 or later, the next event we are going to read and send is Format_description_log_event. 
*/ if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0 || fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, &errmsg, semi_sync_slave, current_checksum_alg, heartbeat_period)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; GOTO_ERR; } /* hook read_function again, since we got a new bin file from open_binlog_file above: */ log.extend(thd); p_coord->file_name= log_file_name; // reset to the next } } end: /* If the dump thread was killed because of a duplicate slave UUID we will fail throwing an error to the slave so it will not try to reconnect anymore. */ mysql_mutex_lock(&thd->LOCK_thd_data); was_killed_by_duplicate_slave_id= thd->duplicate_slave_id; mysql_mutex_unlock(&thd->LOCK_thd_data); if (was_killed_by_duplicate_slave_id) { errmsg= "A slave with the same server_uuid/server_id as this slave " "has connected to the master"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } thd->set_stmt_da(saved_da); end_io_cache(&log); mysql_file_close(file, MYF(MY_WME)); // semi-sync is called only when raft is disabled if (!enable_raft_plugin && has_transmit_started && semi_sync_slave) (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); mutex_lock_shard(SHARDED(&LOCK_thread_count), thd); thd->current_linfo = 0; mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd); thd->variables.max_allowed_packet= old_max_allowed_packet; /* Undo any calls done by processlist_slave_offset */ thd->set_query(orig_query, orig_query_length); if (heartbeat_packet_buffer != NULL) my_free(heartbeat_packet_buffer); repl_cleanup(packet, packet_buffer); if (us) { ulonglong n_timer = my_timer_now(); us->microseconds_wall.inc(my_timer_to_microseconds(n_timer - cur_timer)); } DBUG_VOID_RETURN; err: THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log)) { /* detailing the fatal error message with 
coordinates of the last position read. */ my_snprintf(error_text, sizeof(error_text), "%s; the first event '%s' at %lld, " "the last event read from '%s' at %lld, " "the last byte read from '%s' at %lld.", errmsg, p_start_coord->file_name, p_start_coord->pos, p_coord->file_name, p_coord->pos, log_file_name, my_b_tell(&log)); } else { strncpy(error_text, errmsg, sizeof(error_text)); error_text[sizeof(error_text) - 1]= '\0'; } end_io_cache(&log); // semi-sync is called only when raft is disabled if (!enable_raft_plugin && has_transmit_started && semi_sync_slave) (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through thread list and update thd->current_linfo->index_file_offset this mutex will make sure that it never tried to update our linfo after we return from this stack frame */ mutex_lock_shard(SHARDED(&LOCK_thread_count), thd); thd->current_linfo = 0; mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd); if (file >= 0) mysql_file_close(file, MYF(MY_WME)); thd->variables.max_allowed_packet= old_max_allowed_packet; thd->set_stmt_da(saved_da); my_message(my_errno, error_text, MYF(0)); /* Undo any calls done by processlist_slave_offset */ thd->set_query(orig_query, orig_query_length); if (heartbeat_packet_buffer != NULL) my_free(heartbeat_packet_buffer); repl_cleanup(packet, packet_buffer); if (us) { ulonglong n_timer = my_timer_now(); us->microseconds_wall.inc(my_timer_to_microseconds(n_timer - cur_timer)); } DBUG_VOID_RETURN; } /** An auxiliary function extracts slave UUID. @param[in] thd THD to access a user variable @param[out] value String to return UUID value. @return if success value is returned else NULL is returned. 
NOTE: Please make sure this method is in sync with ReplSemiSyncMaster::get_slave_uuid */ String *get_slave_uuid(THD *thd, String *value) { uchar name[]= "slave_uuid"; if (value == NULL) return NULL; user_var_entry *entry= (user_var_entry*) my_hash_search(&thd->user_vars, name, sizeof(name)-1); if (entry && entry->length() > 0) { value->copy(entry->ptr(), entry->length(), NULL); return value; } else return NULL; } /* Kill all Binlog_dump threads which previously talked to the same slave ("same" means with the same UUID(for slave versions >= 5.6) or same server id (for slave versions < 5.6). Indeed, if the slave stops, if the Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it will keep existing until a query is written to the binlog. If the master is idle, then this could last long, and if the slave reconnects, we could have 2 Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP, the master kills any existing thread with the slave's UUID/server id (if this id is not zero; it will be true for real slaves, but false for mysqlbinlog when it sends COM_BINLOG_DUMP to get a remote binlog dump). 
  SYNOPSIS
    kill_zombie_dump_threads()

  @param thd  newly connected dump thread object
*/

void kill_zombie_dump_threads(THD *thd)
{
  String slave_uuid;
  get_slave_uuid(thd, &slave_uuid);

  /* Neither a UUID nor a server id: nothing to match zombies against. */
  if (slave_uuid.length() == 0 && thd->server_id == 0)
    return;

  /* take copy of global_thread_list instead of holding LOCK_thread_count */
  std::set<THD*> global_thread_list_copy;
  mutex_lock_all_shards(SHARDED(&LOCK_thd_remove));
  copy_global_thread_list(&global_thread_list_copy);

  THD *tmp= NULL;
  bool is_zombie_thread= false;
  std::set<THD*>::iterator it= global_thread_list_copy.begin();
  std::set<THD*>::iterator end= global_thread_list_copy.end();
  /* Scan for another dump thread serving the same slave. */
  for (; it != end; ++it)
  {
    if ((*it) != thd && ((*it)->get_command() == COM_BINLOG_DUMP ||
                         (*it)->get_command() == COM_BINLOG_DUMP_GTID))
    {
      String tmp_uuid;
      get_slave_uuid((*it), &tmp_uuid);
      if (slave_uuid.length())
      {
        /* >= 5.6 slave: match on UUID. */
        is_zombie_thread= (tmp_uuid.length() &&
                           !strncmp(slave_uuid.c_ptr(),
                                    tmp_uuid.c_ptr(), UUID_LENGTH));
      }
      else
      {
        /*
          Check if it is a 5.5 slave's dump thread i.e., server_id should
          be same && dump thread should not contain 'UUID'.
        */
        is_zombie_thread= (((*it)->server_id == thd->server_id) &&
                           !tmp_uuid.length());
      }
      if (is_zombie_thread)
      {
        tmp= *it;
        mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
        break;
      }
    }
  }
  mutex_unlock_all_shards(SHARDED(&LOCK_thd_remove));

  if (tmp)
  {
    /*
      Here we do not call kill_one_thread() as
      it will be slow because it will iterate through the list
      again. We just to do kill the thread ourselves.
    */
    if (log_warnings > 1)
    {
      if (slave_uuid.length())
      {
        sql_print_information("While initializing dump thread for slave with "
                              "UUID <%s>, found a zombie dump thread with the "
                              "same UUID. Master is killing the zombie dump "
                              "thread(%u).", slave_uuid.c_ptr(),
                              tmp->thread_id());
      }
      else
      {
        sql_print_information("While initializing dump thread for slave with "
                              "server_id <%u>, found a zombie dump thread with the "
                              "same server_id. Master is killing the zombie dump "
                              "thread(%u).", thd->server_id,
                              tmp->thread_id());
      }
    }
    /* Mark the zombie so its error path reports the duplicate-slave cause. */
    tmp->duplicate_slave_id= true;
    tmp->awake(THD::KILL_QUERY);
    mysql_mutex_unlock(&tmp->LOCK_thd_data);
  }
}

/*
  Kill all Binlog_dump threads.
*/
void kill_all_dump_threads()
{
  THD *tmp= NULL;
  /* take copy of global_thread_list instead of holding LOCK_thread_count */
  std::set<THD*> global_thread_list_copy;
  mutex_lock_all_shards(SHARDED(&LOCK_thd_remove));
  copy_global_thread_list(&global_thread_list_copy);
  std::set<THD*>::iterator it= global_thread_list_copy.begin();
  std::set<THD*>::iterator end= global_thread_list_copy.end();
  for (; it != end; ++it)
  {
    if ((*it)->get_command() == COM_BINLOG_DUMP ||
        (*it)->get_command() == COM_BINLOG_DUMP_GTID)
    {
      tmp= *it;
      /* Hold LOCK_thd_data so the THD cannot be deleted while we wake it. */
      mysql_mutex_lock(&tmp->LOCK_thd_data);
      tmp->awake(THD::KILL_CONNECTION);
      mysql_mutex_unlock(&tmp->LOCK_thd_data);
    }
  }
  mutex_unlock_all_shards(SHARDED(&LOCK_thd_remove));
}

/**
  Execute a RESET MASTER statement.

  @param thd Pointer to THD object of the client thread executing the
  statement.

  @retval 0 success
  @retval 1 error
*/
int reset_master(THD* thd, bool force)
{
  /* With raft enabled, RESET MASTER is refused unless explicitly forced. */
  if (enable_raft_plugin)
  {
    if (!force && !override_enable_raft_check)
    {
      // NO_LINT_DEBUG
      sql_print_information(
          "Did not allow reset_master as enable_raft_plugin is ON");
      my_error(ER_RAFT_OPERATION_INCOMPATIBLE, MYF(0),
               "reset master not allowed when enable_raft_plugin is ON");
      return 1;
    }
    else
    {
      // NO_LINT_DEBUG
      sql_print_information(
          "Allow reset_master in enable_raft_plugin mode as force is set");
    }
  }

  if (!mysql_bin_log.is_open())
  {
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED),
               MYF(ME_BELL+ME_WAITTANG));
    return 1;
  }
  if (mysql_bin_log.reset_logs(thd))
    return 1;
  // semi-sync is called only when raft is disabled
  if (!enable_raft_plugin)
    (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));

  reset_semi_sync_last_acked();
  return 0;
}

/**
  Output of START TRANSACTION WITH CONSISTENT INNODB SNAPSHOT statement.
  @param thd Pointer to THD object for the client thread executing the
  statement.

  @param ss_info Snapshot context that contains binlog file/pos, executed
  gtids and snapshot id

  @param need_ok [out] Whether caller needs to call my_ok vs it having
  been done in this function via my_eof.

  @retval FALSE success
  @retval TRUE failure
*/
bool show_master_offset(THD* thd, snapshot_info_st &ss_info, bool *need_ok)
{
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_master_offset");

  /* Result-set metadata; optional columns only when the snapshot has them. */
  List<Item> field_list;
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
  field_list.push_back(new Item_return_int("Position",20,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Gtid_executed",
                                             ss_info.gtid_executed.length()));
  if (ss_info.snapshot_id)
  {
    field_list.push_back(new Item_return_int("Snapshot_ID",20,
                                             MYSQL_TYPE_LONGLONG));
  }
  if (ss_info.snapshot_hlc != 0)
  {
    field_list.push_back(
        new Item_return_int("Snapshot_HLC",20, MYSQL_TYPE_LONGLONG));
  }

  if (protocol->send_result_set_metadata(&field_list,
                                         Protocol::SEND_NUM_ROWS |
                                         Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);
  protocol->prepare_for_resend();

  /* Strip the directory part so only the bare binlog file name is sent. */
  int dir_len = dirname_length(ss_info.binlog_file.c_str());
  protocol->store(ss_info.binlog_file.c_str() + dir_len, &my_charset_bin);
  protocol->store(ss_info.binlog_pos);
  protocol->store(ss_info.gtid_executed.c_str(), &my_charset_bin);
  /* Column values must be stored in the same order as the metadata above. */
  if (ss_info.snapshot_id)
  {
    protocol->store(ss_info.snapshot_id);
  }
  if (ss_info.snapshot_hlc)
  {
    protocol->store(ss_info.snapshot_hlc);
  }
  protocol->update_checksum();
  if (protocol->write())
    DBUG_RETURN(TRUE);
  my_eof(thd);
  /* EOF already sent; tell the caller not to send an OK packet as well. */
  if (need_ok)
    *need_ok = false;
  DBUG_RETURN(FALSE);
}

/**
  Execute a SHOW MASTER STATUS statement.

  @param thd Pointer to THD object for the client thread executing the
  statement.
  @retval FALSE success
  @retval TRUE failure
*/
bool show_master_status(THD* thd)
{
  Protocol *protocol= thd->protocol;
  char* gtid_set_buffer= NULL;
  int gtid_set_size= 0;
  List<Item> field_list;

  DBUG_ENTER("show_binlog_info");

  /*
    Snapshot the executed GTID set into a malloc'ed string under the sid
    lock; the buffer is freed on every exit path below.
  */
  global_sid_lock->wrlock();
  const Gtid_set* gtid_set= gtid_state->get_logged_gtids();
  if ((gtid_set_size= gtid_set->to_string(&gtid_set_buffer)) < 0)
  {
    global_sid_lock->unlock();
    my_eof(thd);
    my_free(gtid_set_buffer);
    DBUG_RETURN(true);
  }
  global_sid_lock->unlock();

  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
  field_list.push_back(new Item_return_int("Position",20,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
  field_list.push_back(new Item_empty_string("Executed_Gtid_Set",
                                             gtid_set_size));

  if (protocol->send_result_set_metadata(&field_list,
                                         Protocol::SEND_NUM_ROWS |
                                         Protocol::SEND_EOF))
  {
    my_free(gtid_set_buffer);
    DBUG_RETURN(true);
  }
  protocol->prepare_for_resend();

  if (mysql_bin_log.is_open())
  {
    LOG_INFO li;
    /* Read current file/pos consistently under the end-pos lock. */
    mysql_mutex_lock(mysql_bin_log.get_binlog_end_pos_lock());
    mysql_bin_log.get_current_log_without_lock_log(&li);
    mysql_mutex_unlock(mysql_bin_log.get_binlog_end_pos_lock());
    int dir_len = dirname_length(li.log_file_name);
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
    protocol->store((ulonglong) li.pos);
    protocol->store(binlog_filter->get_do_db());
    protocol->store(binlog_filter->get_ignore_db());
    protocol->store(gtid_set_buffer, &my_charset_bin);
    protocol->update_checksum();
    if (protocol->write())
    {
      my_free(gtid_set_buffer);
      DBUG_RETURN(true);
    }
  }
  my_eof(thd);
  my_free(gtid_set_buffer);
  DBUG_RETURN(false);
}

/**
  Execute a SHOW BINARY LOGS statement.

  @param thd Pointer to THD object for the client thread executing the
  statement.
  @param with_gtid Whether to include previous_gtid_set (default false)

  @retval FALSE success
  @retval TRUE failure
*/
bool show_binlogs(THD* thd, bool with_gtid)
{
  IO_CACHE *index_file;
  LOG_INFO cur;
  File file;
  /* Holds "<file name> <gtid set length>" lines read from the index file. */
  char file_name_and_gtid_set_length[FN_REFLEN + 22];
  List<Item> field_list;
  uint length;
  int cur_dir_len;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_binlogs");

  if (!mysql_bin_log.is_open())
  {
    my_error(ER_NO_BINARY_LOGGING, MYF(0));
    DBUG_RETURN(TRUE);
  }

  field_list.push_back(new Item_empty_string("Log_name", 255));
  field_list.push_back(new Item_return_int("File_size", 20,
                                           MYSQL_TYPE_LONGLONG));
  if (with_gtid)
    field_list.push_back(
        new Item_empty_string("Prev_gtid_set", 0)); // max_size seems not to matter
  if (protocol->send_result_set_metadata(&field_list,
                                         Protocol::SEND_NUM_ROWS |
                                         Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  /* Fetch the active log coordinates, then iterate the index under its lock. */
  mysql_mutex_lock(mysql_bin_log.get_binlog_end_pos_lock());
  DEBUG_SYNC(thd, "show_binlogs_after_lock_log_before_lock_index");
  mysql_bin_log.get_current_log_without_lock_log(&cur);
  mysql_mutex_unlock(mysql_bin_log.get_binlog_end_pos_lock());
  mysql_bin_log.lock_index();
  index_file=mysql_bin_log.get_index_file();

  cur_dir_len= dirname_length(cur.log_file_name);

  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);

  /* The file ends with EOF or empty line */
  while ((length=my_b_gets(index_file, file_name_and_gtid_set_length,
                           FN_REFLEN + 22)) > 1)
  {
    int dir_len;
    ulonglong file_length= 0;                   // Length if open fails
    /* Replace the trailing newline with a NUL terminator. */
    file_name_and_gtid_set_length[length - 1] = 0;
    uint gtid_set_length =
      split_file_name_and_gtid_set_length(file_name_and_gtid_set_length);
    if (gtid_set_length)
    {
      /* Skip over the encoded gtid set (plus separator) in the index file. */
      my_b_seek(index_file,
                my_b_tell(index_file) + gtid_set_length + 1);
    }
    char *fname = file_name_and_gtid_set_length;
    length = strlen(fname);

    protocol->prepare_for_resend();
    dir_len= dirname_length(fname);
    length-= dir_len;
    protocol->store(fname + dir_len, length, &my_charset_bin);

    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
      file_length= cur.pos;  /* The active log, use the active position */
    else
    {
      /* this is an old log, open it and find the size */
      if ((file= mysql_file_open(key_file_binlog,
                                 fname, O_RDONLY | O_SHARE | O_BINARY,
                                 MYF(0))) >= 0)
      {
        file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END,
                                                 MYF(0));
        mysql_file_close(file, MYF(0));
      }
    }
    protocol->store(file_length);

    if (with_gtid)
    {
      auto previous_gtid_set_map = mysql_bin_log.get_previous_gtid_set_map();
      Sid_map sid_map(NULL);
      Gtid_set gtid_set(&sid_map, NULL);
      /*
        NOTE(review): map::at() throws if fname is missing from the
        previous-gtid map; presumably every index entry has a map entry --
        confirm against the map's maintenance in binlog.cc.
      */
      auto gtid_str = previous_gtid_set_map->at(std::string(fname));
      if (!gtid_str.empty()) // if GTID enabled
      {
        gtid_set.add_gtid_encoding((const uchar*)gtid_str.c_str(),
                                   gtid_str.length(), NULL);
        char *buf;
        gtid_set.to_string(&buf, &Gtid_set::commented_string_format);
        protocol->store(buf, strlen(buf), &my_charset_bin);
        free(buf);
      }
      else
      {
        protocol->store("", 0, &my_charset_bin);
      }
    }

    protocol->update_checksum();
    if (protocol->write())
    {
      DBUG_PRINT("info", ("stopping dump thread because protocol->write "
                          "failed at line %d", __LINE__));
      goto err;
    }
  }
  if(index_file->error == -1)
    goto err;
  mysql_bin_log.unlock_index();
  my_eof(thd);
  DBUG_RETURN(FALSE);

err:
  mysql_bin_log.unlock_index();
  DBUG_RETURN(TRUE);
}

/*
  Finds the binlog file name and starting position of corresponding
  Gtid_log_event of gtid_string and store in parameters log_name and pos
  respectively

  @param[in]  gtid_string  Gtid in string format.
  @param[out] log_name     Binlog file name where the gtid is physically
                           located.
  @param[out] pos          Position of gtid in log_name binlog.
  @return >0 Failure
          0  Success
*/
uint find_gtid_position_helper(const char* gtid_string, char *log_name,
                               my_off_t &gtid_pos)
{
  DBUG_ENTER("find_gtid_position_helper");

  Gtid gtid;
  Sid_map sid_map(NULL);
  uint error = ER_UNKNOWN_ERROR;
  int dir_len;

  Gtid_set previous_gtid_set(&sid_map);
  const Gtid_set_map *previous_gtid_set_map;

  if (gtid.parse(&sid_map, gtid_string) != RETURN_STATUS_OK)
  {
    goto err;
  }

  mysql_bin_log.lock_index();
  previous_gtid_set_map = mysql_bin_log.get_previous_gtid_set_map();

  /*
    Walk binlogs newest-first. The first file whose Previous_gtids set does
    NOT contain the gtid is the file where it must have been written.
  */
  for (auto rit = previous_gtid_set_map->rbegin();
       rit != previous_gtid_set_map->rend(); ++rit)
  {
    previous_gtid_set.add_gtid_encoding((const uchar*)rit->second.c_str(),
                                        rit->second.length());
    if (!previous_gtid_set.contains_gtid(gtid))
    {
      /*
        Unlock index here since we don't iterate over the
        previous_gtid_set_map anymore.
      */
      mysql_bin_log.unlock_index();
      gtid_pos = find_gtid_pos_in_log(rit->first.c_str(), gtid, &sid_map);
      if (!gtid_pos)
      {
        error = ER_REQUESTED_GTID_NOT_IN_EXECUTED_SET;
        goto err;
      }
      /* Return only the base name, without the directory part. */
      dir_len = dirname_length(rit->first.c_str());
      strcpy(log_name, rit->first.c_str() + dir_len);
      DBUG_RETURN(0);
    }
    /* Reuse the set for the next (older) file. */
    previous_gtid_set.clear();
  }
  mysql_bin_log.unlock_index();
  /* Present in every Previous_gtids set: the gtid's binlog was purged. */
  error = ER_REQUESTED_PURGED_GTID;

err:
  DBUG_RETURN(error);
}

/**
  Execute FIND BINLOG GTID statement.

  @param thd Pointer to THD object for the client thread executing the
  statement.
@retval false Success @retval true Failure */ bool find_gtid_position(THD *thd) { DBUG_ENTER("find_gtid_position"); char log_name[FN_REFLEN]; my_off_t gtid_pos = 0; uint error = ER_UNKNOWN_ERROR; Protocol *protocol = thd->protocol; List<Item> field_list; field_list.push_back(new Item_empty_string("Log_name", 255)); field_list.push_back(new Item_return_int("Position", 20, MYSQL_TYPE_LONGLONG)); if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) goto err; error = find_gtid_position_helper(thd->lex->gtid_string, log_name, gtid_pos); if (error) goto err; protocol->prepare_for_resend(); protocol->store(log_name, strlen(log_name), &my_charset_bin); protocol->store(gtid_pos); protocol->update_checksum(); if (protocol->write()) { DBUG_PRINT("info", ("protocol->write failed inf find_gtid_position()")); goto err; } my_eof(thd); DBUG_RETURN(false); err: DBUG_PRINT("info", ("error: %u", error)); my_error(error, MYF(0), "Unknown error occured while finding gtid position"); DBUG_RETURN(true); } #endif /* HAVE_REPLICATION */