sql/dependency_slave_worker.cc (218 lines of code) (raw):

#include <my_global.h>
#include "debug_sync.h"
#include "dependency_slave_worker.h"
#include "log_event_wrapper.h"
#include "rpl_slave_commit_order_manager.h"
#include <../include/mysql/service_thd_engine_lock.h>

// Forward declarations; no definition for these helpers appears in this file.
bool append_item_to_jobs(slave_job_item *job_item,
                         Slave_worker *w, Relay_log_info *rli);
void clear_current_group_events(Slave_worker *worker,
                                Relay_log_info *rli, bool overfill);

/**
  Dequeue the next entry from the dependency queue (c_rli->dep_queue).

  c_rli->dep_lock is taken at the top of the function. While the queue is
  empty, the worker thread is not killed and running_status is RUNNING, the
  worker does timed waits on c_rli->dep_empty_cond and keeps the wait
  counters (begin_event_waits, num_workers_waiting) up to date.

  After dequeuing, the function:
    - registers this worker with the commit order manager (if one was passed
      and something was dequeued), setting the current DB first when commits
      are ordered per database;
    - signals dep_empty_cond if the queue is now empty;
    - clears c_rli->dep_full and signals dep_full_cond once the queue size
      drops below mts_dependency_refill_threshold percent of
      mts_dependency_size.

  NOTE(review): there is no explicit unlock of dep_lock here; this relies on
  EXIT_COND() releasing the mutex that was handed to ENTER_COND() -- confirm
  against THD::EXIT_COND.

  @param co_mngr  commit order manager, or nullptr when there is none

  @return whatever c_rli->dequeue_dep() returned; it tests false when there
          is no event to execute (the caller's loop relies on that)
*/
std::shared_ptr<Log_event_wrapper>
Dependency_slave_worker::get_begin_event(Commit_order_manager *co_mngr)
{
  std::shared_ptr<Log_event_wrapper> ret;

  mysql_mutex_lock(&c_rli->dep_lock);

  PSI_stage_info old_stage;
  info_thd->ENTER_COND(&c_rli->dep_empty_cond, &c_rli->dep_lock,
                       &stage_slave_waiting_event_from_coordinator,
                       &old_stage);

  while (!info_thd->killed &&
         running_status == RUNNING &&
         c_rli->dep_queue.empty())
  {
    ++c_rli->begin_event_waits;
    ++c_rli->num_workers_waiting;
    // The configured timeout is scaled by 1e6 to get nanoseconds
    // (presumably it is expressed in milliseconds -- TODO confirm).
    const auto timeout_nsec=
      c_rli->mts_dependency_cond_wait_timeout * 1000000;
    struct timespec abstime;
    set_timespec_nsec(abstime, timeout_nsec);
    mysql_cond_timedwait(&c_rli->dep_empty_cond, &c_rli->dep_lock, &abstime);
    --c_rli->num_workers_waiting;
  }

  ret= c_rli->dequeue_dep();

  // case: place ourselves in the commit order queue
  if (ret && co_mngr != nullptr)
  {
    DBUG_ASSERT(c_rli->mts_dependency_order_commits);
    // case: if we need to order commits by DB we set current DB
    if (c_rli->mts_dependency_order_commits == DEP_RPL_ORDER_DB)
      set_current_db(ret->get_db());
    co_mngr->register_trx(this);
  }

  // case: signal if queue is now empty
  if (c_rli->dep_queue.empty())
    mysql_cond_signal(&c_rli->dep_empty_cond);

  // admission control
  if (unlikely(c_rli->dep_full))
  {
    DBUG_ASSERT(c_rli->dep_queue.size() > 0);
    // case: signal if dep has space
    if (c_rli->dep_queue.size() <
        (c_rli->mts_dependency_size *
         c_rli->mts_dependency_refill_threshold / 100))
    {
      c_rli->dep_full= false;
      mysql_cond_signal(&c_rli->dep_full_cond);
    }
  }

  info_thd->EXIT_COND(&old_stage);
  return ret;
}

