plugin/group_replication/src/certifier.cc (1,529 lines of code) (raw):
/* Copyright (c) 2014, 2025, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include <assert.h>
#include <signal.h>
#include <time.h>
#include <map>
#include <mysql/components/services/log_builtins.h>
#include "mutex_lock.h"
#include "my_dbug.h"
#include "my_systime.h"
#include "mysql/gtid/tsid.h"
#include "plugin/group_replication/include/certifier.h"
#include "plugin/group_replication/include/observer_trans.h"
#include "plugin/group_replication/include/opt_tracker.h"
#include "plugin/group_replication/include/plugin.h"
#include "plugin/group_replication/include/plugin_handlers/metrics_handler.h"
#include "plugin/group_replication/include/plugin_messages/recovery_metadata_message_compressed_parts.h"
#include "plugin/group_replication/include/services/system_variable/get_system_variable.h"
#include "scope_guard.h"
using namespace gr;
const std::string Certifier::GTID_EXTRACTED_NAME = "gtid_extracted";
const std::string Certifier::CERTIFICATION_INFO_ERROR_NAME =
"certification_info_error";
static void *launch_broadcast_thread(void *arg) {
Certifier_broadcast_thread *handler = (Certifier_broadcast_thread *)arg;
handler->dispatcher();
return nullptr;
}
Certifier_broadcast_thread::Certifier_broadcast_thread()
: aborted(false),
broadcast_thd_state(),
broadcast_counter(0),
broadcast_gtid_executed_period(BROADCAST_GTID_EXECUTED_PERIOD) {
DBUG_EXECUTE_IF("group_replication_certifier_broadcast_thread_big_period",
{ broadcast_gtid_executed_period = 600; });
DBUG_EXECUTE_IF("group_replication_certifier_broadcast_thread_short_period",
{ broadcast_gtid_executed_period = 1; });
mysql_mutex_init(key_GR_LOCK_cert_broadcast_run, &broadcast_run_lock,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_GR_COND_cert_broadcast_run, &broadcast_run_cond);
mysql_mutex_init(key_GR_LOCK_cert_broadcast_dispatcher_run,
&broadcast_dispatcher_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_GR_COND_cert_broadcast_dispatcher_run,
&broadcast_dispatcher_cond);
}
Certifier_broadcast_thread::~Certifier_broadcast_thread() {
mysql_mutex_destroy(&broadcast_run_lock);
mysql_cond_destroy(&broadcast_run_cond);
mysql_mutex_destroy(&broadcast_dispatcher_lock);
mysql_cond_destroy(&broadcast_dispatcher_cond);
}
int Certifier_broadcast_thread::initialize() {
DBUG_TRACE;
mysql_mutex_lock(&broadcast_run_lock);
if (broadcast_thd_state.is_thread_alive()) {
mysql_mutex_unlock(&broadcast_run_lock); /* purecov: inspected */
return 0; /* purecov: inspected */
}
aborted = false;
if ((mysql_thread_create(key_GR_THD_cert_broadcast, &broadcast_pthd,
get_connection_attrib(), launch_broadcast_thread,
(void *)this))) {
/* purecov: begin inspected */
mysql_mutex_unlock(&broadcast_run_lock);
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CERT_BROADCAST_THREAD_CREATE_FAILED);
return 1;
/* purecov: end */
}
broadcast_thd_state.set_created();
while (broadcast_thd_state.is_alive_not_running()) {
DBUG_PRINT("sleep", ("Waiting for certifier broadcast thread to start"));
struct timespec abstime;
set_timespec(&abstime, 1);
mysql_cond_timedwait(&broadcast_run_cond, &broadcast_run_lock, &abstime);
}
mysql_mutex_unlock(&broadcast_run_lock);
return 0;
}
void Certifier_broadcast_thread::terminate() {
DBUG_TRACE;
mysql_mutex_lock(&broadcast_run_lock);
if (broadcast_thd_state.is_thread_dead()) {
mysql_mutex_unlock(&broadcast_run_lock);
return;
}
aborted = true;
while (broadcast_thd_state.is_thread_alive()) {
DBUG_PRINT("loop", ("killing certifier broadcast thread"));
mysql_mutex_lock(&broadcast_thd->LOCK_thd_data);
// awake the cycle
mysql_mutex_lock(&broadcast_dispatcher_lock);
mysql_cond_broadcast(&broadcast_dispatcher_cond);
mysql_mutex_unlock(&broadcast_dispatcher_lock);
broadcast_thd->awake(THD::NOT_KILLED);
mysql_mutex_unlock(&broadcast_thd->LOCK_thd_data);
struct timespec abstime;
set_timespec(&abstime, 1);
mysql_cond_timedwait(&broadcast_run_cond, &broadcast_run_lock, &abstime);
}
mysql_mutex_unlock(&broadcast_run_lock);
}
void Certifier_broadcast_thread::dispatcher() {
DBUG_TRACE;
// Thread context operations
THD *thd = new THD;
my_thread_init();
thd->set_new_thread_id();
thd->thread_stack = (char *)&thd;
thd->store_globals();
global_thd_manager_add_thd(thd);
broadcast_thd = thd;
mysql_mutex_lock(&broadcast_run_lock);
broadcast_thd_state.set_running();
mysql_cond_broadcast(&broadcast_run_cond);
mysql_mutex_unlock(&broadcast_run_lock);
LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_CERT_BROADCAST_THREAD_STARTED);
while (!aborted) {
// Increase Group Replication feature usage every 10 minutes.
if (broadcast_counter % 600 == 0 ||
DBUG_EVALUATE_IF("rpl_opt_tracker_small_tracking_period", true,
false)) {
++opt_option_tracker_usage_group_replication_plugin;
}
// Broadcast Transaction identifiers every 30 seconds
if (broadcast_counter % 30 == 0) {
applier_module->get_pipeline_stats_member_collector()
->set_send_transaction_identifiers();
if (applier_module->is_applier_thread_waiting()) {
applier_module->get_pipeline_stats_member_collector()
->clear_transactions_waiting_apply();
}
}
applier_module->run_flow_control_step();
if (broadcast_counter % broadcast_gtid_executed_period == 0) {
broadcast_gtid_executed();
}
Certification_handler *cert = applier_module->get_certification_handler();
Certifier_interface *cert_module = (cert ? cert->get_certifier() : nullptr);
// garbage_collect() is capable to identify if all information required
// for it to run is already delivered to this member.
if (cert_module) {
cert_module->garbage_collect();
}
mysql_mutex_lock(&broadcast_dispatcher_lock);
if (aborted) {
mysql_mutex_unlock(&broadcast_dispatcher_lock); /* purecov: inspected */
break; /* purecov: inspected */
}
struct timespec abstime;
set_timespec(&abstime, 1);
mysql_cond_timedwait(&broadcast_dispatcher_cond, &broadcast_dispatcher_lock,
&abstime);
mysql_mutex_unlock(&broadcast_dispatcher_lock);
broadcast_counter++;
}
Gcs_interface_factory::cleanup_thread_communication_resources(
Gcs_operations::get_gcs_engine());
thd->release_resources();
global_thd_manager_remove_thd(thd);
delete thd;
my_thread_end();
mysql_mutex_lock(&broadcast_run_lock);
broadcast_thd_state.set_terminated();
mysql_cond_broadcast(&broadcast_run_cond);
mysql_mutex_unlock(&broadcast_run_lock);
LogPluginErr(SYSTEM_LEVEL, ER_GRP_RPL_CERT_BROADCAST_THREAD_STOPPED);
my_thread_exit(nullptr);
}
int Certifier_broadcast_thread::broadcast_gtid_executed() {
DBUG_TRACE;
/*
Member may be still joining group so we need to check if:
1) communication interfaces are ready to be used;
2) member is ONLINE, that is, distributed recovery is complete.
*/
if (local_member_info == nullptr) return 0; /* purecov: inspected */
Group_member_info::Group_member_status member_status =
local_member_info->get_recovery_status();
if (member_status != Group_member_info::MEMBER_ONLINE &&
member_status != Group_member_info::MEMBER_IN_RECOVERY)
return 0;
int error = 0;
uchar *encoded_gtid_executed = nullptr;
size_t length;
get_server_encoded_gtid_executed(&encoded_gtid_executed, &length);
Gtid_Executed_Message gtid_executed_message;
std::vector<uchar> encoded_gtid_executed_message;
gtid_executed_message.append_gtid_executed(encoded_gtid_executed, length);
enum enum_gcs_error send_err =
gcs_module->send_message(gtid_executed_message, true);
if (send_err == GCS_MESSAGE_TOO_BIG) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_BROADCAST_COMMIT_MSSG_TOO_BIG); /* purecov: inspected */
error = 1; /* purecov: inspected */
} else if (send_err == GCS_NOK) {
LogPluginErr(
INFORMATION_LEVEL,
ER_GRP_RPL_BROADCAST_COMMIT_TRANS_MSSG_FAILED); /* purecov: inspected */
error = 1; /* purecov: inspected */
}
#if !defined(NDEBUG)
char *encoded_gtid_executed_string =
encoded_gtid_set_to_string(encoded_gtid_executed, length);
DBUG_PRINT("info", ("Certifier broadcast executed_set: %s",
encoded_gtid_executed_string));
my_free(encoded_gtid_executed_string);
#endif
my_free(encoded_gtid_executed);
return error;
}
Certifier::Certifier()
: initialized(false),
certification_info(
Malloc_allocator<std::pair<const std::string, Gtid_set_ref *>>(
key_certification_info)),
positive_cert(0),
negative_cert(0),
parallel_applier_last_committed_global(1),
parallel_applier_last_sequence_number(1),
parallel_applier_sequence_number(2),
certifying_already_applied_transactions(false),
conflict_detection_enable(!local_member_info->in_primary_mode()) {
last_conflict_free_transaction.clear();
#if !defined(NDEBUG)
certifier_garbage_collection_block = false;
/*
Debug flag to block the garbage collection and discard incoming stable
set messages while garbage collection is on going.
*/
DBUG_EXECUTE_IF("certifier_garbage_collection_block",
certifier_garbage_collection_block = true;);
same_member_message_discarded = false;
/*
Debug flag to check for similar member sending multiple messages.
*/
DBUG_EXECUTE_IF("certifier_inject_duplicate_certifier_data_message",
same_member_message_discarded = true;);
#endif
certification_info_tsid_map = new Tsid_map(nullptr);
incoming = new Synchronized_queue<Data_packet *>(key_certification_data_gc);
stable_gtid_set_lock = new Checkable_rwlock(
#ifdef HAVE_PSI_INTERFACE
key_GR_RWLOCK_cert_stable_gtid_set
#endif
);
stable_tsid_map = new Tsid_map(stable_gtid_set_lock);
stable_gtid_set = new Gtid_set(stable_tsid_map, stable_gtid_set_lock);
broadcast_thread = new Certifier_broadcast_thread();
group_gtid_tsid_map = new Tsid_map(nullptr);
group_gtid_executed = new Gtid_set(group_gtid_tsid_map, nullptr);
group_gtid_extracted = new Gtid_set(group_gtid_tsid_map, nullptr);
mysql_mutex_init(key_GR_LOCK_certification_info, &LOCK_certification_info,
MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_GR_LOCK_cert_members, &LOCK_members, MY_MUTEX_INIT_FAST);
}
Certifier::~Certifier() {
mysql_mutex_lock(&LOCK_certification_info);
initialized = false;
broadcast_thread->terminate();
delete broadcast_thread;
clear_certification_info();
delete certification_info_tsid_map;
delete stable_gtid_set;
delete stable_tsid_map;
delete stable_gtid_set_lock;
delete group_gtid_executed;
delete group_gtid_extracted;
delete group_gtid_tsid_map;
mysql_mutex_unlock(&LOCK_certification_info);
mysql_mutex_lock(&LOCK_members);
clear_members();
clear_incoming();
mysql_mutex_unlock(&LOCK_members);
delete incoming;
mysql_mutex_destroy(&LOCK_certification_info);
mysql_mutex_destroy(&LOCK_members);
}
int Certifier::initialize_server_gtid_set(bool get_server_gtid_retrieved) {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_certification_info);
int error = 0;
Get_system_variable *get_system_variable = nullptr;
std::string gtid_executed;
std::string applier_retrieved_gtids;
gr::Gtid_tsid group_tsid;
const char *group_name = get_group_name_var();
gr::Gtid_tsid view_tsid;
const char *view_uuid = get_view_change_uuid_var();
if (group_tsid.from_cstring(group_name) == 0) {
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_GROUP_NAME_PARSE_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
group_gtid_tsid_map_group_sidno = group_gtid_tsid_map->add_tsid(group_tsid);
if (group_gtid_tsid_map_group_sidno < 0) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_ADD_GRPSID_TO_GRPGTIDSID_MAP_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
if (group_gtid_executed->ensure_sidno(group_gtid_tsid_map_group_sidno) !=
RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_UPDATE_GRPGTID_EXECUTED_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
if (group_gtid_extracted->ensure_sidno(group_gtid_tsid_map_group_sidno) !=
RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_DONOR_TRANS_INFO_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
if (strcmp(view_uuid, "AUTOMATIC") == 0) {
views_sidno_group_representation = group_gtid_tsid_map_group_sidno;
views_sidno_server_representation = get_group_sidno();
} else {
if (view_tsid.from_cstring(view_uuid) == 0) {
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_VIEW_CHANGE_UUID_PARSE_ERROR);
error = 1;
goto end;
/* purecov: end */
}
views_sidno_group_representation = group_gtid_tsid_map->add_tsid(view_tsid);
if (views_sidno_group_representation < 0) {
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_ADD_VIEW_CHANGE_UUID_TO_GRP_SID_MAP_ERROR);
error = 1;
goto end;
/* purecov: end */
}
views_sidno_server_representation = get_view_change_sidno();
if (group_gtid_executed->ensure_sidno(views_sidno_group_representation) !=
RETURN_STATUS_OK) {
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_UPDATE_GRPGTID_VIEW_CHANGE_UUID_EXECUTED_ERROR);
error = 1;
goto end;
/* purecov: end */
}
if (group_gtid_extracted->ensure_sidno(views_sidno_group_representation) !=
RETURN_STATUS_OK) {
/* purecov: begin inspected */
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_DONOR_VIEW_CHANGE_UUID_TRANS_INFO_ERROR);
error = 1;
goto end;
/* purecov: end */
}
}
get_system_variable = new Get_system_variable();
error = get_system_variable->get_global_gtid_executed(gtid_executed);
DBUG_EXECUTE_IF("gr_server_gtid_executed_extraction_error", error = 1;);
if (error) {
LogPluginErr(WARNING_LEVEL, ER_GRP_RPL_ERROR_FETCHING_GTID_EXECUTED_SET);
goto end;
}
if (group_gtid_executed->add_gtid_text(gtid_executed.c_str()) !=
RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_ADD_GTID_TO_GRPGTID_EXECUTED_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
if (get_server_gtid_retrieved) {
Replication_thread_api applier_channel("group_replication_applier");
if (applier_channel.get_retrieved_gtid_set(applier_retrieved_gtids)) {
LogPluginErr(WARNING_LEVEL,
ER_GRP_RPL_ERROR_FETCHING_GTID_SET); /* purecov: inspected */
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
if (group_gtid_executed->add_gtid_text(applier_retrieved_gtids.c_str()) !=
RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_ADD_RETRIEVED_SET_TO_GRP_GTID_EXECUTED_ERROR); /* purecov:
inspected
*/
error = 1; /* purecov: inspected */
goto end; /* purecov: inspected */
}
}
gtid_generator.recompute(*get_group_gtid_set());
end:
delete get_system_variable;
return error;
}
void Certifier::add_to_group_gtid_executed_internal(rpl_sidno sidno,
rpl_gno gno) {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_certification_info);
group_gtid_executed->_add_gtid(sidno, gno);
/*
We only need to track certified transactions on
group_gtid_extracted while:
1) certifier is handling already applied transactions
on distributed recovery procedure;
2) the transaction does have a group GTID.
3) the transactions use the view UUID
*/
if (certifying_already_applied_transactions &&
(sidno == group_gtid_tsid_map_group_sidno ||
sidno == views_sidno_group_representation))
group_gtid_extracted->_add_gtid(sidno, gno);
}
void Certifier::clear_certification_info() {
mysql_mutex_assert_owner(&LOCK_certification_info);
for (Certification_info::iterator it = certification_info.begin();
it != certification_info.end(); ++it) {
// We can only delete the last reference.
if (it->second->unlink() == 0) {
/*
Claim Gtid_set_ref used memory to
`thread/group_rpl/THD_certifier_broadcast` thread, since this is thread
that does release the memory.
*/
it->second->claim_memory_ownership(true);
delete it->second;
}
}
certification_info.clear();
}
void Certifier::clear_incoming() {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_members);
while (!this->incoming->empty()) {
Data_packet *packet = nullptr;
this->incoming->pop(&packet);
delete packet;
}
}
void Certifier::clear_members() {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_members);
members.clear();
}
int Certifier::initialize(ulonglong gtid_assignment_block_size) {
DBUG_TRACE;
int error = 0;
MUTEX_LOCK(guard, &LOCK_certification_info);
if (is_initialized()) {
return 1;
}
assert(gtid_assignment_block_size >= 1);
gtid_generator.initialize(gtid_assignment_block_size);
/*
We need to initialize group_gtid_executed from both GTID_EXECUTED
and applier retrieved GTID set to consider the already certified
but not yet applied GTIDs, that may exist on applier relay log when
this member is the one bootstrapping the group.
*/
if (initialize_server_gtid_set(true)) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CERTIFICATION_INITIALIZATION_FAILURE);
return 1;
}
error = broadcast_thread->initialize();
initialized = !error;
return error;
}
void Certifier::update_parallel_applier_indexes(
bool update_parallel_applier_last_committed_global,
bool increment_parallel_applier_sequence_number) {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_certification_info);
assert(parallel_applier_last_committed_global <
parallel_applier_sequence_number);
assert(parallel_applier_last_sequence_number <
parallel_applier_sequence_number);
assert(parallel_applier_last_committed_global <=
parallel_applier_last_sequence_number);
if (update_parallel_applier_last_committed_global) {
parallel_applier_last_committed_global =
(increment_parallel_applier_sequence_number
? parallel_applier_sequence_number
: parallel_applier_last_sequence_number);
}
if (increment_parallel_applier_sequence_number) {
parallel_applier_last_sequence_number = parallel_applier_sequence_number++;
}
assert(parallel_applier_last_committed_global <
parallel_applier_sequence_number);
assert(parallel_applier_last_sequence_number <
parallel_applier_sequence_number);
assert(parallel_applier_last_committed_global <=
parallel_applier_last_sequence_number);
}
namespace {
/// @brief This function will add a given tsid into the gtid_set
/// In case adding tsid fails, plugin will report error_code
/// @param tsid Tsid to be added into gtid_set
/// @param gtid_set Gtid set into which tsid will be added
std::pair<rpl_sidno, mysql::utils::Return_status>
add_tsid_to_gtid_set_and_sid_map(gr::Gtid_tsid &tsid, Gtid_set >id_set) {
// Add received transaction GTID tsid to TSID map in gtid_set
auto certification_state = mysql::utils::Return_status::ok;
auto sidno = gtid_set.get_tsid_map()->add_tsid(tsid);
if (sidno < 1) {
LogPluginErr(ERROR_LEVEL, ER_OUT_OF_RESOURCES);
certification_state = mysql::utils::Return_status::error;
sidno = 0;
}
if (gtid_set.ensure_sidno(sidno) != RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL, ER_OUT_OF_RESOURCES);
certification_state = mysql::utils::Return_status::error;
sidno = 0;
}
return std::make_pair(sidno, certification_state);
}
} // namespace
std::tuple<rpl_sidno, rpl_sidno, rpl_sidno, mysql::utils::Return_status>
Certifier::extract_sidno(Gtid_log_event &gle, bool is_gtid_specified,
Gtid_set &snapshot_gtid_set,
Gtid_set &group_gtid_set) {
std::tuple<rpl_sidno, rpl_sidno, rpl_sidno, mysql::utils::Return_status>
result = std::make_tuple(0, 0, 0, mysql::utils::Return_status::error);
// Get the tsid: either the specified one or the group's one.
gr::Gtid_tsid tsid;
rpl_sidno server_sidno;
if (is_gtid_specified) {
// SPECIFIED GTID
tsid = gle.get_tsid();
server_sidno = gle.get_sidno(true);
} else {
// AUTOMATIC tagged/untagged
const char *group_name = get_group_name_var();
server_sidno = get_group_sidno();
std::ignore = tsid.from_cstring(group_name);
if (gle.is_tagged()) {
tsid.set_tag(gle.get_tsid().get_tag());
server_sidno = get_sidno_from_global_tsid_map(tsid);
}
}
if (server_sidno == -1) {
LogPluginErr(ERROR_LEVEL, ER_OUT_OF_RESOURCES);
return result;
}
// get snapshot sidno
auto [snapshot_sidno, snapshot_add_code] =
add_tsid_to_gtid_set_and_sid_map(tsid, snapshot_gtid_set);
if (snapshot_add_code == mysql::utils::Return_status::error) {
return result;
}
// get group sidno
auto [group_sidno, group_add_code] =
add_tsid_to_gtid_set_and_sid_map(tsid, group_gtid_set);
if (group_add_code == mysql::utils::Return_status::error) {
return result;
}
return std::make_tuple(group_sidno, snapshot_sidno, server_sidno,
mysql::utils::Return_status::ok);
}
Certified_gtid Certifier::end_certification_result(
const rpl_sidno >id_global_sidno, const rpl_sidno >id_group_sidno,
const rpl_gno &generated_gno, bool is_gtid_specified,
bool local_transaction, const Certification_result &certification_result) {
rpl_gno gno = generated_gno;
if (certification_result == Certification_result::error) {
gno = -1;
} else if (certification_result == Certification_result::negative) {
gno = 0;
}
DBUG_PRINT(
"info",
("Group replication Certifier: certification result: %" PRId64, gno));
Gtid server_gtid, group_gtid;
server_gtid.clear();
group_gtid.clear();
server_gtid.sidno = gtid_global_sidno;
group_gtid.sidno = gtid_group_sidno;
server_gtid.gno = group_gtid.gno = gno;
return Certified_gtid(server_gtid, group_gtid, is_gtid_specified,
local_transaction, certification_result);
}
Certification_result Certifier::add_writeset_to_certification_info(
int64 &transaction_last_committed, Gtid_set *snapshot_version,
std::list<const char *> *write_set, bool local_transaction) {
// Only consider remote transactions for parallel applier indexes.
int64 transaction_sequence_number =
local_transaction ? -1 : parallel_applier_sequence_number;
Gtid_set_ref *snapshot_version_value = new Gtid_set_ref(
certification_info_tsid_map, transaction_sequence_number);
if (snapshot_version_value->add_gtid_set(snapshot_version) !=
RETURN_STATUS_OK) {
delete snapshot_version_value;
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_UPDATE_TRANS_SNAPSHOT_REF_VER_ERROR);
return Certification_result::error;
}
for (std::list<const char *>::iterator it = write_set->begin();
it != write_set->end(); ++it) {
int64 item_previous_sequence_number = -1;
add_item(*it, snapshot_version_value, &item_previous_sequence_number);
/*
Exclude previous sequence number that are smaller than global
last committed and that are the current sequence number.
transaction_last_committed is initialized with
parallel_applier_last_committed_global on the beginning of
"certify" method.
*/
if (item_previous_sequence_number > transaction_last_committed &&
item_previous_sequence_number != parallel_applier_sequence_number)
transaction_last_committed = item_previous_sequence_number;
}
/*
The memory used by Gtid_set_ref is allocated by
`thread/group_rpl/THD_applier_module_receiver`, though it will be released
by `thread/group_rpl/THD_certifier_broadcast` thread. To avoid untracked
memory release on `thread/group_rpl/THD_applier_module_receiver` we do
dissociate this used memory from this thread.
*/
snapshot_version_value->claim_memory_ownership(false);
return Certification_result::positive;
}
namespace {
/*
Only throw the error if the gtid is both on group_gtid_executed
and executed_gtids due to the following scenario(bug#34157846):
It is possible that gtid can be present in group_gtid_executed but
not in executed_gtids(i.e the gtid is not logged in the binary
log).
1)replica-worker - starts transaction execution,
slave_worker_exec_event()->..calls
group_replication_trans_before_commit.
2)gr-applier - certifies the transaction and add gtid to
group_gtid_executed.
3)replica-worker - proceeds to commit but commit order deadlock
occurred and rollbacked the transaction.
4)replica-worker - retries the transaction,
i) calls group_replication_trans_before_commit.
ii) gr-applier tries to certify again the retried
transaction.
iii) retry certification would fail, if there is no
check on gtid present in both executed_gtids
and group_gtid_executed, since gtid is already
added to group_gtid_executed as part of initial
try(step 2).
*/
[[NODISCARD]] Certification_result check_gtid_collision(
rpl_sidno gtid_group_sidno, rpl_sidno gtid_global_sidno, rpl_gno gno,
Gtid_set &group_gtid_executed, const std::string &sid_str) {
if (group_gtid_executed.contains_gtid(gtid_group_sidno, gno)) {
// sidno is relative to global_tsid_map.
Gtid gtid = {gtid_global_sidno, gno};
if (is_gtid_committed(gtid)) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_GTID_ALREADY_USED, sid_str.c_str(),
gno);
return Certification_result::negative;
}
}
return Certification_result::positive;
}
} // namespace
void Certifier::update_transaction_dependency_timestamps(
Gtid_log_event &gle, bool has_write_set, bool has_write_set_large_size,
int64 transaction_last_committed) {
bool update_parallel_applier_last_committed_global = false;
bool is_empty_transaction = false;
/*
'CREATE TABLE ... AS SELECT' is considered a DML, though in reality it
is DDL + DML, which write-sets do not capture all dependencies.
It is flagged through gle->last_committed and gle->sequence_number so
that it is only executed on parallel applier after all precedent
transactions like any other DDL.
*/
if (0 == gle.last_committed && 0 == gle.sequence_number) {
update_parallel_applier_last_committed_global = true;
}
/*
Empty transactions, despite not having write-set, can be
applied in parallel with any other transaction.
Empty transactions are assigned `last_committed = -1` by GR
before send.
*/
else if (!has_write_set && -1 == gle.last_committed) {
is_empty_transaction = true;
}
if (!is_empty_transaction &&
(!has_write_set || has_write_set_large_size ||
update_parallel_applier_last_committed_global)) {
/*
DDL does not have write-set, so we need to ensure that it
is applied without any other transaction in parallel.
*/
transaction_last_committed = parallel_applier_sequence_number - 1;
}
gle.last_committed = transaction_last_committed;
gle.sequence_number = parallel_applier_sequence_number;
assert(gle.last_committed >= 0);
assert(gle.sequence_number > 0);
assert(gle.last_committed < gle.sequence_number);
update_parallel_applier_indexes(
(!is_empty_transaction &&
(!has_write_set || has_write_set_large_size ||
update_parallel_applier_last_committed_global)),
true);
/*
Every Group Replication is started and the first remote transaction
is queued on replication_group_applier channel, we need to reset
applier internal previous sequence_number. Otherwise, if during the
start there was backlog to apply on replication_group_applier channel,
the previous sequence_number will be greater than the new one, which
is considered a error case.
Previously this reset was done by the View_change_log_event transaction,
but now that transaction may not be logged.
*/
if (is_first_remote_transaction_certified) {
is_first_remote_transaction_certified = false;
gle.last_committed = 0;
gle.sequence_number = 0;
}
}
#ifndef NDEBUG
void debug_print_group_gtid_sets(const Gtid_set &group_gtid_executed,
const Gtid_set &group_gtid_extracted,
bool set_value) {
char *group_gtid_executed_string = nullptr;
char *group_gtid_extracted_string = nullptr;
group_gtid_executed.to_string(&group_gtid_executed_string, true);
group_gtid_extracted.to_string(&group_gtid_extracted_string, true);
DBUG_PRINT(
"info",
("Set certifying_already_applied_transactions to %d. "
"group_gtid_executed: \"%s\"; group_gtid_extracted_string: \"%s\"",
set_value, group_gtid_executed_string, group_gtid_extracted_string));
my_free(group_gtid_executed_string);
my_free(group_gtid_extracted_string);
}
#endif // NDEBUG
Certified_gtid Certifier::certify(Gtid_set *snapshot_version,
std::list<const char *> *write_set,
bool is_gtid_specified,
const char *member_uuid, Gtid_log_event *gle,
bool local_transaction) {
DBUG_TRACE;
rpl_sidno gtid_group_sidno = 0, gtid_snapshot_sidno = 0,
gtid_global_sidno = 0;
rpl_gno gtid_gno = 0;
const bool has_write_set = !write_set->empty();
bool write_set_large_size = false;
auto end_certification = [
&is_gtid_specified, >id_global_sidno, >id_group_sidno, >id_gno,
local_transaction,
this
](Certification_result result) -> auto{
update_certified_transaction_count(result == Certification_result::positive,
local_transaction);
return end_certification_result(gtid_global_sidno, gtid_group_sidno,
gtid_gno, is_gtid_specified,
local_transaction, result);
};
if (!is_initialized()) {
return end_certification(Certification_result::error);
}
MUTEX_LOCK(guard, &LOCK_certification_info);
int64 transaction_last_committed = parallel_applier_last_committed_global;
DBUG_EXECUTE_IF("certifier_force_1_negative_certification", {
DBUG_SET("-d,certifier_force_1_negative_certification");
return end_certification(Certification_result::negative);
});
if (conflict_detection_enable) {
for (std::list<const char *>::iterator it = write_set->begin();
it != write_set->end(); ++it) {
Gtid_set *certified_write_set_snapshot_version =
get_certified_write_set_snapshot_version(*it);
/*
If the previous certified transaction snapshot version is not
a subset of the incoming transaction snapshot version, the current
transaction was executed on top of outdated data, so it will be
negatively certified. Otherwise, this transaction is marked
certified and goes into applier.
*/
if (certified_write_set_snapshot_version != nullptr &&
!certified_write_set_snapshot_version->is_subset(snapshot_version))
return end_certification(Certification_result::negative);
}
}
if (certifying_already_applied_transactions &&
!group_gtid_extracted->is_subset_not_equals(group_gtid_executed)) {
certifying_already_applied_transactions = false;
#ifndef NDEBUG
debug_print_group_gtid_sets(*group_gtid_executed, *group_gtid_extracted,
false);
#endif
}
mysql::utils::Return_status certification_state;
std::tie(gtid_group_sidno, gtid_snapshot_sidno, gtid_global_sidno,
certification_state) =
extract_sidno(*gle, is_gtid_specified, *snapshot_version,
*group_gtid_executed);
if (certification_state == mysql::utils::Return_status::error) {
return end_certification(Certification_result::error);
}
/*
If the current transaction doesn't have a specified GTID, one
for group UUID will be generated.
This situation happens when transactions are executed with
GTID_NEXT equal to AUTOMATIC_GTID (the default case).
*/
if (!is_gtid_specified) {
mysql::utils::Return_status gno_generation_result;
std::tie(gtid_gno, gno_generation_result) =
gtid_generator.get_next_available_gtid(member_uuid, gtid_group_sidno,
*get_group_gtid_set());
if (gno_generation_result != mysql::utils::Return_status::ok) {
return end_certification(Certification_result::error);
}
DBUG_PRINT("info", ("Group replication Certifier: generated transaction "
"identifier: %" PRId64,
gtid_gno));
} else {
gtid_gno = gle->get_gno();
auto tsid_str = gle->get_tsid().to_string();
auto gtid_collision_check_code =
check_gtid_collision(gtid_group_sidno, gtid_global_sidno, gtid_gno,
*group_gtid_executed, tsid_str);
if (gtid_collision_check_code == Certification_result::negative) {
return end_certification(Certification_result::negative);
}
DBUG_PRINT(
"info",
("Group replication Certifier: there was no transaction identifier "
"generated since transaction already had a GTID specified"));
}
// Add received transaction GTID to transaction snapshot version.
snapshot_version->_add_gtid(gtid_snapshot_sidno, gtid_gno);
// Store last conflict free transaction identification.
// sidno must be relative to group_gtid_sid_map.
last_conflict_free_transaction.set(gtid_group_sidno, gtid_gno);
/*
When the group is in single-primary mode and
group_replication_preemptive_garbage_collection is enabled, if the number
of write-sets on a transaction is equal or greater than
group_replication_preemptive_garbage_collection_rows_threshold, the
write-sets are not added to certification info and the last_committed
timestamps is incremented.
*/
if (get_single_primary_mode_var() &&
get_preemptive_garbage_collection_var() &&
write_set->size() >=
get_preemptive_garbage_collection_rows_threshold_var()) {
write_set_large_size = true;
}
/*
Add the transaction's write set to certification info.
*/
if (has_write_set && !write_set_large_size) {
auto add_writeset_code = add_writeset_to_certification_info(
transaction_last_committed, snapshot_version, write_set,
local_transaction);
if (add_writeset_code != Certification_result::positive) {
return end_certification(Certification_result::error);
}
}
// Update parallel applier indexes for local transactions
if (!local_transaction) {
update_transaction_dependency_timestamps(
*gle, has_write_set, write_set_large_size, transaction_last_committed);
}
return end_certification(Certification_result::positive);
}
int Certifier::add_gtid_to_group_gtid_executed(const Gtid >id) {
DBUG_TRACE;
if (!is_initialized()) {
return 1;
}
MUTEX_LOCK(guard, &LOCK_certification_info);
add_to_group_gtid_executed_internal(gtid.sidno, gtid.gno);
return 0;
}
const Gtid_set *Certifier::get_group_gtid_set() const {
return certifying_already_applied_transactions ? group_gtid_extracted
: group_gtid_executed;
}
Gtid_set *Certifier::get_group_gtid_set() {
return certifying_already_applied_transactions ? group_gtid_extracted
: group_gtid_executed;
}
void Certifier::gtid_intervals_computation() {
DBUG_TRACE;
if (!is_initialized()) {
return;
}
mysql_mutex_lock(&LOCK_certification_info);
if (gtid_generator.get_gtid_assignment_block_size() > 1) {
gtid_generator.recompute(*get_group_gtid_set());
}
mysql_mutex_unlock(&LOCK_certification_info);
}
bool Certifier::add_item(const char *item, Gtid_set_ref *snapshot_version,
int64 *item_previous_sequence_number) {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_certification_info);
bool error = true;
std::string key(item);
Certification_info::iterator it = certification_info.find(key);
snapshot_version->link();
if (it == certification_info.end()) {
std::pair<Certification_info::iterator, bool> ret =
certification_info.insert(
std::pair<std::string, Gtid_set_ref *>(key, snapshot_version));
error = !ret.second;
} else {
*item_previous_sequence_number =
it->second->get_parallel_applier_sequence_number();
if (it->second->unlink() == 0) {
/*
Claim Gtid_set_ref used memory to
`thread/group_rpl/THD_certifier_broadcast` thread, since this is thread
that does release the memory.
*/
it->second->claim_memory_ownership(true);
delete it->second;
}
it->second = snapshot_version;
error = false;
}
DBUG_EXECUTE_IF("group_replication_certifier_after_add_item", {
const char act[] =
"now signal "
"signal.group_replication_certifier_after_add_item_reached "
"wait_for "
"signal.group_replication_certifier_after_add_item_continue";
assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
});
return error;
}
Gtid_set *Certifier::get_certified_write_set_snapshot_version(
const char *item) {
DBUG_TRACE;
mysql_mutex_assert_owner(&LOCK_certification_info);
if (!is_initialized()) return nullptr; /* purecov: inspected */
Certification_info::iterator it;
std::string item_str(item);
it = certification_info.find(item_str);
if (it == certification_info.end())
return nullptr;
else
return it->second;
}
int Certifier::get_group_stable_transactions_set_string(char **buffer,
size_t *length) {
DBUG_TRACE;
int error = 1;
if (!is_initialized()) {
return 1;
}
/*
Stable transactions set may not be accurate during recovery,
thence we do not externalize it on
performance_schema.replication_group_member_stats table.
*/
if (local_member_info->get_recovery_status() ==
Group_member_info::MEMBER_IN_RECOVERY) {
return 0;
}
char *m_buffer = nullptr;
int m_length = stable_gtid_set->to_string(&m_buffer, true);
if (m_length >= 0) {
*buffer = m_buffer;
*length = static_cast<size_t>(m_length);
error = 0;
} else
my_free(m_buffer); /* purecov: inspected */
return error;
}
void Certifier::garbage_collect(Gtid_set *executed_gtid_set,
bool on_member_join) {
DBUG_TRACE;
bool update_metrics = false;
if (!is_initialized()) return; /* purecov: inspected */
/* Start garbage collection duration. */
const auto garbage_collection_begin = Metrics_handler::get_current_time();
if (!on_member_join) {
assert(nullptr == executed_gtid_set);
if (get_single_primary_mode_var() &&
get_preemptive_garbage_collection_var() &&
get_certification_info_size() >=
get_preemptive_garbage_collection_rows_threshold_var()) {
garbage_collect_internal(nullptr, true);
update_metrics = true;
}
if (intersect_members_gtid_executed_and_garbage_collect()) {
update_metrics = true;
}
} else {
/* executed_gtid_set only is empty when gtid_executed don't have
* any change, for example, when a group do boostrap without any
* GTID.
* To avoid don't have a increment on garbage collector counter on
* a view change we also do it when executed_gtid_set is empty.
*/
update_metrics = true;
if (!executed_gtid_set->is_empty()) {
garbage_collect_internal(executed_gtid_set);
}
}
if (update_metrics) {
/* Update garbage collection metrics. */
const auto garbage_collection_end = Metrics_handler::get_current_time();
metrics_handler->add_garbage_collection_run(garbage_collection_begin,
garbage_collection_end);
}
}
void Certifier::garbage_collect_internal(Gtid_set *executed_gtid_set,
bool preemptive) {
DBUG_TRACE;
if (!is_initialized()) {
return;
}
/*
This debug option works on every call to garbage_collect
by disabling the garbage collection.
Calls to garbage collect happen:
1) when a member joins.
2) periodically, using the the intersection of members gtid executed.
Period that can be controlled by the debug flag.
`group_replication_certifier_broadcast_thread_big_period`.
3) preemptively, please see option
`group_replication_preemptive_garbage_collection`.
*/
DBUG_EXECUTE_IF("group_replication_do_not_clear_certification_database",
{ return; };);
/*
If `executed_gtid_set` is already contained on `stable_gtid_set`,
no new transactions were committed on all members after the last
garbage collection run, thence there is nothing to garbage collect
with `executed_gtid_set`.
*/
if (!preemptive &&
update_stable_set(*executed_gtid_set) != Certifier::STABLE_SET_UPDATED) {
return;
}
/*
Data structures to hold a copy of certified gtids so that we can
use them without require to hold `LOCK_certification_info`.
*/
bool update_stable_set_after_preemptive_garbage_collection = false;
Tsid_map certified_gtids_copy_sid_map(nullptr);
Gtid_set certified_gtids_copy_set(&certified_gtids_copy_sid_map, nullptr);
{
MUTEX_LOCK(lock, &LOCK_certification_info);
if (preemptive) {
assert(nullptr == executed_gtid_set);
if (!get_single_primary_mode_var() ||
!get_preemptive_garbage_collection_var()) {
return;
}
/*
On preemptive garbage collect runs we use group_gtid_executed,
we are on single primary so if transactions are certified by
the group we can add to stable gtid set and clear all certification
info.
*/
clear_certification_info();
update_stable_set_after_preemptive_garbage_collection = true;
certified_gtids_copy_set.add_gtid_set(group_gtid_executed);
}
else {
/*
When a transaction "t" is applied to all group members and for all
ongoing, i.e., not yet committed or aborted transactions,
"t" was already committed when they executed (thus "t"
precedes them), then "t" is stable and can be removed from
the certification info.
*/
Certification_info::iterator it = certification_info.begin();
stable_gtid_set_lock->wrlock();
uint64 garbage_collector_counter =
metrics_handler->get_certification_garbage_collector_count();
DBUG_EXECUTE_IF("group_replication_garbage_collect_counter_overflow", {
DBUG_SET("-d,group_replication_garbage_collect_counter_overflow");
garbage_collector_counter = 0;
});
while (it != certification_info.end()) {
uint64 write_set_counter = it->second->get_garbage_collect_counter();
/*
we need to clear gtid_set_ref if marked with UINT64_MAX or
subset_not_equals of stable_gtid_set
*/
if (write_set_counter == UINT64_MAX ||
(write_set_counter < garbage_collector_counter &&
it->second->is_subset_not_equals(stable_gtid_set))) {
it->second->set_garbage_collect_counter(UINT64_MAX);
if (it->second->unlink() == 0) {
/*
Claim Gtid_set_ref used memory to
`thread/group_rpl/THD_certifier_broadcast` thread, since this is
thread that does release the memory.
*/
it->second->claim_memory_ownership(true);
delete it->second;
}
certification_info.erase(it++);
} else {
DBUG_EXECUTE_IF("group_replication_ci_rows_counter_high",
{ assert(write_set_counter > 0); });
it->second->set_garbage_collect_counter(garbage_collector_counter);
++it;
}
}
stable_gtid_set_lock->unlock();
}
/*
We need to update parallel applier indexes since we do not know
what write sets were purged, which may cause transactions
last committed to be incorrectly computed.
*/
update_parallel_applier_indexes(true, false);
#if !defined(NDEBUG)
/*
This part blocks the garbage collection process for 300 sec in order to
simulate the case that while garbage collection is going on, we should
skip the stable set messages round in order to prevent simultaneous
access to stable_gtid_set.
*/
if (certifier_garbage_collection_block) {
certifier_garbage_collection_block = false;
// my_sleep expects a given number of microseconds.
my_sleep(broadcast_thread->BROADCAST_GTID_EXECUTED_PERIOD * 1500000);
}
DBUG_EXECUTE_IF("group_replication_certifier_garbage_collection_ran", {
const char act[] =
"now signal "
"signal.group_replication_certifier_garbage_collection_finished";
assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
});
#endif
}
/*
Update stable set using a copy of certified gtids so that we dot not
require to hold `LOCK_certification_info`.
*/
if (preemptive && update_stable_set_after_preemptive_garbage_collection) {
update_stable_set(certified_gtids_copy_set);
}
/*
Applier channel received set does only contain the GTIDs of the
remote (committed by other members) transactions. On the long
term, the gaps may create performance issues on the received
set update. To avoid that, periodically, we update the received
set with the full set of transactions committed on the group,
closing the gaps.
*/
if (channel_add_executed_gtids_to_received_gtids(
applier_module_channel_name)) {
LogPluginErr(
WARNING_LEVEL,
ER_GRP_RPL_RECEIVED_SET_MISSING_GTIDS); /* purecov: inspected */
}
}
Certifier::enum_update_status Certifier::update_stable_set(
const Gtid_set &set) {
DBUG_TRACE;
Checkable_rwlock::Guard g(*stable_gtid_set_lock,
Checkable_rwlock::WRITE_LOCK);
if (set.is_subset(stable_gtid_set)) {
return STABLE_SET_ALREADY_CONTAINED;
}
if (stable_gtid_set->add_gtid_set(&set) != RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_SET_STABLE_TRANS_ERROR);
return STABLE_SET_ERROR;
}
return STABLE_SET_UPDATED;
}
int Certifier::handle_certifier_data(
const uchar *data, ulong len, const Gcs_member_identifier &gcs_member_id) {
DBUG_TRACE;
bool member_message_received = false;
if (!is_initialized()) return 1; /* purecov: inspected */
/*
On members recovering through clone the GTID_EXECUTED is only
updated after the server restart that finishes the procedure.
During that procedure they will periodically send the GTID_EXECUTED
that the server had once joined the group. This will restrain the
common set of transactions applied on all members, which in consequence
will render the certification garbage collection void.
As such, we only consider ONLINE members for the common set of
transactions applied on all members.
When recovering members change to ONLINE state, their certification
info will be updated with the one of the donor at the join, being
garbage collect on the future calls of this method.
*/
if (group_member_mgr->get_group_member_status_by_member_id(gcs_member_id) !=
Group_member_info::MEMBER_ONLINE) {
return 0;
}
mysql_mutex_lock(&LOCK_members);
std::string member_id = gcs_member_id.get_member_id();
#if !defined(NDEBUG)
if (same_member_message_discarded) {
/*
Injecting the member_id in the member's vector to simulate the case of
same member sending multiple messages.
*/
this->members.push_back(member_id);
}
#endif
const size_t number_of_members_online =
group_member_mgr->get_number_of_members_online();
if (this->members.size() != number_of_members_online) {
/*
We check for the member_id of the current message if it is present in
the member vector or not. If it is present, we will need to discard the
message. If not we will add the message in the incoming message
synchronized queue for stable set handling.
*/
std::vector<std::string>::iterator it;
it = std::find(members.begin(), members.end(), member_id);
if (it != members.end())
member_message_received = true;
else
this->members.push_back(member_id);
/*
Since member is not present we can queue this message.
*/
if (!member_message_received) {
this->incoming->push(
new Data_packet(data, len, key_certification_data_gc));
}
// else: ignore the message, no point in alerting the user about this.
}
#if !defined(NDEBUG)
if (same_member_message_discarded) {
/*
Clearing the flag here as the members vector is not cleaned above.
*/
same_member_message_discarded = false;
clear_members();
}
#endif
mysql_mutex_unlock(&LOCK_members);
return 0;
}
bool Certifier::intersect_members_gtid_executed_and_garbage_collect() {
DBUG_TRACE;
if (!is_initialized() || nullptr == group_member_mgr) {
return false;
}
/*
If the incoming message queue size is equal to the number of the ONLINE
members in the group, we are sure that each ONLINE member has sent
their gtid_executed. So we can go ahead with the stable set handling.
*/
mysql_mutex_lock(&LOCK_members);
const size_t incoming_size = this->incoming->size();
const size_t number_of_members_online =
group_member_mgr->get_number_of_members_online();
if (incoming_size < 1 || number_of_members_online < 1 ||
incoming_size != number_of_members_online) {
mysql_mutex_unlock(&LOCK_members);
return false;
}
Data_packet *packet = nullptr;
int error = 0;
Tsid_map tsid_map(nullptr);
Gtid_set executed_set(&tsid_map, nullptr);
/*
Compute intersection between all received sets.
*/
while (!error && !this->incoming->empty()) {
this->incoming->pop(&packet);
if (packet == nullptr) {
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_NULL_PACKET); /* purecov: inspected */
error = 1; /* purecov: inspected */
break; /* purecov: inspected */
}
uchar *payload = packet->payload;
Gtid_set member_set(&tsid_map, nullptr);
Gtid_set intersection_result(&tsid_map, nullptr);
if (member_set.add_gtid_encoding(payload, packet->len) !=
RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL,
ER_GRP_RPL_CANT_READ_GTID); /* purecov: inspected */
error = 1; /* purecov: inspected */
} else {
/*
First member set? If so we only need to add it to executed set.
*/
if (executed_set.is_empty()) {
if (executed_set.add_gtid_set(&member_set)) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_PROCESS_GTID_SET_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
}
} else {
/*
We have three sets:
member_set: the one sent from a given member;
executed_set: the one that contains the intersection of
the computed sets until now;
intersection_result: the intersection between set and
intersection_result.
So we compute the intersection between set and executed_set, and
set that value to executed_set to be used on the next intersection.
*/
if (member_set.intersection(&executed_set, &intersection_result) !=
RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_PROCESS_INTERSECTION_GTID_SET_ERROR); /* purecov:
inspected */
error = 1; /* purecov: inspected */
} else {
executed_set.clear();
if (executed_set.add_gtid_set(&intersection_result) !=
RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_PROCESS_GTID_SET_ERROR); /* purecov: inspected */
error = 1; /* purecov: inspected */
}
}
}
}
delete packet;
}
#if !defined(NDEBUG)
char *executed_set_string;
executed_set.to_string(&executed_set_string);
DBUG_PRINT("info",
("Certifier intersect_members_gtid_executed_and_garbage_collect: "
"executed_set: %s",
executed_set_string));
my_free(executed_set_string);
#endif
/*
Clearing the members to proceed with the next round of garbage
collection.
*/
clear_members();
mysql_mutex_unlock(&LOCK_members);
if (!error) {
garbage_collect_internal(&executed_set);
return true;
}
return false;
}
void Certifier::handle_view_change() {
DBUG_TRACE;
if (!is_initialized()) {
return;
}
mysql_mutex_lock(&LOCK_members);
clear_incoming();
clear_members();
mysql_mutex_unlock(&LOCK_members);
}
void Certifier::get_certification_info(
std::map<std::string, std::string> *cert_info) {
DBUG_TRACE;
if (!is_initialized()) {
return;
}
MUTEX_LOCK(guard, &LOCK_certification_info);
for (Certification_info::iterator it = certification_info.begin();
it != certification_info.end(); ++it) {
std::string key = it->first;
assert(key.compare(GTID_EXTRACTED_NAME) != 0);
size_t len = it->second->get_encoded_length();
uchar *buf = (uchar *)my_malloc(key_certification_data, len, MYF(0));
it->second->encode(buf);
std::string value(reinterpret_cast<const char *>(buf), len);
my_free(buf);
(*cert_info).insert(std::pair<std::string, std::string>(key, value));
}
// Add the group_gtid_executed to certification info sent to joiners.
size_t len = group_gtid_executed->get_encoded_length();
uchar *buf = (uchar *)my_malloc(key_certification_data, len, MYF(0));
group_gtid_executed->encode(buf);
std::string value(reinterpret_cast<const char *>(buf), len);
my_free(buf);
(*cert_info)
.insert(std::pair<std::string, std::string>(GTID_EXTRACTED_NAME, value));
}
bool Certifier::set_certification_info_recovery_metadata(
Recovery_metadata_message *recovery_metadata_message) {
/*
1. Get Compressed Certification info packet count from the received
recovery metadata.
*/
std::pair<Recovery_metadata_message::enum_recovery_metadata_message_error,
unsigned int>
payload_certification_info_packet_count_error =
recovery_metadata_message
->get_decoded_compressed_certification_info_packet_count();
/*
1.1. If certification info packet count is 0 which means certification info
payload is empty return false as recovery still need to process.
*/
if (payload_certification_info_packet_count_error.first ==
Recovery_metadata_message::enum_recovery_metadata_message_error::
ERR_CERT_INFO_EMPTY) {
return false;
}
// 1.2. If error while decoding certification info packet count, return error.
if (payload_certification_info_packet_count_error.first !=
Recovery_metadata_message::enum_recovery_metadata_message_error::
RECOVERY_METADATA_MESSAGE_OK) {
return true;
}
// 1.3. Get certification info packet count value.
unsigned int compressed_certification_info_packet_count{
payload_certification_info_packet_count_error.second};
DBUG_EXECUTE_IF("group_replication_certification_info_packet_count_check",
assert(compressed_certification_info_packet_count > 1););
// 2. Get Compression type from the received recovery metadata.
std::pair<Recovery_metadata_message::enum_recovery_metadata_message_error,
GR_compress::enum_compression_type>
payload_compression_type_error =
recovery_metadata_message->get_decoded_compression_type();
if (payload_compression_type_error.first !=
Recovery_metadata_message::enum_recovery_metadata_message_error::
RECOVERY_METADATA_MESSAGE_OK) {
return true;
}
// 2.1 Get Compression type value.
GR_compress::enum_compression_type compression_type{
payload_compression_type_error.second};
/*
3. Get compressed certification info iterator to iterate through
multiple packets of compressed certification info.
*/
Recovery_metadata_message_compressed_parts compressed_parts(
recovery_metadata_message, compressed_certification_info_packet_count);
if (!is_initialized()) {
return true;
}
mysql_mutex_lock(&LOCK_certification_info);
clear_certification_info();
// 3.1. Iterate through compressed certification info packets.
uint compressed_certification_info_packet_count_aux{0};
for (auto single_compressed_part : compressed_parts) {
/*
3.2. Decompress, unserialize using protobuf and then add it's content
to local certification info.
*/
if (set_certification_info_part(compression_type,
std::get<0>(single_compressed_part),
std::get<1>(single_compressed_part),
std::get<2>(single_compressed_part))) {
mysql_mutex_unlock(&LOCK_certification_info);
return true;
}
++compressed_certification_info_packet_count_aux;
}
/*
3.3. Check if number of received compressed certification info packets match
with packets sent.
*/
if (compressed_certification_info_packet_count !=
compressed_certification_info_packet_count_aux) {
LogPluginErr(ERROR_LEVEL,
ER_GROUP_REPLICATION_METADATA_CERT_INFO_PACKET_COUNT_ERROR);
mysql_mutex_unlock(&LOCK_certification_info);
return true;
}
/*
4. Sets the received gtid_executed from metadata sender.
Extract the donor group_gtid_executed so that it can be used to
while member is applying transactions that were already applied
by distributed recovery procedure.
*/
std::pair<Recovery_metadata_message::enum_recovery_metadata_message_error,
std::reference_wrapper<std::string>>
payload_after_gtids_error =
recovery_metadata_message->get_decoded_group_gtid_executed();
// 4.1. Set group_gtid_extracted if not error.
if (payload_after_gtids_error.first ==
Recovery_metadata_message::enum_recovery_metadata_message_error::
RECOVERY_METADATA_MESSAGE_OK) {
std::string gtid_extracted_set{payload_after_gtids_error.second.get()};
if (group_gtid_extracted->add_gtid_text(gtid_extracted_set.c_str()) !=
RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL,
ER_GROUP_REPLICATION_METADATA_READ_GTID_EXECUTED);
mysql_mutex_unlock(&LOCK_certification_info);
return true;
}
} else {
// Error decoding group_gtid_executed.
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_METADATA_READ_GTID_EXECUTED);
mysql_mutex_unlock(&LOCK_certification_info);
return true;
}
mysql_mutex_unlock(&LOCK_certification_info);
return false;
}
bool Certifier::set_certification_info_part(
GR_compress::enum_compression_type compression_type,
const unsigned char *buffer, unsigned long long buffer_length,
unsigned long long uncompressed_buffer_length) {
DBUG_TRACE;
unsigned char *uncompressed_buffer{nullptr};
std::size_t uncompressed_buffer_size{0};
mysql_mutex_assert_owner(&LOCK_certification_info);
if (buffer != nullptr && buffer_length > 0 &&
uncompressed_buffer_length > 0) {
// 1. Initialize compression library.
GR_decompress *decompress = new GR_decompress(compression_type);
// 2. Decompress data.
GR_decompress::enum_decompression_error decompression_error =
decompress->decompress(buffer, buffer_length,
uncompressed_buffer_length);
// 3. Verify decompression is successful.
if (decompression_error !=
GR_decompress::enum_decompression_error::DECOMPRESSION_OK) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_DECOMPRESS_PROCESS);
delete decompress;
return true;
}
// 4. Get data after decompression.
std::tie(uncompressed_buffer, uncompressed_buffer_size) =
decompress->get_buffer();
if (uncompressed_buffer == nullptr || uncompressed_buffer_size == 0) {
LogPluginErr(ERROR_LEVEL,
ER_GROUP_REPLICATION_METADATA_CERT_INFO_PACKET_EMPTY);
delete decompress;
return true;
}
// 5. Unserialize uncompressed data using Protobuf.
ProtoCertificationInformationMap cert_info;
if (!cert_info.ParseFromArray(uncompressed_buffer,
uncompressed_buffer_size)) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_METADATA_PROTOBUF_PARSING);
delete decompress;
return true;
}
// 6. Now release compression library object so output buffer memory can be
// released.
delete decompress;
// 7. Insert data to certification info.
for (auto it = cert_info.data().begin(); it != cert_info.data().end();
++it) {
std::string key = it->first;
Gtid_set_ref *value = new Gtid_set_ref(certification_info_tsid_map, -1);
if (value->add_gtid_encoding(
reinterpret_cast<const uchar *>(it->second.c_str()),
it->second.length()) != RETURN_STATUS_OK) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CANT_READ_WRITE_SET_ITEM,
key.c_str());
return true;
}
value->link();
certification_info.insert(
std::pair<std::string, Gtid_set_ref *>(key, value));
/*
The memory used by Gtid_set_ref is allocated by
`thread/group_rpl/THD_applier_module_receiver`, though it will be
released by `thread/group_rpl/THD_certifier_broadcast` thread. To avoid
untracked memory release on
`thread/group_rpl/THD_applier_module_receiver` we do dissociate this
used memory from this thread.
*/
value->claim_memory_ownership(false);
}
return false;
}
// 8. Error if input compressed certification_info packet is empty.
LogPluginErr(ERROR_LEVEL,
ER_GROUP_REPLICATION_METADATA_CERT_INFO_PACKET_EMPTY);
return true;
}
bool Certifier::initialize_server_gtid_set_after_distributed_recovery() {
DBUG_TRACE;
if (!is_initialized()) {
return true;
}
mysql_mutex_lock(&LOCK_certification_info);
if (initialize_server_gtid_set(false)) {
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_INIT_CERTIFICATION_INFO_FAILURE);
mysql_mutex_unlock(&LOCK_certification_info);
return true;
}
mysql_mutex_unlock(&LOCK_certification_info);
return false;
}
bool Certifier::compress_packet(
ProtoCertificationInformationMap &proto_cert_info,
unsigned char **uncompresssed_buffer,
std::vector<GR_compress *> &compressor_list,
GR_compress::enum_compression_type compression_type) {
size_t proto_cert_info_size = proto_cert_info.ByteSizeLong();
*uncompresssed_buffer =
(uchar *)my_realloc(key_compression_data, *uncompresssed_buffer,
proto_cert_info_size, MYF(0));
if (*uncompresssed_buffer == nullptr) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_METADATA_MEMORY_ALLOC,
"Serializing Protobuf Map");
return true;
}
// 1. Serialize Protobuf Map
if (!proto_cert_info.SerializeToArray(*uncompresssed_buffer,
proto_cert_info_size)) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_PROTOBUF_SERIALIZING_ERROR,
"Certification_info");
return true;
}
proto_cert_info.clear_data();
// 2. Initialize compression library.
GR_compress *compress = new GR_compress(compression_type);
// 3. Compress data.
GR_compress::enum_compression_error error =
compress->compress(*uncompresssed_buffer, proto_cert_info_size);
// 4. Verify compression is successful.
if (error != GR_compress::enum_compression_error::COMPRESSION_OK) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_COMPRESS_PROCESS);
delete compress;
return true;
}
// 5. Add compressed data to vector.
compressor_list.push_back(compress);
return false;
}
bool Certifier::get_certification_info_recovery_metadata(
Recovery_metadata_message *recovery_metadata_message) {
DBUG_TRACE;
bool error{false};
size_t max_length{0};
size_t max_compressed_packet_size_val{MAX_COMPRESSED_PACKET_SIZE};
std::string key{};
uchar *buf{nullptr};
uchar *uncompresssed_buffer{nullptr};
std::string value{};
size_t len{0};
ProtoCertificationInformationMap proto_cert_info;
if (!is_initialized()) {
return true;
}
mysql_mutex_lock(&LOCK_certification_info);
// I. Generate Compressed certification_info packets.
for (Certification_info::iterator it = certification_info.begin();
it != certification_info.end(); ++it) {
// 1. Read data from certification_info map.
key.assign(it->first);
len = it->second->get_encoded_length();
buf = (uchar *)my_realloc(key_certification_data, buf, len, MYF(0));
if (buf == nullptr) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_METADATA_MEMORY_ALLOC,
"reading data from certification_info");
error = true;
goto err;
}
it->second->encode(buf);
value.assign(reinterpret_cast<const char *>(buf), len);
// 2. Add to Protobuf map.
(*proto_cert_info.mutable_data())[key] = value;
// 3. If read size is greater than MAX_COMPRESSED_PACKET_SIZE,
// call compress_packet() which will
// - serialize Protobuf Map,
// - compress serialized string,
// - The compressed data is pushed to a std::vector, so that multiple
// packets of compressed data is prepared.
max_length += (key.length() + len);
DBUG_EXECUTE_IF("group_replication_max_compressed_packet_size_10000",
{ max_compressed_packet_size_val = 10000; });
if (max_length > max_compressed_packet_size_val) {
if (compress_packet(
proto_cert_info, &uncompresssed_buffer,
recovery_metadata_message->get_encode_compressor_list(),
recovery_metadata_message->get_encode_compression_type())) {
error = true;
goto err;
}
max_length = 0;
}
}
if (max_length > 0) {
if (compress_packet(
proto_cert_info, &uncompresssed_buffer,
recovery_metadata_message->get_encode_compressor_list(),
recovery_metadata_message->get_encode_compression_type())) {
error = true;
goto err;
}
}
// II. Get executed gtid set.
// Add the group_gtid_executed to Recovery Metadata which will be sent
// to joiners.
len = group_gtid_executed->get_encoded_length();
buf = (uchar *)my_realloc(key_certification_data, buf, len, MYF(0));
if (buf == nullptr) {
LogPluginErr(ERROR_LEVEL, ER_GROUP_REPLICATION_METADATA_MEMORY_ALLOC,
"getting executed gtid set for Recovery Metadata");
error = true;
goto err;
}
group_gtid_executed->encode(buf);
recovery_metadata_message->get_encode_group_gtid_executed().assign(
reinterpret_cast<const char *>(buf), len);
err:
my_free(buf);
my_free(uncompresssed_buffer);
mysql_mutex_unlock(&LOCK_certification_info);
return error;
}
std::pair<Gtid, mysql::utils::Return_status>
Certifier::generate_view_change_group_gtid() {
DBUG_TRACE;
if (!is_initialized()) {
Gtid resulting_gtid{-1, -1};
return std::make_pair(resulting_gtid, mysql::utils::Return_status::error);
}
MUTEX_LOCK(guard, &LOCK_certification_info);
auto [generated_gno, generation_code] =
gtid_generator.get_next_available_gtid(
nullptr, views_sidno_group_representation, *get_group_gtid_set());
DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_5",
assert(generated_gno == 5););
DBUG_EXECUTE_IF("certifier_assert_next_seqno_equal_7",
assert(generated_gno == 7););
if (generation_code == mysql::utils::Return_status::ok)
add_to_group_gtid_executed_internal(views_sidno_group_representation,
generated_gno);
Gtid resulting_gtid{views_sidno_server_representation, generated_gno};
return std::make_pair(resulting_gtid, generation_code);
}
int Certifier::set_certification_info(
std::map<std::string, std::string> *cert_info) {
DBUG_TRACE;
assert(cert_info != nullptr);
if (!is_initialized()) {
return 1;
}
if (cert_info->size() == 1) {
std::map<std::string, std::string>::iterator it =
cert_info->find(CERTIFICATION_INFO_ERROR_NAME);
if (it != cert_info->end()) {
// The certification database could not be transmitted
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_ERROR_ON_CERT_DB_INSTALL,
it->second.c_str());
return 1;
}
}
MUTEX_LOCK(guard, &LOCK_certification_info);
clear_certification_info();
for (std::map<std::string, std::string>::iterator it = cert_info->begin();
it != cert_info->end(); ++it) {
std::string key = it->first;
/*
Extract the donor group_gtid_executed so that it can be used to
while member is applying transactions that were already applied
by distributed recovery procedure.
*/
if (it->first.compare(GTID_EXTRACTED_NAME) == 0) {
if (group_gtid_extracted->add_gtid_encoding(
reinterpret_cast<const uchar *>(it->second.c_str()),
it->second.length()) != RETURN_STATUS_OK) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_CANT_READ_GRP_GTID_EXTRACTED); /* purecov: inspected */
return 1; /* purecov: inspected */
}
continue;
}
Gtid_set_ref *value = new Gtid_set_ref(certification_info_tsid_map, -1);
if (value->add_gtid_encoding(
reinterpret_cast<const uchar *>(it->second.c_str()),
it->second.length()) != RETURN_STATUS_OK) {
delete value; /* purecov: inspected */
LogPluginErr(ERROR_LEVEL, ER_GRP_RPL_CANT_READ_WRITE_SET_ITEM,
key.c_str()); /* purecov: inspected */
return 1; /* purecov: inspected */
}
value->link();
certification_info.insert(
std::pair<std::string, Gtid_set_ref *>(key, value));
/*
The memory used by Gtid_set_ref is allocated by
`thread/group_rpl/THD_applier_module_receiver`, though it will be released
by `thread/group_rpl/THD_certifier_broadcast` thread. To avoid untracked
memory release on `thread/group_rpl/THD_applier_module_receiver` we do
dissociate this used memory from this thread.
*/
value->claim_memory_ownership(false);
}
if (initialize_server_gtid_set()) {
LogPluginErr(
ERROR_LEVEL,
ER_GRP_RPL_INIT_CERTIFICATION_INFO_FAILURE); /* purecov: inspected */
return 1; /* purecov: inspected */
}
if (group_gtid_extracted->is_subset_not_equals(group_gtid_executed)) {
certifying_already_applied_transactions = true;
gtid_generator.recompute(*get_group_gtid_set());
#ifndef NDEBUG
debug_print_group_gtid_sets(*group_gtid_executed, *group_gtid_extracted,
true);
#endif
}
return 0;
}
void Certifier::update_certified_transaction_count(bool result,
bool local_transaction) {
mysql_mutex_assert_owner(&LOCK_certification_info);
if (result)
positive_cert++;
else
negative_cert++;
const Group_member_info::Group_member_status member_status =
local_member_info->get_recovery_status();
assert(member_status == Group_member_info::MEMBER_ONLINE ||
member_status == Group_member_info::MEMBER_IN_RECOVERY);
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_certified();
/*
If transaction is local and rolledback
increment local negative certifier count
*/
if (local_transaction && !result) {
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_local_rollback();
}
if (member_status == Group_member_info::MEMBER_IN_RECOVERY) {
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_certified_during_recovery();
if (!result) {
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_certified_negatively_during_recovery();
}
}
}
ulonglong Certifier::get_positive_certified() { return positive_cert; }
ulonglong Certifier::get_negative_certified() { return negative_cert; }
ulonglong Certifier::get_certification_info_size() {
return certification_info.size();
}
void Certifier::get_last_conflict_free_transaction(std::string *value) {
int length = 0;
char buffer[Gtid::MAX_TEXT_LENGTH + 1];
if (!is_initialized()) {
return;
}
MUTEX_LOCK(guard, &LOCK_certification_info);
if (last_conflict_free_transaction.is_empty()) return;
length =
last_conflict_free_transaction.to_string(group_gtid_tsid_map, buffer);
if (length > 0) value->assign(buffer);
}
void Certifier::enable_conflict_detection() {
DBUG_TRACE;
if (!is_initialized()) {
return;
}
MUTEX_LOCK(guard, &LOCK_certification_info);
conflict_detection_enable = true;
local_member_info->enable_conflict_detection();
}
void Certifier::disable_conflict_detection() {
DBUG_TRACE;
assert(local_member_info->in_primary_mode());
if (!is_initialized()) {
return;
}
{
MUTEX_LOCK(guard, &LOCK_certification_info);
conflict_detection_enable = false;
local_member_info->disable_conflict_detection();
}
LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_CONFLICT_DETECTION_DISABLED);
}
bool Certifier::is_conflict_detection_enable() {
DBUG_TRACE;
if (!is_initialized()) {
return false;
}
MUTEX_LOCK(guard, &LOCK_certification_info);
bool result = conflict_detection_enable;
return result;
}
/*
Gtid_Executed_Message implementation
*/
Gtid_Executed_Message::Gtid_Executed_Message()
: Plugin_gcs_message(CT_CERTIFICATION_MESSAGE) {}
Gtid_Executed_Message::~Gtid_Executed_Message() = default;
void Gtid_Executed_Message::append_gtid_executed(uchar *gtid_data, size_t len) {
data.insert(data.end(), gtid_data, gtid_data + len);
}
void Gtid_Executed_Message::encode_payload(
std::vector<unsigned char> *buffer) const {
DBUG_TRACE;
encode_payload_item_type_and_length(buffer, PIT_GTID_EXECUTED, data.size());
buffer->insert(buffer->end(), data.begin(), data.end());
encode_payload_item_int8(buffer, PIT_SENT_TIMESTAMP,
Metrics_handler::get_current_time());
}
void Gtid_Executed_Message::decode_payload(const unsigned char *buffer,
const unsigned char *) {
DBUG_TRACE;
const unsigned char *slider = buffer;
uint16 payload_item_type = 0;
unsigned long long payload_item_length = 0;
decode_payload_item_type_and_length(&slider, &payload_item_type,
&payload_item_length);
data.clear();
data.insert(data.end(), slider, slider + payload_item_length);
}
uint64_t Gtid_Executed_Message::get_sent_timestamp(const unsigned char *buffer,
size_t length) {
DBUG_TRACE;
return Plugin_gcs_message::get_sent_timestamp(buffer, length,
PIT_SENT_TIMESTAMP);
}