in sql/rpl_master.cc [1983:3490]
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
const Gtid_set* slave_gtid_executed, int flags)
{
/**
@todo: Clean up loop so that, among other things, we only have one
call to send_file(). This is WL#5721.
*/
#define GOTO_ERR \
do { \
DBUG_PRINT("info", ("mysql_binlog_send fails; goto err from line %d", \
__LINE__)); \
goto err; \
} while (0)
LOG_INFO linfo(dump_log.is_relay_log(), /* is_used_by_dump_thd = */ true);
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
ulong ev_offset;
bool using_gtid_protocol= slave_gtid_executed != NULL;
bool searching_first_gtid= using_gtid_protocol;
bool skip_group= false;
bool binlog_has_previous_gtids_log_event= false;
bool has_transmit_started= false;
bool gtid_event_logged = false;
Sid_map *sid_map= slave_gtid_executed ? slave_gtid_executed->get_sid_map() : NULL;
USER_STATS *us= thd_get_user_stats(thd);
ulonglong cur_timer = my_timer_now();
IO_CACHE_EX log;
File file = -1;
/*
Use a local string here to avoid disturbing the
contents of thd->packet. Note that calling thd->packet->free() here will
make code elsewhere crash
*/
String packet_str;
String* packet = &packet_str;
struct timespec last_event_sent_ts;
set_timespec_nsec(last_event_sent_ts, 0);
bool time_for_hb_event= false;
int error= 0;
const char *errmsg = "Unknown error";
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
NET* net = thd->get_net();
if (rpl_send_buffer_size &&
(setsockopt(net->vio->mysql_socket.fd, SOL_SOCKET, SO_SNDBUF,
&rpl_send_buffer_size, sizeof(rpl_send_buffer_size)) == -1
#ifdef UNIV_LINUX
||
setsockopt(net->vio->mysql_socket.fd, IPPROTO_TCP, TCP_WINDOW_CLAMP,
&rpl_send_buffer_size, sizeof(rpl_send_buffer_size)) == -1
#endif
))
sql_print_warning("Failed to set SO_SNDBUF with (error: %s).",
strerror(errno));
(void)set_dscp(thd);
mysql_cond_t *log_cond;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
Format_description_log_event fdle(BINLOG_VERSION), *p_fdle= &fdle;
Gtid first_gtid;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
int old_max_allowed_packet= thd->variables.max_allowed_packet;
/* This buffer should be enough for "slave offset: file_name file_pos".
set_query(state_msg, ...) might be called so set_query("", 0) must be
called at function end to avoid bogus memory references.
*/
char state_msg[FN_REFLEN + 100];
int state_msg_len = FN_REFLEN + 100;
char* orig_query= thd->query();
uint32 orig_query_length= thd->query_length();
/* The number of times to skip calls to processlist_slave_offset */
int skip_state_update;
/*
Preallocate fixed buffer for event packets. If an event is more
than the size, String class will re-allocate memory and we will
reset the packet memory for the next packet creation command.
This reduces calls to malloc and free.
*/
const ulong packet_buffer_size = rpl_event_buffer_size;
char *packet_buffer = NULL;
const ulong heartbeat_packet_buffer_size = rpl_event_buffer_size;
char *heartbeat_packet_buffer = NULL;
/*
Dump thread sends ER_MASTER_FATAL_ERROR_READING_BINLOG instead of the real
errors happened on master to slave when an error is encountered.
So set a temporary Diagnostics_area to thd. The low level error is always
set into the temporary Diagnostics_area and is ignored. The original
Diagnostics_area will be restored at the end of this function.
ER_MASTER_FATAL_ERROR_READING_BINLOG will be set to the original
Diagnostics_area.
*/
Diagnostics_area temp_da;
Diagnostics_area *saved_da= thd->get_stmt_da();
thd->set_stmt_da(&temp_da);
bool was_killed_by_duplicate_slave_id= false;
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
memset(&log, 0, sizeof(log));
/*
heartbeat_period from @master_heartbeat_period user variable
*/
ulonglong heartbeat_period= get_heartbeat_period(thd);
struct timespec heartbeat_buf;
struct timespec *heartbeat_ts= NULL;
const LOG_POS_COORD start_coord= { log_ident, pos },
*p_start_coord= &start_coord;
LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
*p_coord= &coord_buf;
/*
We use the following variables to send a HB event
when groups are skipped during a GTID protocol.
*/
/* Flag to check if last transaction was skipped */
bool last_skip_group= skip_group;
/* File name where last skip group is present */
char last_skip_log_name[FN_REFLEN+1];
/* Coordinates of the last skip group */
LOG_POS_COORD last_skip_coord_buf= {last_skip_log_name, BIN_LOG_HEADER_SIZE},
*p_last_skip_coord= &last_skip_coord_buf;
bool observe_transmission= false;
bool semi_sync_slave = false;
if (heartbeat_period != LL(0))
{
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
ulonglong dump_thread_wait_sleep_usec= get_dump_thread_wait_sleep(thd);
if (block_dump_threads)
{
sql_print_error("Binlog dump threads are blocked!");
GOTO_ERR;
}
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
errmsg = "Master fails in COM_BINLOG_DUMP because of --opt-sporadic-binlog-dump-fail";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
#endif
if (!dump_log.is_open())
{
errmsg = "Binary log is not open";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
if (!server_id_supplied)
{
errmsg = "Misconfigured master - server_id was not set";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
packet_buffer = (char*) my_malloc(packet_buffer_size, MYF(MY_WME));
if (packet_buffer == NULL) {
errmsg = "Master failed pre-allocate event fixed buffer";
my_errno= ER_OUTOFMEMORY;
GOTO_ERR;
}
heartbeat_packet_buffer = (char*) my_malloc(heartbeat_packet_buffer_size,
MYF(MY_WME));
if (heartbeat_packet_buffer == NULL) {
errmsg = "Master failed pre-allocate heartbeat event fixed buffer";
my_errno= ER_OUTOFMEMORY;
GOTO_ERR;
}
name= search_file_name;
if (log_ident[0])
dump_log.make_log_name(search_file_name, log_ident);
else
{
if (using_gtid_protocol)
{
/*
In normal scenarios, it is not possible that Slave will
contain more gtids than Master with respect to Master's
UUID. But it could be possible case if Master's binary log
is truncated(due to raid failure) or Master's binary log is
deleted but GTID_PURGED was not set properly. That scenario
needs to be validated, i.e., it should *always* be the case that
Slave's gtid executed set (+retrieved set) is a subset of
Master's gtid executed set with respect to Master's UUID.
If it happens, dump thread will be stopped during the handshake
with Slave (thus the Slave's I/O thread will be stopped with the
error. Otherwise, it can lead to data inconsistency between Master
and Slave.
*/
Sid_map* slave_sid_map= slave_gtid_executed->get_sid_map();
DBUG_ASSERT(slave_sid_map);
global_sid_lock->wrlock();
const rpl_sid &server_sid= gtid_state->get_server_sid();
rpl_sidno subset_sidno= slave_sid_map->sid_to_sidno(server_sid);
if (!slave_gtid_executed->is_subset_for_sid(gtid_state->get_logged_gtids(),
gtid_state->get_server_sidno(),
subset_sidno))
{
errmsg= ER(ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER);
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
global_sid_lock->unlock();
GOTO_ERR;
}
/*
Setting GTID_PURGED (when GTID_EXECUTED set is empty i.e., when
previous_gtids are also empty) will make binlog rotate. That
leaves first binary log with empty previous_gtids and second
binary log's previous_gtids with the value of gtid_purged.
In find_first_log_not_in_gtid_set() while we search for a binary
log whose previous_gtid_set is subset of slave_gtid_executed,
in this particular case, server will always find the first binary
log with empty previous_gtids which is subset of any given
slave_gtid_executed. Thus Master thinks that it found the first
binary log which is actually not correct and unable to catch
this error situation. Hence adding below extra if condition
to check the situation. Slave should know about Master's purged GTIDs.
If Slave's GTID executed + retrieved set does not contain Master's
complete purged GTID list, that means Slave is requesting(expecting)
GTIDs which were purged by Master. We should let Slave know about the
situation. i.e., throw error if slave's GTID executed set is not
a superset of Master's purged GTID set.
The other case, where user deleted binary logs manually
(without using 'PURGE BINARY LOGS' command) but gtid_purged
is not set by the user, the following if condition cannot catch it.
But that is not a problem because in find_first_log_not_in_gtid_set()
while checking for subset previous_gtids binary log, the logic
will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS
is thrown from there.
*/
bool lost_gtids_is_subset= false;
if (!enable_raft_plugin)
{
const Gtid_set *lost_gtids= gtid_state->get_lost_gtids();
lost_gtids_is_subset= lost_gtids->is_subset(slave_gtid_executed);
global_sid_lock->unlock();
}
else
{
global_sid_lock->unlock();
/** In raft mode we calculate lost gtids from the binlog/relaylog index
* file instead of using the global state that is always based on the
* apply side binlogs. Apply logs are purged on election so global state
* is currently incorrect wrt raft logs.
*
* TODO: Remove this hack after global gtid state is fixed wrt to raft
* logs
*/
Sid_map gtids_lost_sid_map(nullptr);
Gtid_set gtids_lost(&gtids_lost_sid_map);
dump_log.get_lost_gtids(&gtids_lost);
lost_gtids_is_subset= gtids_lost.is_subset(slave_gtid_executed);
}
if (!lost_gtids_is_subset)
{
errmsg= ER(ER_MASTER_HAS_PURGED_REQUIRED_GTIDS);
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
first_gtid.clear();
if (dump_log.find_first_log_not_in_gtid_set(name,
slave_gtid_executed,
&first_gtid,
&errmsg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
}
else
name= 0; // Find first log
}
linfo.index_file_offset= 0;
if (dump_log.find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find first log file name in binary log index file";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
mutex_lock_shard(SHARDED(&LOCK_thread_count), thd);
thd->current_linfo = &linfo;
mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd);
if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
/*
open_binlog_file() calls init_io_cache() to initialize the read_function
in IO_CACHE. We need to replace the read_function with our
counting_read_function() in order to count how many times the
read_function is called.
*/
log.extend(thd);
if (pos < BIN_LOG_HEADER_SIZE)
{
errmsg= "Client requested master to start replication from position < 4";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
if (pos > my_b_filelength(&log))
{
errmsg= "Client requested master to start replication from position > file size";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
if (log_warnings > 1)
sql_print_information("Start binlog_dump to master_thread_id(%u) slave_server(%u), pos(%s, %lu)",
thd->thread_id(), thd->server_id, log_ident, (ulong)pos);
semi_sync_slave = is_semi_sync_slave(thd);
// call into semi-sync only when raft is disabled
if (semi_sync_slave && !enable_raft_plugin &&
RUN_HOOK(binlog_transmit, transmit_start,
(thd, flags, log_ident, pos, &observe_transmission)))
{
errmsg= "Failed to run hook 'transmit_start'";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
has_transmit_started= true;
/* reset transmit packet for the fake rotate event below */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
/*
Tell the client about the log name with a fake Rotate event;
this is needed even if we also send a Format_description_log_event
just after, because that event does not contain the binlog's name.
Note that as this Rotate event is sent before
Format_description_log_event, the slave cannot have any info to
understand this event's format, so the header len of
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
than other events except FORMAT_DESCRIPTION_EVENT).
Before 4.0.14 we called fake_rotate_event below only if (pos ==
BIN_LOG_HEADER_SIZE), because if this is false then the slave
already knows the binlog's name.
Since, we always call fake_rotate_event; if the slave already knew
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
which test Rotate events to see if the master is 4.0 (then they
choose to stop because they can't replicate 4.0); by always calling
fake_rotate_event we are sure that 3.23.58 and newer will detect the
problem as soon as replication starts (BUG#198).
Always calling fake_rotate_event makes sending of normal
(=from-binlog) Rotate events a priori unneeded, but it is not so
simple: the 2 Rotate events are not equivalent, the normal one is
before the Stop event, the fake one is after. If we don't send the
normal one, then the Stop event will be interpreted (by existing 4.0
slaves) as "the master stopped", which is wrong. So for safety,
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
semi_sync_slave, get_binlog_checksum_value_at_connect(current_thd),
heartbeat_period))
{
/*
This error code is not perfect, as fake_rotate_event() does not
read anything from the binlog; if it fails it's because of an
error in my_net_write(), fortunately it will say so in errmsg.
*/
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
from client to master.
*/
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
p_coord->pos= pos; // the first hb matches the slave's last seen value
if (pos > BIN_LOG_HEADER_SIZE)
{
/* reset transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
/*
Try to find a Format_description_log_event at the beginning of
the binlog
*/
if (!(error = Log_event::read_log_event(&log, packet, 0,
log_file_name)))
{
DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
/*
The packet has offsets equal to the normal offsets in a
binlog event + ev_offset (the first ev_offset characters are
the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %s",
Log_event::get_type_str((Log_event_type)(*packet)[EVENT_TYPE_OFFSET + ev_offset])));
if ((*packet)[EVENT_TYPE_OFFSET + ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
"that master is configured to log";
sql_print_warning("Master is configured to log replication events "
"with checksum, but will not send such events to "
"slaves that cannot process them");
GOTO_ERR;
}
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */
if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
if (my_net_write_event(net, p_coord, packet, &errmsg,
&my_errno, semi_sync_slave, false,
false, heartbeat_period))
{
GOTO_ERR;
}
/*
No need to save this event. We are only doing simple reads
(no real parsing of the events) so we don't need it. And so
we don't need the artificial Format_description_log_event of
3.23&4.x.
*/
}
}
else
{
if (test_for_non_eof_log_read_errors(error, &errmsg))
GOTO_ERR;
/*
It's EOF, nothing to do, go on reading next events, the
Format_description_log_event will be found naturally if it is written.
*/
}
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
/* The Format_description_log_event event will be found naturally. */
}
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
skip_state_update= 0;
while (!net->error && net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
bool goto_next_binlog= false;
/* reset the transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
bool is_active_binlog= false;
while (!thd->killed &&
!(error= Log_event::read_log_event(&log, packet,
current_checksum_alg,
log_file_name,
&is_active_binlog)))
{
DBUG_EXECUTE_IF("simulate_dump_thread_kill",
{
thd->killed= THD::KILL_CONNECTION;
});
DBUG_EXECUTE_IF("hold_dump_thread_inside_inner_loop",
{
const char act[]= "now "
"signal signal_inside_inner_loop "
"wait_for signal_continue";
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
DBUG_ASSERT(thd->killed);
};);
if (us)
{
ulonglong n_timer = my_timer_now();
double micro_secs = my_timer_to_microseconds(n_timer - cur_timer);
us->microseconds_wall.inc(micro_secs);
cur_timer = n_timer;
}
time_t created;
DBUG_PRINT("info", ("read_log_event returned 0 on line %d", __LINE__));
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
net_flush(net);
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
#endif
/*
log's filename does not change while it's active
*/
p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
if (event_type == XID_EVENT)
{
net_flush(net);
const char act[]=
"now "
"wait_for signal.continue";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
}
});
processlist_slave_offset(state_msg, state_msg_len, thd,
log_file_name, my_b_tell(&log),
&skip_state_update, semi_sync_slave);
switch (event_type)
{
case FORMAT_DESCRIPTION_EVENT:
skip_group= false;
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
"that master is configured to log";
sql_print_warning("Master is configured to log replication events "
"with checksum, but will not send such events to "
"slaves that cannot process them");
GOTO_ERR;
}
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
created= uint4korr(packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+ev_offset);
if (using_gtid_protocol && created > 0)
{
if (first_gtid.sidno >= 1 && first_gtid.gno >= 1 &&
slave_gtid_executed->contains_gtid(first_gtid.sidno,
first_gtid.gno))
{
/*
As we are skipping at least the first transaction of the binlog,
we must clear the "created" field of the FD event (set it to 0)
to avoid cleaning up temp tables on slave.
*/
DBUG_PRINT("info",("setting 'created' to 0 before sending FD event"));
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* Fix the checksum due to latest changes in header */
if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
first_gtid.clear();
}
}
/*
Fixes the information on the checksum algorithm when a new
format description is read. Notice that this only necessary
when we need to filter out some transactions which were
already processed.
*/
p_fdle->checksum_alg= current_checksum_alg;
break;
case ANONYMOUS_GTID_LOG_EVENT:
/* do nothing */
break;
case GTID_LOG_EVENT:
if (using_gtid_protocol)
{
/*
The current implementation checks if the GTID was not processed
by the slave. This means that everytime a GTID is read, one needs
to check if it was already processed by the slave. If this is the
case, the group is not sent. Otherwise, it must be sent.
I think we can do better than that. /Alfranio.
*/
ulonglong checksum_size=
((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
/**
@todo: use a local sid_map to avoid the lookup in the
global one here /Sven
*/
Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
packet->length() - checksum_size,
p_fdle);
skip_group= slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
gtid_ev.get_gno());
searching_first_gtid= skip_group;
DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) skip group(%d) "
"searching gtid(%d).",
gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
skip_group, searching_first_gtid));
gtid_event_logged = true;
}
break;
case STOP_EVENT:
case INCIDENT_EVENT:
skip_group= searching_first_gtid;
break;
case PREVIOUS_GTIDS_LOG_EVENT:
binlog_has_previous_gtids_log_event= true;
/* FALLTHROUGH */
case ROTATE_EVENT:
skip_group= false;
break;
default:
if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
/*
If we come here, it means we are seeing a 'normal' DML/DDL
event (e.g. query_log_event) without having seen any
Previous_gtid_log_event. That means we are in an old
binlog (no previous_gtids_log_event). When using the GTID
protocol, that means we must skip the entire binary log
and jump to the next one.
*/
goto_next_binlog= true;
if (!gtid_event_logged && using_gtid_protocol)
{
/*
Skip groups in the binlogs which don't have any gtid event
logged before them. When read_only is ON, the server
doesn't generate GTID and so no gtid_event is logged before binlog
events. But when read_only is OFF, the server starts
writing gtid_events in the middle of active binlog. When slave
connects with gtid_protocol, master needs to skip binlog events
which don't have corresponding gtid_event.
*/
skip_group = true;
}
break;
}
if (event_type == XID_EVENT)
{
gtid_event_logged = false;
}
if (goto_next_binlog)
// stop reading from this binlog
break;
DBUG_PRINT("info", ("EVENT_TYPE %d SEARCHING %d SKIP_GROUP %d file %s pos %lld\n",
event_type, searching_first_gtid, skip_group, log_file_name,
my_b_tell(&log)));
pos = my_b_tell(&log);
// semi-sync is called only when raft is disabled
if (!enable_raft_plugin && observe_transmission && semi_sync_slave &&
RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
GOTO_ERR;
}
/* The present event was skipped, so store the event coordinates */
if (skip_group)
{
p_last_skip_coord->pos= p_coord->pos;
strcpy(p_last_skip_coord->file_name, p_coord->file_name);
/*
If we have not sent any event in the past 'heartbeat_period' time
period, then it is time to send a packet before skipping this group.
*/
DBUG_EXECUTE_IF("inject_2sec_sleep_when_skipping_an_event",
{
my_sleep(2000000);
});
struct timespec cur_clock;
set_timespec_nsec(cur_clock, 0);
DBUG_ASSERT(cmp_timespec(cur_clock, last_event_sent_ts) >= 0);
// Both diff_timespec() and heartbeat_period are in nano seconds.
time_for_hb_event= (diff_timespec(cur_clock, last_event_sent_ts) >=
heartbeat_period);
DBUG_EXECUTE_IF("send_zero_hb_event",
{
time_for_hb_event= true;
});
}
if ((!skip_group && last_skip_group
&& event_type != FORMAT_DESCRIPTION_EVENT) || time_for_hb_event)
{
/*
Dump thread is ready to send its first transaction after
one or more skipped transactions, or the dump thread did not
send any event in the past 'heartbeat_period' time frame
(busy skipping gtid groups). Send a heartbeat event
to update slave IO thread coordinates before that happens.
Notice that for a new binary log file, FORMAT_DESCRIPTION_EVENT
is the first event to be sent to the slave. In this case, there is
no need to send a HB event (which might have coordinates
of previous binlog file).
This heartbeat event sending only happens when the master is in the
middle or has just completed skipping all the trxs. Therefore, HB
shouldn't carry Timestamp.
*/
if (send_last_skip_group_heartbeat(thd, net, packet, p_last_skip_coord,
&ev_offset, current_checksum_alg,
&errmsg, observe_transmission,
semi_sync_slave,
heartbeat_packet_buffer,
heartbeat_packet_buffer_size,
heartbeat_period))
{
GOTO_ERR;
}
set_timespec_nsec(last_event_sent_ts, 0);
last_skip_group= time_for_hb_event= false;
}
else
{
last_skip_group= skip_group;
}
if (skip_group == false)
{
if (my_net_write_event(net, p_coord, packet, &errmsg, &my_errno,
semi_sync_slave,
event_type != FORMAT_DESCRIPTION_EVENT &&
event_type != ROTATE_EVENT,
(rpl_semi_sync_master_enabled ||
enable_raft_plugin) &&
rpl_wait_for_semi_sync_ack, heartbeat_period))
{
GOTO_ERR;
}
set_timespec_nsec(last_event_sent_ts, 0);
}
DBUG_EXECUTE_IF("dump_thread_wait_after_send_write_rows",
{
if (event_type == WRITE_ROWS_EVENT)
{
net_flush(net);
const char act[]=
"now "
"wait_for signal.continue";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
}
});
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
if (event_type == XID_EVENT)
{
net_flush(net);
}
});
DBUG_PRINT("info", ("log event code %d", event_type));
if (skip_group == false && event_type == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
}
if (!enable_raft_plugin && observe_transmission && semi_sync_slave &&
RUN_HOOK(binlog_transmit, after_send_event,
(thd, flags, packet, log_file_name, skip_group ? pos : 0)))
{
errmsg= "Failed to run hook 'after_send_event'";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
// case: this was a skipped group (i.e. the semi-sync slave already has
// this transaction); update last acked coordinates
if (semi_sync_slave && rpl_wait_for_semi_sync_ack && searching_first_gtid)
{
signal_semi_sync_ack(p_coord);
}
/* reset transmit packet for next loop */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
}
/* If the above while is killed due to thd->killed flag and not
due to read_log_event error, then do nothing.*/
if (thd->killed)
goto end;
DBUG_EXECUTE_IF("wait_after_binlog_EOF",
{
const char act[]= "now wait_for signal.rotate_finished no_clear_event";
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
/*
TODO: now that we are logging the offset, check to make sure
the recorded offset and the actual match.
Guilhem 2003-06: this is not true if this master is a slave
<4.0.15 running with --log-slave-updates, because then log_pos may
be the offset in the-master-of-this-master's binlog.
*/
if (test_for_non_eof_log_read_errors(error, &errmsg))
GOTO_ERR;
if (!is_active_binlog)
goto_next_binlog= true;
/*
When read_log_event in the above loop returns LOG_READ_BINLOG_LAST_
VALID_POS instead of normal EOF, we cannot open next binlog file which
may result in skipping of the events in current file. Instead check for
error value and try to read an event inside this if statement.
LOG_READ_EOF confirms that we reached the end of current file.
*/
if (error != LOG_READ_EOF && !goto_next_binlog)
{
/*
Block until there is more data in the log
*/
if (net_flush(net))
{
errmsg = "failed on net_flush()";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
/*
We may have missed the update broadcast from the log
that has just happened, let's try to catch it if it did.
If we did not miss anything, we just wait for other threads
to signal us.
*/
{
log.error=0;
bool read_packet = 0;
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
#endif
/* reset the transmit packet for the event read from binary log
file */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
TODO:
Add an counter that is incremented for each time we update the
binary log. We can avoid the following read if the counter
has not been updated since last read.
*/
switch (error= Log_event::read_log_event(&log, packet,
current_checksum_alg,
log_file_name)) {
case 0:
DBUG_PRINT("info", ("read_log_event returned 0 on line %d",
__LINE__));
/* we read successfully, so we'll need to send it to the slave */
read_packet = 1;
p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
DBUG_ASSERT(event_type != FORMAT_DESCRIPTION_EVENT);
break;
case LOG_READ_EOF:
goto_next_binlog = true;
break;
case LOG_READ_BINLOG_LAST_VALID_POS:
{
/*
Take lock_binlog_pos to ensure that we read everything in
binlog file. If there is nothing left to read in binlog file,
wait until we get a signal from other threads that binlog is
updated.
*/
// case: sleep before locking and waiting for new data
if (unlikely(dump_thread_wait_sleep_usec != 0))
{
DBUG_EXECUTE_IF("reached_dump_thread_wait_sleep",
{
const char act[]= "now "
"signal reached "
"wait_for continue";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(
!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
usleep(dump_thread_wait_sleep_usec);
}
// Note: This part is tricky and should be touched if you really know
// what you're doing. We're locking dump log to get the raw log
// pointer, then we're locking end log pos before unlocking the dump
// log. We're taking the lock in the same sequence as when log is
// switched in binlog_change_to_binlog() and binlog_change_to_apply()
// to avoid deadlocks. This locking pattern ensures that we're working
// with the correct raw log and that there is no race between getting
// the raw log and log switching. Log switching will be blocked until
// we release the binlog end pos lock before waiting for signal in
// wait_for_update_bin_log().
const bool is_dump_log_locked= dump_log.lock();
MYSQL_BIN_LOG* raw_log= dump_log.get_log(false);
raw_log->lock_binlog_end_pos();
dump_log.unlock(is_dump_log_locked);
/*
No need to wait if the current log is not active or
we haven't reached binlog_end_pos.
Note that is_active may be false positive, but binlog_end_pos
is valid here. If rotate thread is about to rotate the log,
we will get a signal_update() in open_binlog() which will eventually
unblock us and checking is_active() later in read_log_event() will
give the valid value.
*/
if (!raw_log->is_active(log_file_name) ||
my_b_tell(&log) < raw_log->get_binlog_end_pos())
{
raw_log->unlock_binlog_end_pos();
break;
}
/*
There are two ways to tell the server to not block:
- Set the BINLOG_DUMP_NON_BLOCK flag.
This is official, documented, not used by any mysql
client, but used by some external users.
- Set server_id=0.
This is unofficial, undocumented, and used by
mysqlbinlog -R since the beginning of time.
When mysqlbinlog --stop-never is used, it sets a 'fake'
server_id that defaults to 1 but can be set to anything
else using stop-never-slave-server-id. This has the
drawback that if the server_id conflicts with any other
running slave, or with any other instance of mysqlbinlog
--stop-never, then that other instance will be killed. It
is also an unnecessary burden on the user to have to
specify a server_id different from all other server_ids
just to avoid conflicts.
As of MySQL 5.6.20 and 5.7.5, mysqlbinlog redundantly sets
the BINLOG_DUMP_NONBLOCK flag when one or both of the
following holds:
- the --stop-never option is *not* specified
In a far future, this means we can remove the unofficial
functionality that server_id=0 implies nonblocking
behavior. That will allow mysqlbinlog to use server_id=0
always. That has the advantage that mysqlbinlog
--stop-never cannot cause any running dump threads to be
killed.
*/
if (thd->server_id == 0 || ((flags & BINLOG_DUMP_NON_BLOCK) != 0))
{
DBUG_PRINT("info", ("stopping dump thread because server_id==0 or the BINLOG_DUMP_NON_BLOCK flag is set: server_id=%u flags=%d", thd->server_id, flags));
raw_log->unlock_binlog_end_pos();
goto end;
}
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
#ifndef DBUG_OFF
ulong hb_info_counter= 0;
#endif
PSI_stage_info old_stage;
signal_cnt= raw_log->signal_cnt;
do
{
if (heartbeat_period != 0)
{
DBUG_ASSERT(heartbeat_ts);
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
}
log_cond= raw_log->get_log_cond();
thd->ENTER_COND(log_cond, raw_log->get_binlog_end_pos_lock(),
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
/*
When using GTIDs, if the dump thread has reached the end of the
binary log and the last transaction is skipped,
send one heartbeat event even when the heartbeat is off.
If the heartbeat is on, it is better to send a heartbeat
event as the time_out of certain functions (Ex: master_pos_wait()
or semi sync ack timeout) might be less than heartbeat period.
This can happen when the slave has exact same GTID as the master.
In this case, we still send the first HB with 0 timestamp to be
safe since the follow up HB with carry the right timestamp
*/
if (skip_group)
{
/*
TODO: Number of HB events sent from here can be reduced
by checking whehter it is time to send a HB event or not.
(i.e., using the flag time_for_hb_event)
*/
if (send_last_skip_group_heartbeat(thd, net, packet,
p_coord, &ev_offset,
current_checksum_alg, &errmsg,
observe_transmission,
semi_sync_slave,
heartbeat_packet_buffer,
heartbeat_packet_buffer_size,
heartbeat_period))
{
thd->EXIT_COND(&old_stage);
GOTO_ERR;
}
last_skip_group= false; /*A HB for this pos has been sent. */
}
ret= raw_log->wait_for_update_bin_log(thd, heartbeat_ts);
DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
if (ret == ETIMEDOUT || ret == ETIME)
{
#ifndef DBUG_OFF
if (hb_info_counter < 3)
{
sql_print_information("master sends heartbeat message");
hb_info_counter++;
if (hb_info_counter == 3)
sql_print_information("the rest of heartbeat info skipped ...");
}
#endif
/* reset transmit packet for the heartbeat event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer,
packet_buffer_size, semi_sync_slave))
{
thd->EXIT_COND(&old_stage);
GOTO_ERR;
}
/**
* HB events sent here are essentially when the master is waiting
* on updates and send HB to keep the connection connected.
*
* This is the only place that sending HB with Timestamp makes
* sense and won't break monotonic TS calculated SBM
*/
if (send_heartbeat_event(net,
packet,
p_coord,
semi_sync_slave,
current_checksum_alg,
true,
heartbeat_period))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
thd->EXIT_COND(&old_stage);
GOTO_ERR;
}
}
else
{
DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
}
} while (signal_cnt == raw_log->signal_cnt && !thd->killed);
thd->EXIT_COND(&old_stage);
}
break;
default:
test_for_non_eof_log_read_errors(error, &errmsg);
GOTO_ERR;
}
if (read_packet)
{
processlist_slave_offset(state_msg, state_msg_len, thd,
log_file_name, my_b_tell(&log),
&skip_state_update, semi_sync_slave);
switch (event_type)
{
case ANONYMOUS_GTID_LOG_EVENT:
/* do nothing */
break;
case GTID_LOG_EVENT:
if (using_gtid_protocol)
{
ulonglong checksum_size=
((p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
p_fdle->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
BINLOG_CHECKSUM_LEN + ev_offset : ev_offset);
Gtid_log_event gtid_ev(packet->ptr() + ev_offset,
packet->length() - checksum_size,
p_fdle);
skip_group=
slave_gtid_executed->contains_gtid(gtid_ev.get_sidno(sid_map),
gtid_ev.get_gno());
searching_first_gtid= skip_group;
DBUG_PRINT("info", ("Dumping GTID sidno(%d) gno(%lld) "
"skip group(%d) searching gtid(%d).",
gtid_ev.get_sidno(sid_map), gtid_ev.get_gno(),
skip_group, searching_first_gtid));
gtid_event_logged = true;
}
break;
case STOP_EVENT:
case INCIDENT_EVENT:
skip_group= searching_first_gtid;
break;
case PREVIOUS_GTIDS_LOG_EVENT:
binlog_has_previous_gtids_log_event= true;
/* FALLTHROUGH */
case ROTATE_EVENT:
skip_group= false;
break;
default:
if (!binlog_has_previous_gtids_log_event && using_gtid_protocol)
/*
If we come here, it means we are seeing a 'normal' DML/DDL
event (e.g. query_log_event) without having seen any
Previous_gtid_log_event. That means we are in an old
binlog (no previous_gtids_log_event). When using the GTID
protocol, that means we must skip the entire binary log
and jump to the next one.
*/
goto_next_binlog= true;
if (!gtid_event_logged && using_gtid_protocol)
{
/*
Skip groups in the binlogs which don't have any gtid event
logged before them.
*/
skip_group = true;
}
break;
}
if (event_type == XID_EVENT)
{
gtid_event_logged = false;
}
/* The present event was skipped in a GTID protocol, store the coordinates */
if (skip_group)
{
p_last_skip_coord->pos= p_coord->pos;
strcpy(p_last_skip_coord->file_name, p_coord->file_name);
}
if (!skip_group && !goto_next_binlog)
{
/**
* If the last group was skipped, send a HB event,
* similarly, HB sent here should not carry TS since the last
* trx is skipped and we are not sure if we are waiting on update
*/
if (last_skip_group &&
send_last_skip_group_heartbeat(thd, net, packet,
p_last_skip_coord, &ev_offset,
current_checksum_alg, &errmsg,
observe_transmission,
semi_sync_slave,
heartbeat_packet_buffer,
heartbeat_packet_buffer_size,
heartbeat_period))
{
GOTO_ERR;
}
THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
pos = my_b_tell(&log);
// semi-sync is called only when raft is disabled
if (!enable_raft_plugin && observe_transmission &&
semi_sync_slave &&
RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "run 'before_send_event' hook failed";
GOTO_ERR;
}
if (my_net_write_event(net, p_coord, packet, &errmsg, &my_errno,
semi_sync_slave,
(event_type != FORMAT_DESCRIPTION_EVENT &&
event_type != ROTATE_EVENT),
(rpl_semi_sync_master_enabled ||
enable_raft_plugin) &&
rpl_wait_for_semi_sync_ack,
heartbeat_period))
{
GOTO_ERR;
}
set_timespec_nsec(last_event_sent_ts, 0);
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
errmsg = "failed in send_file()";
my_errno= ER_UNKNOWN_ERROR;
GOTO_ERR;
}
}
}
if(!goto_next_binlog)
{
// semi-sync is called only when raft is disabled
if (!enable_raft_plugin && observe_transmission &&
semi_sync_slave &&
RUN_HOOK(binlog_transmit, after_send_event,
(thd, flags, packet, log_file_name,
skip_group ? pos : 0)))
{
my_errno= ER_UNKNOWN_ERROR;
errmsg= "Failed to run hook 'after_send_event'";
GOTO_ERR;
}
// case: this was a skipped group (i.e. the semi-sync slave already
// has this transaction); update last acked coordinates
if (semi_sync_slave && rpl_wait_for_semi_sync_ack &&
searching_first_gtid)
{
signal_semi_sync_ack(p_coord);
}
}
/* Save the skip group for next iteration */
last_skip_group= skip_group;
}
log.error=0;
}
}
if (goto_next_binlog)
{
skip_state_update = 0;
// clear flag because we open a new binlog
binlog_has_previous_gtids_log_event= false;
THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
DBUG_EXECUTE_IF("dump_wait_before_find_next_log",
{
const char act[]= "now signal signal.reached wait_for signal.done";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
};);
switch (dump_log.find_next_log(&linfo, 1)) {
case 0:
break;
default:
errmsg = "could not find next log";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
/* reset transmit packet for the possible fake rotate event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg,
observe_transmission,
packet, packet_buffer, packet_buffer_size,
semi_sync_slave))
GOTO_ERR;
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event.
There are at least two cases when this can happen:
- The previous binary log was the last one before the master was
shutdown and restarted.
- The previous binary log was GTID-free (did not contain a
Previous_gtids_log_event) and the slave is connecting using
the GTID protocol.
This way we tell the slave about the new log's name and
position. If the binlog is 5.0 or later, the next event we
are going to read and send is Format_description_log_event.
*/
if ((file=open_binlog_file(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
&errmsg, semi_sync_slave, current_checksum_alg,
heartbeat_period))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
GOTO_ERR;
}
/* hook read_function again, since we got a new bin file from
open_binlog_file above: */
log.extend(thd);
p_coord->file_name= log_file_name; // reset to the next
}
}
end:
/*
If the dump thread was killed because of a duplicate slave UUID we
will fail throwing an error to the slave so it will not try to
reconnect anymore.
*/
mysql_mutex_lock(&thd->LOCK_thd_data);
was_killed_by_duplicate_slave_id= thd->duplicate_slave_id;
mysql_mutex_unlock(&thd->LOCK_thd_data);
if (was_killed_by_duplicate_slave_id)
{
errmsg= "A slave with the same server_uuid/server_id as this slave "
"has connected to the master";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
thd->set_stmt_da(saved_da);
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
// semi-sync is called only when raft is disabled
if (!enable_raft_plugin && has_transmit_started && semi_sync_slave)
(void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
mutex_lock_shard(SHARDED(&LOCK_thread_count), thd);
thd->current_linfo = 0;
mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd);
thd->variables.max_allowed_packet= old_max_allowed_packet;
/* Undo any calls done by processlist_slave_offset */
thd->set_query(orig_query, orig_query_length);
if (heartbeat_packet_buffer != NULL)
my_free(heartbeat_packet_buffer);
repl_cleanup(packet, packet_buffer);
if (us)
{
ulonglong n_timer = my_timer_now();
us->microseconds_wall.inc(my_timer_to_microseconds(n_timer - cur_timer));
}
DBUG_VOID_RETURN;
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
{
/*
detailing the fatal error message with coordinates
of the last position read.
*/
my_snprintf(error_text, sizeof(error_text),
"%s; the first event '%s' at %lld, "
"the last event read from '%s' at %lld, "
"the last byte read from '%s' at %lld.",
errmsg,
p_start_coord->file_name, p_start_coord->pos,
p_coord->file_name, p_coord->pos,
log_file_name, my_b_tell(&log));
}
else
{
strncpy(error_text, errmsg, sizeof(error_text));
error_text[sizeof(error_text) - 1]= '\0';
}
end_io_cache(&log);
// semi-sync is called only when raft is disabled
if (!enable_raft_plugin && has_transmit_started && semi_sync_slave)
(void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
thread list and update thd->current_linfo->index_file_offset
this mutex will make sure that it never tried to update our linfo
after we return from this stack frame
*/
mutex_lock_shard(SHARDED(&LOCK_thread_count), thd);
thd->current_linfo = 0;
mutex_unlock_shard(SHARDED(&LOCK_thread_count), thd);
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
thd->set_stmt_da(saved_da);
my_message(my_errno, error_text, MYF(0));
/* Undo any calls done by processlist_slave_offset */
thd->set_query(orig_query, orig_query_length);
if (heartbeat_packet_buffer != NULL)
my_free(heartbeat_packet_buffer);
repl_cleanup(packet, packet_buffer);
if (us)
{
ulonglong n_timer = my_timer_now();
us->microseconds_wall.inc(my_timer_to_microseconds(n_timer - cur_timer));
}
DBUG_VOID_RETURN;
}