/**
  Pull one group of events from the dependency queue and execute it.

  Starting at the event returned by get_begin_event(), each event is run
  through execute_event() and finalize_event(), then the chain is followed
  via ev->next(). On an execution error c_rli->dependency_worker_error is
  set and the loop stops. If trans_retries is set and current_event_index
  is back at 0 after an event, the group is replayed from its first event.

  After the loop: the commit order manager (if any) is told about a
  rollback on error, num_in_flight_trx is decremented under dep_lock when a
  group was actually dequeued, dep_trx_all_done_cond is broadcast when at
  most one transaction remains in flight, and c_rli->cleanup_group() is
  called with the first event of the group.

  @return true if no error occurred, the thread was not killed and
          running_status is still RUNNING; false otherwise
*/
bool Dependency_slave_worker::execute_group()
{
  int err= 0;
  Commit_order_manager *commit_order_mngr= get_commit_order_manager();

  DBUG_ASSERT(current_event_index == 0);
  auto begin_event= get_begin_event(commit_order_mngr);
  auto ev= begin_event;

  while (ev)
  {
    if (unlikely(err= execute_event(ev)))
    {
      c_rli->dependency_worker_error= true;
      break;
    }

    // Debug-only: take a fake gap lock after a (non-end) WRITE_ROWS event,
    // reporting a row lock wait if another worker already holds it.
    DBUG_EXECUTE_IF("dbug.dep_fake_gap_lock_on_insert",
      {
        if (!ev->is_end_event &&
            ev->raw_event() &&
            ev->raw_event()->get_type_code() == WRITE_ROWS_EVENT)
        {
          if (!c_rli->dep_fake_gap_lock.try_lock())
          {
            thd_report_row_lock_wait(
              info_thd, c_rli->dep_fake_gap_lock_worker->info_thd);
            c_rli->dep_fake_gap_lock.lock();
            c_rli->dep_fake_gap_lock_worker= this;
          }
          else
          {
            c_rli->dep_fake_gap_lock_worker= this;
          }
        }
      };);

    // case: restart trx if temporary error, see @slave_worker_ends_group
    if (unlikely(trans_retries && current_event_index == 0))
    {
      // Debug-only: drop the fake gap lock before replaying the group.
      DBUG_EXECUTE_IF("dbug.dep_fake_gap_lock_on_insert",
        {
          if (this == c_rli->dep_fake_gap_lock_worker)
          {
            c_rli->dep_fake_gap_lock_worker= nullptr;
            c_rli->dep_fake_gap_lock.unlock();
          }
        };);
      ev= begin_event;
      continue;
    }

    finalize_event(ev);
    ev= ev->next();
  }

  // Debug-only: drop the fake gap lock if this worker still holds it.
  DBUG_EXECUTE_IF("dbug.dep_fake_gap_lock_on_insert",
    {
      if (this == c_rli->dep_fake_gap_lock_worker)
      {
        c_rli->dep_fake_gap_lock_worker= nullptr;
        c_rli->dep_fake_gap_lock.unlock();
      }
    };);

  // case: in case of error rollback if commit ordering is enabled
  if (unlikely(err && commit_order_mngr))
  {
    commit_order_mngr->report_rollback(this);
  }

  mysql_mutex_lock(&c_rli->dep_lock);
  // Only account for a transaction if one was actually dequeued.
  if (likely(begin_event))
  {
    DBUG_ASSERT(c_rli->num_in_flight_trx > 0);
    --c_rli->num_in_flight_trx;
  }
  if (c_rli->num_in_flight_trx <= 1)
    mysql_cond_broadcast(&c_rli->dep_trx_all_done_cond);
  mysql_mutex_unlock(&c_rli->dep_lock);

  c_rli->cleanup_group(begin_event);

  return err == 0 && !info_thd->killed && running_status == RUNNING;
}

/**
  Execute a single event of the current group.

  Steps, in order: wait for the event's dependencies (ev->wait()); bail out
  if another worker flagged dependency_worker_error; flush the worker job
  queue if current_event_index has reached jobs.size; append the raw event
  to the job queue unless this is a retry of an already appended event; and
  finally call ev->execute().

  @param ev  the event wrapper to execute

  @return 0 on success; 1 if the wait failed, another worker reported an
          error, or the event could not be appended to the job queue; -1 if
          ev->execute() returned non-zero
*/
int
Dependency_slave_worker::execute_event(std::shared_ptr<Log_event_wrapper> &ev)
{
  // wait for all dependencies to be satisfied
  if (unlikely(!ev->wait(this)))
    return 1;

  // Debug-only: pause before executing an UPDATE_ROWS event so a test can
  // synchronize with this point through debug sync signals.
  DBUG_EXECUTE_IF("dbug.dep_wait_before_update_execution",
    {
      if (ev->raw_event()->get_type_code() == UPDATE_ROWS_EVENT)
      {
        const char act[]= "now signal signal.reached wait_for signal.done";
        DBUG_ASSERT(opt_debug_sync_timeout > 0);
        DBUG_ASSERT(!debug_sync_set_action(info_thd,
                                           STRING_WITH_LEN(act)));
      }
    };);

  // case: there was an error in one of the workers, so let's skip execution
  // of events immediately
  if (unlikely(c_rli->dependency_worker_error))
    return 1;

  // case: the worker job queue is full, let's flush the queue to make
  // progress
  if (unlikely(current_event_index >= jobs.size))
  {
    // Resets current_event_index to 0 and disables trx retries because
    // we've flushed the events
    mysql_mutex_lock(&jobs_lock);
    clear_current_group_events(this, c_rli, true);
    mysql_mutex_unlock(&jobs_lock);
  }

  // case: append to jobs queue only if this is not a trx retry, trx retries
  // resets @current_event_index, see @slave_worker_ends_group
  if (likely(current_event_index == jobs.len))
  {
    // NOTE: this is done so that @pop_jobs_item() can extract this event
    // although this is redundant it makes integration with existing code
    // much easier
    Slave_job_item item= { ev->raw_event() };
    if (append_item_to_jobs(&item, this, c_rli))
      return 1;
    ev->is_appended_to_queue= true;
  }

  DBUG_ASSERT(ev->is_appended_to_queue);
  return ev->execute(this, this->info_thd, c_rli) == 0 ? 0 : -1;
}

/**
  Finalize an executed event and clean up its entries in the key lookup.

  Runs entirely under c_rli->dep_key_lookup_mutex. For every key of the
  event there are two cases:
    1) The "value" of the key-value pair is equal to this event. In this
       case, remove the key-value pair from the map.
    2) The "value" of the key-value pair is _not_ equal to this event. In
       this case, leave it be; the event corresponds to a later transaction.

  ev->finalize() is then called while the mutex is still held.

  @param ev  the event wrapper that has just been executed
*/
void
Dependency_slave_worker::finalize_event(std::shared_ptr<Log_event_wrapper> &ev)
{
  mysql_mutex_lock(&c_rli->dep_key_lookup_mutex);

  if (likely(!c_rli->dep_key_lookup.empty()))
  {
    for (const auto& key : ev->keys)
    {
      const auto it= c_rli->dep_key_lookup.find(key);
      DBUG_ASSERT(it != c_rli->dep_key_lookup.end());

      /*
        Case 1. (Case 2 is implicitly handled by doing nothing.)

        The explicit end() check keeps builds where DBUG_ASSERT is compiled
        out from dereferencing the end iterator if a key is unexpectedly
        missing from the map.
      */
      if (it != c_rli->dep_key_lookup.end() && it->second == ev)
      {
        c_rli->dep_key_lookup.erase(it);
      }
    }
  }

  ev->finalize();

  mysql_mutex_unlock(&c_rli->dep_key_lookup_mutex);
}

/**
  Construct a dependency worker.

  Every argument is forwarded unchanged to the Slave_worker constructor;
  this class adds no initialization of its own here.
*/
Dependency_slave_worker::Dependency_slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
                                ,PSI_mutex_key *param_key_info_run_lock,
                                PSI_mutex_key *param_key_info_data_lock,
                                PSI_mutex_key *param_key_info_sleep_lock,
                                PSI_mutex_key *param_key_info_thd_lock,
                                PSI_mutex_key *param_key_info_data_cond,
                                PSI_mutex_key *param_key_info_start_cond,
                                PSI_mutex_key *param_key_info_stop_cond,
                                PSI_mutex_key *param_key_info_sleep_cond
#endif
                                ,uint param_id
                                )
  : Slave_worker(rli
#ifdef HAVE_PSI_INTERFACE
                 ,param_key_info_run_lock, param_key_info_data_lock,
                 param_key_info_sleep_lock, param_key_info_thd_lock,
                 param_key_info_data_cond, param_key_info_start_cond,
                 param_key_info_stop_cond, param_key_info_sleep_cond
#endif
                 ,param_id
                 )
{
}

/**
  Worker main loop: execute groups until execute_group() returns false.

  If the loop ends with running_status other than STOP_ACCEPTED, the worker
  flags dependency_worker_error, marks itself ERROR_LEAVING under jobs_lock
  and wakes the coordinator thread (c_rli->info_thd) with KILL_QUERY.
*/
void Dependency_slave_worker::start()
{
  DBUG_ASSERT(c_rli->dep_queue.empty() &&
              dep_key_lookup.empty() &&
              keys_accessed_by_group.empty() &&
              dbs_accessed_by_group.empty());

  while (execute_group());

  // case: cleanup if stopped abruptly
  if (running_status != STOP_ACCEPTED)
  {
    c_rli->dependency_worker_error= true;

    // tagging as exiting so Coordinator won't be able synchronize with it
    mysql_mutex_lock(&jobs_lock);
    running_status= ERROR_LEAVING;
    mysql_mutex_unlock(&jobs_lock);

    // Killing Coordinator to indicate eventual consistency error
    mysql_mutex_lock(&c_rli->info_thd->LOCK_thd_data);
    c_rli->info_thd->awake(THD::KILL_QUERY);
    mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
  }
}