// modules/util/load/dump_loader.cc
/*
* Copyright (c) 2020, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "modules/util/load/dump_loader.h"
#include <mysqld_error.h>
#include <algorithm>
#include <cinttypes>
#include <iterator>
#include <list>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include "modules/mod_utils.h"
#include "modules/util/dump/capability.h"
#include "modules/util/dump/schema_dumper.h"
#include "modules/util/import_table/load_data.h"
#include "modules/util/load/load_errors.h"
#include "modules/util/load/load_progress_log.h"
#include "mysqlshdk/include/scripting/naming_style.h"
#include "mysqlshdk/include/shellcore/console.h"
#include "mysqlshdk/include/shellcore/shell_init.h"
#include "mysqlshdk/include/shellcore/shell_options.h"
#include "mysqlshdk/libs/mysql/instance.h"
#include "mysqlshdk/libs/mysql/script.h"
#include "mysqlshdk/libs/mysql/utils.h"
#include "mysqlshdk/libs/storage/compressed_file.h"
#include "mysqlshdk/libs/utils/debug.h"
#include "mysqlshdk/libs/utils/fault_injection.h"
#include "mysqlshdk/libs/utils/strformat.h"
#include "mysqlshdk/libs/utils/utils_general.h"
#include "mysqlshdk/libs/utils/utils_lexing.h"
#include "mysqlshdk/libs/utils/utils_mysql_parsing.h"
#include "mysqlshdk/libs/utils/utils_net.h"
#include "mysqlshdk/libs/utils/version.h"
namespace mysqlsh {
using mysqlshdk::utils::Version;
FI_DEFINE(dump_loader, [](const mysqlshdk::utils::FI::Args &args) {
throw std::runtime_error(args.get_string("msg"));
});
// how many seconds the server should wait to finish reading data from the
// client; essentially, how long it may take for a block of data to be read
// from its source (download + decompression)
static constexpr const int k_mysql_server_net_read_timeout = 30 * 60;
// number of seconds before the server disconnects idle clients
// a load can take a long time and some of the connections will be idle
// in the meantime, so this needs to be high
static constexpr const int k_mysql_server_wait_timeout = 365 * 24 * 60 * 60;
// the version of the dump we support in this code
static constexpr const int k_supported_dump_version_major = 2;
static constexpr const int k_supported_dump_version_minor = 0;
// Multiplier for bytesPerChunk which determines how big a chunk can actually be
// before we enable sub-chunking for it.
static constexpr const auto k_chunk_size_overshoot_tolerance = 1.5;
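// For example (illustrative values): with bytesPerChunk = 64 MiB and no
// maxBytesPerTransaction given, only chunks larger than 96 MiB (64 * 1.5)
// are sub-chunked during loading.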
namespace {
bool histograms_supported(const Version &version) {
return version > Version(8, 0, 0);
}
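// Example: a PRIMARY KEY qualifies, as does a UNIQUE index whose columns are
// all NOT NULL (the "Null" column of SHOW INDEX is empty for them); a UNIQUE
// index over a nullable column does not.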
bool has_pke(mysqlshdk::db::ISession *session, const std::string &schema,
const std::string &table) {
// Return true if the table has a PK or equivalent (UNIQUE NOT NULL)
auto res = session->queryf("SHOW INDEX IN !.!", schema, table);
while (auto row = res->fetch_one_named()) {
if (row.get_int("Non_unique") == 0 && row.get_string("Null").empty())
return true;
}
return false;
}
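// Rewrites each CREATE TABLE statement in *script, stripping the index
// definitions selected for deferral and returning them so they can be
// re-added with ALTER TABLE once the data is loaded. A sketch of the
// transformation (hypothetical table):
//   CREATE TABLE t (id INT PRIMARY KEY, g GEOMETRY NOT NULL,
//                   SPATIAL INDEX (g));
// becomes CREATE TABLE t (id INT PRIMARY KEY, g GEOMETRY NOT NULL); with
// "SPATIAL INDEX (g)" kept in the returned Deferred_statements.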
compatibility::Deferred_statements preprocess_table_script_for_indexes(
std::string *script, const std::string &key, bool fulltext_only) {
compatibility::Deferred_statements stmts;
auto script_length = script->length();
std::istringstream stream(*script);
script->clear();
mysqlshdk::utils::iterate_sql_stream(
&stream, script_length,
[&](std::string_view s, std::string_view delim, size_t, size_t) {
auto sql = shcore::str_format(
"%.*s%.*s\n", static_cast<int>(s.length()), s.data(),
static_cast<int>(delim.length()), delim.data());
mysqlshdk::utils::SQL_iterator sit(sql);
if (shcore::str_caseeq(sit.next_token(), "CREATE") &&
shcore::str_caseeq(sit.next_token(), "TABLE")) {
assert(stmts.rewritten.empty());
stmts = compatibility::check_create_table_for_indexes(sql, key,
fulltext_only);
sql = stmts.rewritten;
}
script->append(sql);
return true;
},
[&key](std::string_view err) {
THROW_ERROR(SHERR_LOAD_SPLITTING_DDL_FAILED, key.c_str(),
std::string{err}.c_str());
});
return stmts;
}
void add_invisible_pk(std::string *script, const std::string &key) {
const auto script_length = script->length();
std::istringstream stream(*script);
script->clear();
mysqlshdk::utils::iterate_sql_stream(
&stream, script_length,
[&](std::string_view s, std::string_view delim, size_t, size_t) {
auto sql = shcore::str_format(
"%.*s%.*s\n", static_cast<int>(s.length()), s.data(),
static_cast<int>(delim.length()), delim.data());
mysqlshdk::utils::SQL_iterator sit(sql);
if (shcore::str_caseeq(sit.next_token(), "CREATE") &&
shcore::str_caseeq(sit.next_token(), "TABLE")) {
compatibility::add_pk_to_create_table(sql, &sql);
}
script->append(sql);
return true;
},
[&key](std::string_view err) {
THROW_ERROR(SHERR_LOAD_SPLITTING_DDL_FAILED, key.c_str(),
std::string{err}.c_str());
});
}
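// Executes a single statement, retrying on ER_LOCK_DEADLOCK with exponential
// backoff: sleeps of 200 ms, 400 ms, 800 ms, ... between attempts, with the
// total sleep time capped at 5 minutes before the error is propagated.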
void execute_statement(const std::shared_ptr<mysqlshdk::db::ISession> &session,
std::string_view stmt, const std::string &error_prefix,
int silent_error = -1) {
assert(!error_prefix.empty());
constexpr uint32_t k_max_retry_time = 5 * 60 * 1000; // 5 minutes
uint32_t sleep_time = 200; // start with 200 ms, double it with each sleep
uint32_t total_sleep_time = 0;
while (true) {
try {
session->executes(stmt.data(), stmt.length());
return;
} catch (const mysqlshdk::db::Error &e) {
const std::string stmt_str{stmt};
const auto sensitive =
compatibility::contains_sensitive_information(stmt_str);
constexpr std::string_view replacement = "'****'";
std::vector<std::string> replaced;
const auto &query = sensitive ? compatibility::replace_quoted_strings(
stmt_str, replacement, &replaced)
: stmt_str;
auto error = e.format();
for (const auto &replace : replaced) {
error = shcore::str_replace(error, replace, replacement);
}
log_info("Error executing SQL: %s:\n%s", error.c_str(), query.c_str());
if (ER_LOCK_DEADLOCK == e.code() && total_sleep_time < k_max_retry_time) {
current_console()->print_note(error_prefix +
", will retry after delay: " + error);
if (total_sleep_time + sleep_time > k_max_retry_time) {
sleep_time = k_max_retry_time - total_sleep_time;
}
shcore::sleep_ms(sleep_time);
total_sleep_time += sleep_time;
sleep_time *= 2;
} else {
if (silent_error != e.code()) {
current_console()->print_error(
shcore::str_format("%s: %s: %s", error_prefix.c_str(),
error.c_str(), query.c_str()));
}
throw;
}
}
}
}
void execute_script(
const std::shared_ptr<mysqlshdk::db::ISession> &session,
const std::string &script, const std::string &error_prefix,
const std::function<bool(std::string_view, std::string *)> &process_stmt) {
std::stringstream stream(script);
mysqlshdk::utils::iterate_sql_stream(
&stream, 1024 * 64,
[&error_prefix, &session, &process_stmt](
std::string_view s, std::string_view, size_t, size_t) {
std::string new_stmt;
if (process_stmt && process_stmt(s, &new_stmt)) s = new_stmt;
if (!s.empty()) {
execute_statement(session, s, error_prefix);
}
return true;
},
[](std::string_view err) {
current_console()->print_error(std::string{err});
});
}
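// Reads metadata from the ".idx" file accompanying a data file. The layout
// assumed here: a sequence of big-endian (network byte order) uint64_t
// entries, the last of which holds the total size in bytes of the
// corresponding data file.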
class Index_file {
public:
explicit Index_file(mysqlshdk::storage::IFile *data_file) {
m_idx_file = data_file->parent()->file(data_file->filename() + ".idx");
}
uint64_t data_size() {
load_metadata();
return m_data_size;
}
private:
void load_metadata() {
if (m_metadata_loaded) {
return;
}
m_file_size = m_idx_file->file_size();
if ((m_file_size % k_entry_size) != 0 || m_file_size < k_entry_size) {
log_warning(
"idx file %s has unexpected size %s, which is not a multiple of %s",
m_idx_file->filename().c_str(), std::to_string(m_file_size).c_str(),
std::to_string(k_entry_size).c_str());
return;
}
m_entries = m_file_size / k_entry_size;
m_idx_file->open(mysqlshdk::storage::Mode::READ);
m_idx_file->seek(m_file_size - k_entry_size);
m_idx_file->read(&m_data_size, k_entry_size);
m_idx_file->seek(0);
m_idx_file->close();
m_data_size = mysqlshdk::utils::network_to_host(m_data_size);
m_metadata_loaded = true;
}
static constexpr uint64_t k_entry_size = sizeof(uint64_t);
std::unique_ptr<mysqlshdk::storage::IFile> m_idx_file;
std::size_t m_file_size = 0;
uint64_t m_entries = 0;
uint64_t m_data_size = 0;
bool m_metadata_loaded = false;
};
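// Example output (hypothetical names): "`sakila`.`actor` (chunk 3)" or, with
// a partition, "`sakila`.`actor` partition `p0` (chunk 3)".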
std::string format_table(const std::string &schema, const std::string &table,
const std::string &partition, ssize_t chunk) {
return "`" + schema + "`.`" + table + "`" +
(partition.empty() ? "" : " partition `" + partition + "`") +
" (chunk " + std::to_string(chunk) + ")";
}
} // namespace
void Dump_loader::Worker::Task::handle_current_exception(
Worker *worker, Dump_loader *loader, const std::string &error) {
worker->handle_current_exception(loader, error);
}
bool Dump_loader::Worker::Schema_ddl_task::execute(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Worker *worker, Dump_loader *loader) {
log_debug("worker%zu will execute DDL for schema %s", id(), schema().c_str());
loader->post_worker_event(worker, Worker_event::SCHEMA_DDL_START);
if (!m_script.empty()) {
try {
log_info("%s DDL script for schema `%s`",
m_resuming ? "Re-executing" : "Executing", schema().c_str());
if (!loader->m_options.dry_run()) {
auto transforms = loader->m_default_sql_transforms;
transforms.add_execute_conditionally(
[this, loader](std::string_view type, const std::string &name) {
bool execute = true;
if (shcore::str_caseeq(type, "EVENT")) {
execute = loader->m_dump->include_event(schema(), name);
} else if (shcore::str_caseeq(type, "FUNCTION", "PROCEDURE")) {
execute = loader->m_dump->include_routine(schema(), name);
}
return execute;
});
// execute sql
execute_script(session, m_script,
shcore::str_format("While processing schema `%s`",
schema().c_str()),
transforms);
}
} catch (const std::exception &e) {
current_console()->print_error(shcore::str_format(
"Error processing schema `%s`: %s", schema().c_str(), e.what()));
if (loader->m_options.force()) {
loader->add_skipped_schema(schema());
} else {
handle_current_exception(
worker, loader,
shcore::str_format("While executing DDL for schema %s: %s",
schema().c_str(), e.what()));
return false;
}
}
}
// the status needs to be updated from the worker thread, because the main
// thread may be looking for a task and does not have time to process events
loader->m_schema_ddl_ready[schema()] = true;
log_debug("worker%zu done", id());
++loader->m_ddl_executed;
loader->post_worker_event(worker, Worker_event::SCHEMA_DDL_END);
return true;
}
bool Dump_loader::Worker::Table_ddl_task::execute(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Worker *worker, Dump_loader *loader) {
log_debug("worker%zu will execute DDL file for table %s", id(),
key().c_str());
loader->post_worker_event(worker, Worker_event::TABLE_DDL_START);
try {
pre_process(loader);
if (!loader->m_options.dry_run()) {
// this is here to detect whether data is loaded into a non-existent schema
Dump_loader::executef(session, "use !", m_schema.c_str());
}
load_ddl(session, loader);
post_process(session, loader);
} catch (const std::exception &e) {
handle_current_exception(
worker, loader,
shcore::str_format("While executing DDL script for %s: %s",
key().c_str(), e.what()));
return false;
}
log_debug("worker%zu done", id());
++loader->m_ddl_executed;
loader->post_worker_event(worker, Worker_event::TABLE_DDL_END);
return true;
}
void Dump_loader::Worker::Table_ddl_task::pre_process(Dump_loader *loader) {
if (!m_placeholder && (loader->m_options.load_ddl() ||
loader->m_options.load_deferred_indexes())) {
if (loader->m_options.defer_table_indexes() !=
Load_dump_options::Defer_index_mode::OFF) {
extract_deferred_statements(loader);
}
if (loader->m_options.load_ddl() && loader->should_create_pks() &&
!loader->m_options.auto_create_pks_supported() &&
!loader->m_dump->has_primary_key(m_schema, m_table)) {
add_invisible_pk(&m_script, key());
}
}
}
void Dump_loader::Worker::Table_ddl_task::extract_deferred_statements(
Dump_loader *loader) {
const auto &table_name = key();
const auto fulltext_only = loader->m_options.defer_table_indexes() ==
Load_dump_options::Defer_index_mode::FULLTEXT;
m_deferred_statements = std::make_unique<compatibility::Deferred_statements>(
preprocess_table_script_for_indexes(&m_script, table_name,
fulltext_only));
}
void Dump_loader::Worker::Table_ddl_task::post_process(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Dump_loader *loader) {
if ((m_status == Load_progress_log::DONE ||
loader->m_options.ignore_existing_objects()) &&
m_deferred_statements && !m_deferred_statements->empty()) {
remove_duplicate_deferred_statements(session, loader);
}
}
void Dump_loader::Worker::Table_ddl_task::remove_duplicate_deferred_statements(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Dump_loader *loader) {
// this handles the case where the table was already created (either in a
// previous run or by the user) and some indexes may already have been
// re-created
const auto &table_name = key();
const auto fulltext_only = loader->m_options.defer_table_indexes() ==
Load_dump_options::Defer_index_mode::FULLTEXT;
std::string ct;
try {
ct = Dump_loader::query(session, "show create table " + table_name)
->fetch_one()
->get_string(1);
} catch (const mysqlshdk::db::Error &e) {
if ((ER_BAD_DB_ERROR == e.code() || ER_NO_SUCH_TABLE == e.code()) &&
loader->m_options.dry_run()) {
// the table may not exist if we're running in dryRun mode, this is not an
// error; there are no duplicates
return;
} else {
// in any other case we rethrow the exception, the table should exist at
// this point
throw;
}
}
const auto recreated = compatibility::check_create_table_for_indexes(
ct, table_name, fulltext_only);
const auto remove_duplicates = [&table_name](
const std::vector<std::string> &needles,
std::vector<std::string> *haystack) {
for (const auto &n : needles) {
const auto pos = std::remove(haystack->begin(), haystack->end(), n);
if (haystack->end() != pos) {
log_info(
"Table %s already contains '%s', it's not going to be deferred.",
table_name.c_str(), n.c_str());
haystack->erase(pos, haystack->end());
}
}
};
remove_duplicates(recreated.index_info.fulltext,
&m_deferred_statements->index_info.fulltext);
remove_duplicates(recreated.index_info.spatial,
&m_deferred_statements->index_info.spatial);
remove_duplicates(recreated.index_info.regular,
&m_deferred_statements->index_info.regular);
remove_duplicates(recreated.foreign_keys,
&m_deferred_statements->foreign_keys);
if (!recreated.secondary_engine.empty()) {
if (m_deferred_statements->has_alters()) {
// the recreated table already has a secondary engine (possibly the table
// was modified by the user), but not all indexes were applied; we're unable
// to continue, as the ALTER TABLE statements would fail
THROW_ERROR(SHERR_LOAD_SECONDARY_ENGINE_ERROR, table_name.c_str());
} else {
// the recreated table has all the indexes and the secondary engine in
// place; nothing more to do
m_deferred_statements->secondary_engine.clear();
}
}
}
void Dump_loader::Worker::Table_ddl_task::load_ddl(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Dump_loader *loader) {
if (m_status != Load_progress_log::DONE && loader->m_options.load_ddl()) {
log_debug("worker%zu: Executing %stable DDL for %s", id(),
m_placeholder ? "placeholder " : "", key().c_str());
try {
log_info(
"[Worker%03zu] %s DDL script for %s%s", id(),
(m_status == Load_progress_log::INTERRUPTED ? "Re-executing"
: "Executing"),
key().c_str(),
(m_placeholder
? " (placeholder for view)"
: (m_deferred_statements && m_deferred_statements->has_alters()
? " (indexes removed for deferred creation)"
: "")));
if (!loader->m_options.dry_run()) {
// execute sql
execute_script(
session, m_script,
shcore::str_format("[Worker%03zu] Error processing table %s", id(),
key().c_str()),
loader->m_default_sql_transforms);
}
} catch (const std::exception &e) {
if (!loader->m_options.force()) throw;
}
}
}
bool Dump_loader::Worker::Load_chunk_task::execute(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Worker *worker, Dump_loader *loader) {
const auto masked_path = m_file->full_path().masked();
log_debug("worker%zu will load chunk %s for table `%s`.`%s`", id(),
masked_path.c_str(), schema().c_str(), table().c_str());
try {
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "BEFORE_LOAD_START"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())}}));
loader->post_worker_event(worker, Worker_event::LOAD_START);
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "AFTER_LOAD_START"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())}}));
// do work
if (!loader->m_options.dry_run()) {
// load the data
load(session, loader, worker);
}
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "BEFORE_LOAD_END"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())}}));
} catch (const std::exception &e) {
handle_current_exception(worker, loader,
shcore::str_format("While loading %s: %s",
masked_path.c_str(), e.what()));
return false;
}
log_debug("worker%zu done", id());
// signal for more work
loader->post_worker_event(worker, Worker_event::LOAD_END);
return true;
}
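// Builds a comment which is prepended to the LOAD DATA statement, so that it
// can be identified e.g. in the processlist. Sample output (hypothetical
// names):
//   /* mysqlsh loadDump(), thread 7, table `sakila`.`actor`, chunk ID: 3 */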
std::string Dump_loader::Worker::Load_chunk_task::query_comment() const {
std::string query_comment =
"/* mysqlsh loadDump(), thread " + std::to_string(id()) + ", table " +
shcore::str_replace(
"`" + schema() + "`.`" + table() + "`" +
(partition().empty() ? "" : ".`" + partition() + "`"),
"*/", "*\\/");
if (chunk_index() >= 0) {
query_comment += ", chunk ID: " + std::to_string(chunk_index());
}
query_comment += " */ ";
return query_comment;
}
void Dump_loader::Worker::Load_chunk_task::load(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Dump_loader *loader, Worker *worker) {
import_table::Import_table_options import_options;
import_table::Import_table_options::options().unpack(m_options,
&import_options);
// replace duplicate rows by default
import_options.set_replace_duplicates(true);
import_options.set_base_session(session);
import_options.set_verbose(false);
import_options.set_partition(partition());
import_table::Stats stats;
if (m_resume) {
// Truncate the table if it's not chunked; if it's chunked, leave it as is
// and rely on duplicate rows being ignored.
if (chunk_index() < 0 && !has_pke(session.get(), schema(), table())) {
current_console()->print_note(shcore::str_format(
"Truncating table `%s`.`%s` because of resume and it "
"has no PK or equivalent key",
schema().c_str(), table().c_str()));
Dump_loader::executef(session, "TRUNCATE TABLE !.!", schema(), table());
m_bytes_to_skip = 0;
}
}
std::atomic<size_t> num_file_bytes_loaded{0};
import_table::Load_data_worker op(
import_options, id(), &loader->m_num_bytes_loaded, &num_file_bytes_loaded,
&loader->m_worker_hard_interrupt, nullptr, &loader->m_thread_exceptions,
&stats, query_comment());
loader->m_num_threads_loading++;
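// on scope exit, erase exactly one in-progress entry for this table from
// the multimap: entries store the file size recorded at scheduling time,
// which is matched against raw_bytes_loaded here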
shcore::on_leave_scope cleanup([this, loader]() {
std::lock_guard<std::mutex> lock(loader->m_tables_being_loaded_mutex);
auto it = loader->m_tables_being_loaded.find(key());
while (it != loader->m_tables_being_loaded.end() && it->first == key()) {
if (it->second == raw_bytes_loaded) {
loader->m_tables_being_loaded.erase(it);
break;
}
++it;
}
loader->m_num_threads_loading--;
});
{
mysqlshdk::storage::Compression compr;
try {
compr = mysqlshdk::storage::from_extension(
std::get<1>(shcore::path::split_extension(m_file->filename())));
} catch (...) {
compr = mysqlshdk::storage::Compression::NONE;
}
// If max_trx_size > 0, chunk truncation is enabled, where LOAD
// DATA will be truncated if the transaction size exceeds that value and
// then retried until the whole chunk is loaded.
//
// The max. transaction size depends on server options like
// max_binlog_cache_size and group_replication_transaction_size_limit.
// However, it's not straightforward to calculate the transaction size from
// CSV data (at least not without a lot of effort and CPU cycles). Thus, we
// instead use a
// different approach where we assume that the value of bytesPerChunk used
// during dumping will be sufficiently small to fit in a transaction. If any
// chunks are bigger than that value (because approximations made during
// dumping were not good), we will break them down further here during
// loading.
// Ideally, only chunks that are much bigger than the specified
// bytesPerChunk value will get sub-chunked; chunks that are smaller or just
// a little bigger will be loaded whole. If they still don't fit, the user
// should dump again with a smaller bytesPerChunk value.
const auto max_bytes_per_transaction =
loader->m_options.max_bytes_per_transaction();
import_table::Transaction_options options;
// if maxBytesPerTransaction is not given, it defaults to the bytesPerChunk
// value used during the dump
options.max_trx_size =
max_bytes_per_transaction.value_or(loader->m_dump->bytes_per_chunk());
uint64_t max_chunk_size = options.max_trx_size;
// if maxBytesPerTransaction is not given, only files bigger than
// k_chunk_size_overshoot_tolerance * bytesPerChunk are affected
if (!max_bytes_per_transaction.has_value()) {
max_chunk_size *= k_chunk_size_overshoot_tolerance;
}
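// Illustrative decision (assuming bytesPerChunk = 64 MiB and no
// maxBytesPerTransaction): a 90 MiB chunk file stays below max_chunk_size
// (96 MiB), so max_trx_size is reset to 0 below and the chunk is loaded in
// a single transaction; a 100 MiB chunk keeps max_trx_size = 64 MiB and is
// loaded as multiple sub-chunked transactions.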
Index_file idx_file{m_file.get()};
if (options.max_trx_size > 0) {
bool valid = false;
auto chunk_file_size =
loader->m_dump->chunk_size(m_file->filename(), &valid);
if (!valid) {
// @.done.json not there yet, use the idx file directly
chunk_file_size = idx_file.data_size();
}
if (chunk_file_size < max_chunk_size) {
// chunk is small enough, so don't sub-chunk
options.max_trx_size = 0;
}
}
uint64_t subchunk = 0;
options.transaction_started = [this, &loader, &worker, &subchunk]() {
log_debug("Transaction for '%s'.'%s'/%zi subchunk %" PRIu64
" has started",
schema().c_str(), table().c_str(), chunk_index(), subchunk);
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "BEFORE_LOAD_SUBCHUNK_START"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())},
{"subchunk", std::to_string(subchunk)}}));
loader->post_worker_event(worker, Worker_event::LOAD_SUBCHUNK_START,
shcore::make_dict("subchunk", subchunk));
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "AFTER_LOAD_SUBCHUNK_START"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())},
{"subchunk", std::to_string(subchunk)}}));
};
options.transaction_finished = [this, &loader, &worker,
&subchunk](uint64_t bytes) {
log_debug("Transaction for '%s'.'%s'/%zi subchunk %" PRIu64
" has finished, wrote %" PRIu64 " bytes",
schema().c_str(), table().c_str(), chunk_index(), subchunk,
bytes);
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "BEFORE_LOAD_SUBCHUNK_END"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())},
{"subchunk", std::to_string(subchunk)}}));
loader->post_worker_event(
worker, Worker_event::LOAD_SUBCHUNK_END,
shcore::make_dict("subchunk", subchunk, "bytes", bytes));
FI_TRIGGER_TRAP(dump_loader,
mysqlshdk::utils::FI::Trigger_options(
{{"op", "AFTER_LOAD_SUBCHUNK_END"},
{"schema", schema()},
{"table", table()},
{"chunk", std::to_string(chunk_index())},
{"subchunk", std::to_string(subchunk)}}));
++subchunk;
};
options.skip_bytes = m_bytes_to_skip;
op.execute(session, mysqlshdk::storage::make_file(std::move(m_file), compr),
options);
}
if (loader->m_thread_exceptions[id()])
std::rethrow_exception(loader->m_thread_exceptions[id()]);
bytes_loaded = m_bytes_to_skip + stats.total_data_bytes;
rows_loaded = stats.total_records;
loader->m_num_raw_bytes_loaded += raw_bytes_loaded;
loader->m_num_chunks_loaded += 1;
loader->m_num_rows_loaded += rows_loaded;
loader->m_num_warnings += stats.total_warnings;
}
bool Dump_loader::Worker::Analyze_table_task::execute(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Worker *worker, Dump_loader *loader) {
log_debug("worker%zu will analyze table `%s`.`%s`", id(), schema().c_str(),
table().c_str());
auto console = current_console();
if (m_histograms.empty() ||
!histograms_supported(loader->m_options.target_server_version()))
log_info("Analyzing table `%s`.`%s`", schema().c_str(), table().c_str());
else
log_info("Updating histogram for table `%s`.`%s`", schema().c_str(),
table().c_str());
loader->post_worker_event(worker, Worker_event::ANALYZE_START);
// do work
try {
if (!loader->m_options.dry_run()) {
if (m_histograms.empty() ||
!histograms_supported(loader->m_options.target_server_version())) {
Dump_loader::executef(session, "ANALYZE TABLE !.!", schema(), table());
} else {
for (const auto &h : m_histograms) {
shcore::sqlstring q(
"ANALYZE TABLE !.! UPDATE HISTOGRAM ON ! WITH ? BUCKETS", 0);
q << schema() << table() << h.column << h.buckets;
std::string sql = q.str();
log_debug("Executing %s", sql.c_str());
Dump_loader::execute(session, sql);
}
}
}
} catch (const std::exception &e) {
handle_current_exception(
worker, loader,
shcore::str_format("While analyzing table `%s`.`%s`: %s",
schema().c_str(), table().c_str(), e.what()));
return false;
}
log_debug("worker%zu done", id());
++loader->m_tables_analyzed;
// signal for more work
loader->post_worker_event(worker, Worker_event::ANALYZE_END);
return true;
}
bool Dump_loader::Worker::Index_recreation_task::execute(
const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
Worker *worker, Dump_loader *loader) {
log_debug("worker%zu will recreate %zu indexes for table %s", id(),
m_indexes->size(), key().c_str());
const auto console = current_console();
if (!m_indexes->empty())
log_info("[Worker%03zu] Recreating indexes for %s", id(), key().c_str());
loader->post_worker_event(worker, Worker_event::INDEX_START);
// do work
if (!loader->m_options.dry_run()) {
loader->m_num_threads_recreating_indexes++;
shcore::on_leave_scope cleanup(
[loader]() { loader->m_num_threads_recreating_indexes--; });
// for the analysis of various index types and their impact on the parallel
// index creation see: BUG#33976718
std::list<std::vector<std::string_view>> batches;
{
const auto append = [](const std::vector<std::string> &what,
std::vector<std::string_view> *where) {
where->insert(where->end(), what.begin(), what.end());
};
auto &first_batch = batches.emplace_back();
// BUG#34787778 - fulltext indexes need to be added one at a time
if (!m_indexes->fulltext.empty()) {
first_batch.emplace_back(m_indexes->fulltext.front());
}
if (!m_indexes->spatial.empty()) {
// we load all indexes at once if:
// - server does not support parallel index creation
auto single_batch =
loader->m_options.target_server_version() < Version(8, 0, 27);
// - table has a virtual column
single_batch |= m_indexes->has_virtual_columns;
// - table has a fulltext index
single_batch |= !m_indexes->fulltext.empty();
// if server supports parallel index creation and table does not have
// virtual columns or fulltext indexes, we add spatial indexes in
// another batch
append(m_indexes->spatial,
single_batch ? &first_batch : &batches.emplace_back());
}
append(m_indexes->regular, &first_batch);
// BUG#34787778 - fulltext indexes need to be added one at a time
if (!m_indexes->fulltext.empty()) {
for (auto it = std::next(m_indexes->fulltext.begin()),
end = m_indexes->fulltext.end();
it != end; ++it) {
batches.emplace_back().emplace_back(*it);
}
}
}
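// Illustrative batch layouts: with fulltext {f1, f2}, spatial {s1} and
// regular {r1, r2} index definitions, the batches are [f1, s1, r1, r2], [f2]
// (spatial indexes join the first batch because a fulltext index is
// present); without fulltext indexes, on a server with parallel index
// creation, they are [r1, r2], [s1]. Each batch becomes a single
// ALTER TABLE ... ADD statement below.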
try {
auto current = batches.begin();
const auto end = batches.end();
while (end != current) {
auto query = "ALTER TABLE " + key() + " ";
for (const auto &definition : *current) {
query += "ADD ";
query += definition;
query += ',';
}
// remove last comma
query.pop_back();
auto retry = false;
try {
execute_statement(
session, query, "While recreating indexes for table " + key(),
current->size() <= 1 ? -1 : ER_TEMP_FILE_WRITE_FAILURE);
loader->m_indexes_recreated += current->size();
} catch (const shcore::Error &e) {
if (e.code() == ER_TEMP_FILE_WRITE_FAILURE) {
if (current->size() <= 1) {
// we cannot split any more, report the error
throw;
} else {
log_info(
"Failed to add indexes: the innodb_tmpdir is full, failed "
"query: %s",
query.c_str());
// split this batch in two and retry
const auto new_size = current->size() / 2;
std::vector<std::string_view> new_batch;
std::move(std::next(current->begin(), new_size), current->end(),
std::back_inserter(new_batch));
current->resize(new_size);
batches.insert(std::next(current), std::move(new_batch));
retry = true;
++loader->m_num_index_retries;
}
} else {
throw;
}
}
if (!retry) {
++current;
}
}
} catch (const std::exception &e) {
handle_current_exception(
worker, loader,
shcore::str_format("While recreating indexes for table %s: %s",
key().c_str(), e.what()));
return false;
}
}
log_debug("worker%zu done", id());
// signal for more work
loader->post_worker_event(worker, Worker_event::INDEX_END);
return true;
}
Dump_loader::Worker::Worker(size_t id, Dump_loader *owner)
: m_id(id), m_owner(owner), m_connection_id(0) {}
void Dump_loader::Worker::run() {
try {
do_run();
} catch (const mysqlshdk::db::Error &e) {
handle_current_exception(e.format());
} catch (const shcore::Error &e) {
handle_current_exception(e.format());
} catch (const std::exception &e) {
handle_current_exception(e.what());
} catch (...) {
handle_current_exception("Unknown exception");
}
}
void Dump_loader::Worker::do_run() {
auto console = current_console();
try {
connect();
} catch (const shcore::Error &e) {
handle_current_exception("Error opening connection to MySQL: " +
e.format());
return;
}
for (;;) {
m_owner->post_worker_event(this, Worker_event::READY);
// wait for signal that there's work to do... false means stop worker
bool work = m_work_ready.pop();
if (!work || m_owner->m_worker_interrupt) {
m_owner->post_worker_event(this, Worker_event::EXIT);
break;
}
assert(std::numeric_limits<size_t>::max() != m_task->id());
if (!m_task->execute(m_session, this, m_owner)) break;
}
}
void Dump_loader::Worker::stop() { m_work_ready.push(false); }
void Dump_loader::Worker::connect() {
m_session = m_owner->create_session();
m_connection_id = m_session->get_connection_id();
}
void Dump_loader::Worker::schedule(std::unique_ptr<Task> task) {
task->set_id(m_id);
m_task = std::move(task);
m_work_ready.push(true);
}
void Dump_loader::Worker::handle_current_exception(Dump_loader *loader,
const std::string &error) {
const auto id = this->id();
if (!loader->m_thread_exceptions[id]) {
current_console()->print_error(
shcore::str_format("[Worker%03zu] %s", id, error.c_str()));
loader->m_thread_exceptions[id] = std::current_exception();
}
loader->m_num_errors += 1;
loader->m_worker_interrupt = true;
loader->post_worker_event(this, Worker_event::FATAL_ERROR);
}
// ----
Dump_loader::Dump_loader(const Load_dump_options &options)
: m_options(options),
m_num_threads_loading(0),
m_num_threads_recreating_indexes(0),
m_character_set(options.character_set()),
m_num_rows_loaded(0),
m_num_bytes_loaded(0),
m_num_raw_bytes_loaded(0),
m_num_chunks_loaded(0),
m_num_warnings(0),
m_num_errors(0),
m_progress_thread("Load dump", options.show_progress()) {
if (m_options.ignore_version()) {
m_default_sql_transforms.add_strip_removed_sql_modes();
}
}
Dump_loader::~Dump_loader() {}
std::shared_ptr<mysqlshdk::db::mysql::Session> Dump_loader::create_session() {
auto session = establish_session(m_options.connection_options(), false);
// Make sure we don't get affected by user customizations of sql_mode
execute(session, "SET SQL_MODE = 'NO_AUTO_VALUE_ON_ZERO'");
// Set timeouts to larger values since worker threads may get stuck
// downloading data for some time before they have a chance to get back to
// doing MySQL work.
executef(session, "SET SESSION net_read_timeout = ?",
k_mysql_server_net_read_timeout);
// This is the time until the server kicks out idle connections. Our
// connections should last for as long as the load lasts, even if they're
// idle.
executef(session, "SET SESSION wait_timeout = ?",
k_mysql_server_wait_timeout);
// Disable binlog if requested by user
if (m_options.skip_binlog()) {
try {
execute(session, "SET sql_log_bin=0");
} catch (const mysqlshdk::db::Error &e) {
THROW_ERROR(SHERR_LOAD_FAILED_TO_DISABLE_BINLOG, e.format().c_str());
}
}
execute(session, "SET foreign_key_checks = 0");
execute(session, "SET unique_checks = 0");
if (!m_character_set.empty())
executef(session, "SET NAMES ?", m_character_set);
if (m_dump->tz_utc()) execute(session, "SET TIME_ZONE='+00:00'");
if (m_options.load_ddl() && m_options.auto_create_pks_supported()) {
// the target server supports automatic creation of primary keys; we need to
// explicitly set the value of the session variable, so we won't end up
// creating primary keys when the user doesn't want them
const auto create_pks = should_create_pks();
// we toggle the session variable only if the global value is different from
// the value requested by the user, as doing so requires at least the
// SESSION_VARIABLES_ADMIN privilege; in case of MDS (where users do not
// have this privilege) we expect that the user has set the appropriate
// compatibility option during the dump, so this variable is not going to be
// toggled
if (m_options.sql_generate_invisible_primary_key() != create_pks) {
executef(session, "SET @@SESSION.sql_generate_invisible_primary_key=?",
create_pks);
}
}
try {
for (const auto &s : m_options.session_init_sql()) {
log_info("Executing custom session init SQL: %s", s.c_str());
session->execute(s);
}
} catch (const shcore::Error &e) {
throw shcore::Exception::runtime_error(
"Error while executing sessionInitSql: " + e.format());
}
return std::dynamic_pointer_cast<mysqlshdk::db::mysql::Session>(session);
}
std::function<bool(const std::string &, const std::string &)>
Dump_loader::filter_user_script_for_mds() const {
// In MDS, the list of grants that a service user can have is restricted,
// even in "root" accounts.
// User accounts have at most:
// - a subset of global privileges
// - full DB privileges on any DB
// - a subset of DB privileges on mysql.* and sys.*
// Trying to grant or revoke any of those will result in an error, because
// the user doing the load will lack these privileges.
// Global privileges are stripped during dump with the
// strip_restricted_grants compat option, but revokes have to be stripped
// at load time. This is because:
// - we can't revoke what we don't have
// - anything we don't have will be implicitly revoked anyway
// Thus, stripping privs during dump will only work as intended if it's
// loaded in MDS, where the implicit revokes will happen. If a stripped
// dump is loaded by a root user in a non-MDS instance, accounts
// can end up without the expected original revokes.
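// The returned predicate answers "should this privilege be stripped?".
// For example (hypothetical inputs): ("SUPER", "*.*") returns true when the
// administrator role lacks the SUPER privilege; ("SELECT", "app.*") returns
// true only if SELECT on that schema is among the role's restrictions.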
mysqlshdk::mysql::Instance instance(m_session);
auto allowed_global_grants =
mysqlshdk::mysql::get_global_grants(instance, "administrator", "%");
if (allowed_global_grants.empty())
current_console()->print_warning("`administrator` role not found!");
// get list of DB level privileges revoked from the administrator role
auto restrictions =
mysqlshdk::mysql::get_user_restrictions(instance, "administrator", "%");
#ifndef NDEBUG
for (const auto &r : restrictions) {
log_debug("Restrictions: schema=%s privileges=%s", r.first.c_str(),
shcore::str_join(r.second, ", ").c_str());
}
#endif
// filter the users script
return [restrictions = std::move(restrictions),
allowed_global_grants = std::move(allowed_global_grants)](
const std::string &priv_type, const std::string &priv_level) {
// strip global privileges
if (priv_level == "*.*") {
// return true if the priv should be stripped
return std::find_if(allowed_global_grants.begin(),
allowed_global_grants.end(),
[&priv_type](const std::string &priv) {
return shcore::str_caseeq(priv, priv_type);
}) == allowed_global_grants.end();
}
std::string schema, object;
shcore::split_priv_level(priv_level, &schema, &object);
if (object.empty() && schema == "*") return false;
// strip DB privileges
// only schema.* revokes are expected; this needs to be reviewed if
// object-specific revokes are ever added
for (const auto &r : restrictions) {
if (r.first == schema) {
// return true if the priv should be stripped
return std::find_if(r.second.begin(), r.second.end(),
[&priv_type](const std::string &priv) {
return shcore::str_caseeq(priv, priv_type);
}) != r.second.end();
}
}
return false;
};
}
void Dump_loader::on_dump_begin() {
std::string pre_script = m_dump->begin_script();
current_console()->print_status("Executing common preamble SQL");
if (!m_options.dry_run())
execute_script(m_session, pre_script, "While executing preamble SQL",
m_default_sql_transforms);
}
void Dump_loader::on_dump_end() {
std::string post_script = m_dump->end_script();
// Execute schema end scripts
for (const std::string &schema : m_dump->schemas()) {
on_schema_end(schema);
}
current_console()->print_status("Executing common postamble SQL");
if (!m_options.dry_run())
execute_script(m_session, post_script, "While executing postamble SQL",
m_default_sql_transforms);
// Update GTID_PURGED only when requested by the user
if (m_options.update_gtid_set() != Load_dump_options::Update_gtid_set::OFF) {
auto status = m_load_log->gtid_update_status();
if (status == Load_progress_log::Status::DONE) {
current_console()->print_status("GTID_PURGED already updated");
log_info("GTID_PURGED already updated");
} else if (!m_dump->gtid_executed().empty()) {
if (m_dump->gtid_executed_inconsistent()) {
current_console()->print_warning(
"The gtid update requested, but gtid_executed was not guaranteed "
"to be consistent during the dump");
}
try {
m_load_log->start_gtid_update();
const auto query = m_options.is_mds() ? "CALL sys.set_gtid_purged(?)"
: "SET GLOBAL GTID_PURGED=?";
if (m_options.update_gtid_set() ==
Load_dump_options::Update_gtid_set::REPLACE) {
current_console()->print_status(
"Resetting GTID_PURGED to dumped gtid set");
log_info("Setting GTID_PURGED to %s",
m_dump->gtid_executed().c_str());
if (!m_options.dry_run()) {
executef(query, m_dump->gtid_executed());
}
} else {
current_console()->print_status(
"Appending dumped gtid set to GTID_PURGED");
log_info("Appending %s to GTID_PURGED",
m_dump->gtid_executed().c_str());
if (!m_options.dry_run()) {
executef(query, "+" + m_dump->gtid_executed());
}
}
m_load_log->end_gtid_update();
} catch (const std::exception &e) {
current_console()->print_error(
std::string("Error while updating GTID_PURGED: ") + e.what());
throw;
}
} else {
current_console()->print_warning(
"gtid update requested but, gtid_executed not set in dump");
}
}
// check if redo log is disabled and print a reminder if so
auto res = query(
"SELECT VARIABLE_VALUE = 'OFF' FROM "
"performance_schema.global_status "
"WHERE variable_name = 'Innodb_redo_log_enabled'");
if (auto row = res->fetch_one()) {
if (row->get_int(0, 0)) {
current_console()->print_note(
"The redo log is currently disabled, which causes MySQL to not be "
"crash safe! Do not forget to enable it again before putting this "
"instance in production.");
}
}
}
void Dump_loader::on_schema_end(const std::string &schema) {
if (m_options.load_deferred_indexes()) {
const auto &fks = m_dump->deferred_schema_fks(schema);
if (!fks.empty()) {
log_info("Recreating FOREIGN KEY constraints for schema %s",
shcore::quote_identifier(schema).c_str());
if (!m_options.dry_run()) {
for (const auto &q : fks) {
try {
execute(q);
} catch (const std::exception &e) {
current_console()->print_error(
"Error while restoring FOREIGN KEY constraint in schema `" +
schema + "` with query: " + q);
throw;
}
}
}
}
}
std::list<Dump_reader::Name_and_file> triggers;
m_dump->schema_table_triggers(schema, &triggers);
for (const auto &it : triggers) {
const auto &table = it.first;
const auto status = m_load_log->triggers_ddl_status(schema, table);
log_debug("Triggers DDL for `%s`.`%s` (%s)", schema.c_str(), table.c_str(),
to_string(status).c_str());
if (m_options.load_ddl()) {
m_load_log->start_triggers_ddl(schema, table);
if (status != Load_progress_log::DONE) {
log_info("Executing triggers SQL for `%s`.`%s`", schema.c_str(),
table.c_str());
it.second->open(mysqlshdk::storage::Mode::READ);
std::string script = mysqlshdk::storage::read_file(it.second.get());
it.second->close();
if (!m_options.dry_run()) {
executef("USE !", schema);
auto transforms = m_default_sql_transforms;
transforms.add_execute_conditionally(
[this, &schema, &table](std::string_view type,
const std::string &name) {
if (!shcore::str_caseeq(type, "TRIGGER")) return true;
return m_dump->include_trigger(schema, table, name);
});
execute_script(m_session, script, "While executing triggers SQL",
transforms);
}
}
m_load_log->end_triggers_ddl(schema, table);
}
}
{
const auto &queries = m_dump->queries_on_schema_end(schema);
if (!queries.empty()) {
log_info("Executing finalization queries for schema %s",
shcore::quote_identifier(schema).c_str());
if (!m_options.dry_run()) {
for (const auto &q : queries) {
try {
execute(q);
} catch (const std::exception &e) {
current_console()->print_error(
"Error while executing finalization queries for schema `" +
schema + "` with query: " + q);
throw;
}
}
}
}
}
}
void Dump_loader::switch_schema(const std::string &schema, bool load_done) {
if (!m_options.dry_run()) {
try {
executef("use !", schema.c_str());
} catch (const std::exception &e) {
current_console()->print_error(shcore::str_format(
"Unable to use schema `%s`%s, error message: %s", schema.c_str(),
load_done ? " that according to load status was already created, "
"consider resetting progress"
: "",
e.what()));
if (!m_options.force()) throw;
add_skipped_schema(schema);
}
}
}
bool Dump_loader::should_fetch_table_ddl(bool placeholder) const {
return m_options.load_ddl() ||
(!placeholder && m_options.load_deferred_indexes());
}
bool Dump_loader::handle_table_data() {
std::unique_ptr<mysqlshdk::storage::IFile> data_file;
bool scheduled = false;
bool chunked = false;
size_t index = 0;
size_t total = 0;
size_t size = 0;
std::unordered_multimap<std::string, size_t> tables_being_loaded;
std::string schema;
std::string table;
std::string partition;
shcore::Dictionary_t options;
// Note: job scheduling should preferably load different tables per thread;
// each partition is treated as a different table
do {
{
std::lock_guard<std::mutex> lock(m_tables_being_loaded_mutex);
tables_being_loaded = m_tables_being_loaded;
}
if (m_dump->next_table_chunk(tables_being_loaded, &schema, &table,
&partition, &chunked, &index, &total,
&data_file, &size, &options)) {
const auto chunk = chunked ? index : -1;
auto status =
m_load_log->table_chunk_status(schema, table, partition, chunk);
if (status == Load_progress_log::DONE) {
m_dump->on_chunk_loaded(schema, table, partition);
}
log_debug("Table data for %s (%s)",
format_table(schema, table, partition, chunk).c_str(),
to_string(status).c_str());
uint64_t bytes_to_skip = 0;
// if the task was interrupted, check whether any of the subchunks were
// already loaded; if so, we need to skip them
if (status == Load_progress_log::INTERRUPTED) {
uint64_t subchunk = 0;
while (m_load_log->table_subchunk_status(schema, table, partition,
chunk, subchunk) ==
Load_progress_log::DONE) {
bytes_to_skip += m_load_log->table_subchunk_size(
schema, table, partition, chunk, subchunk);
++subchunk;
}
if (subchunk > 0) {
log_debug(
"Loading table data for %s was interrupted after "
"%" PRIu64 " subchunks were loaded, skipping %" PRIu64 " bytes",
format_table(schema, table, partition, chunk).c_str(), subchunk,
bytes_to_skip);
}
}
if (m_options.load_data()) {
if (status != Load_progress_log::DONE) {
scheduled = schedule_table_chunk(
schema, table, partition, chunk, std::move(data_file), size,
options, status == Load_progress_log::INTERRUPTED, bytes_to_skip);
}
}
} else {
scheduled = false;
break;
}
} while (!scheduled);
return scheduled;
}
bool Dump_loader::schedule_table_chunk(
const std::string &schema, const std::string &table,
const std::string &partition, ssize_t chunk_index,
std::unique_ptr<mysqlshdk::storage::IFile> file, size_t size,
shcore::Dictionary_t options, bool resuming, uint64_t bytes_to_skip) {
{
std::lock_guard<std::recursive_mutex> lock(m_skip_schemas_mutex);
if (m_skip_schemas.find(schema) != m_skip_schemas.end() ||
m_skip_tables.find(schema_object_key(schema, table)) !=
m_skip_tables.end() ||
!m_dump->include_table(schema, table))
return false;
}
{
std::lock_guard<std::mutex> lock(m_tables_being_loaded_mutex);
m_tables_being_loaded.emplace(
schema_table_object_key(schema, table, partition), file->file_size());
}
log_debug("Scheduling chunk for table %s (%s)",
format_table(schema, table, partition, chunk_index).c_str(),
file->full_path().masked().c_str());
push_pending_task(load_chunk_file(schema, table, partition, std::move(file),
chunk_index, size, options, resuming,
bytes_to_skip));
return true;
}
size_t Dump_loader::handle_worker_events(
const std::function<bool()> &schedule_next) {
const auto to_string = [](Worker_event::Event event) {
switch (event) {
case Worker_event::Event::READY:
return "READY";
case Worker_event::Event::FATAL_ERROR:
return "FATAL_ERROR";
case Worker_event::Event::SCHEMA_DDL_START:
return "SCHEMA_DDL_START";
case Worker_event::Event::SCHEMA_DDL_END:
return "SCHEMA_DDL_END";
case Worker_event::Event::TABLE_DDL_START:
return "TABLE_DDL_START";
case Worker_event::Event::TABLE_DDL_END:
return "TABLE_DDL_END";
case Worker_event::Event::LOAD_START:
return "LOAD_START";
case Worker_event::Event::LOAD_END:
return "LOAD_END";
case Worker_event::Event::INDEX_START:
return "INDEX_START";
case Worker_event::Event::INDEX_END:
return "INDEX_END";
case Worker_event::Event::ANALYZE_START:
return "ANALYZE_START";
case Worker_event::Event::ANALYZE_END:
return "ANALYZE_END";
case Worker_event::Event::EXIT:
return "EXIT";
case Worker_event::Event::LOAD_SUBCHUNK_START:
return "LOAD_SUBCHUNK_START";
case Worker_event::Event::LOAD_SUBCHUNK_END:
return "LOAD_SUBCHUNK_END";
}
return "";
};
std::list<Worker *> idle_workers;
const auto thread_count = m_options.threads_count();
while (idle_workers.size() < m_workers.size()) {
Worker_event event;
// Wait for events from workers, but update progress and check for ^C
// every now and then
for (;;) {
auto event_opt = m_worker_events.try_pop(std::chrono::seconds{1});
if (event_opt && event_opt->worker) {
event = std::move(*event_opt);
break;
}
}
log_debug2("Got event %s from worker %zi", to_string(event.event),
event.worker->id());
switch (event.event) {
case Worker_event::LOAD_START: {
auto task = static_cast<Worker::Load_chunk_task *>(
event.worker->current_task());
on_chunk_load_start(task->schema(), task->table(), task->partition(),
task->chunk_index());
break;
}
case Worker_event::LOAD_END: {
auto task = static_cast<Worker::Load_chunk_task *>(
event.worker->current_task());
on_chunk_load_end(task->schema(), task->table(), task->partition(),
task->chunk_index(), task->bytes_loaded,
task->raw_bytes_loaded, task->rows_loaded);
break;
}
case Worker_event::LOAD_SUBCHUNK_START: {
const auto task = static_cast<Worker::Load_chunk_task *>(
event.worker->current_task());
on_subchunk_load_start(task->schema(), task->table(), task->partition(),
task->chunk_index(),
event.details->get_uint("subchunk"));
break;
}
case Worker_event::LOAD_SUBCHUNK_END: {
const auto task = static_cast<Worker::Load_chunk_task *>(
event.worker->current_task());
on_subchunk_load_end(task->schema(), task->table(), task->partition(),
task->chunk_index(),
event.details->get_uint("subchunk"),
event.details->get_uint("bytes"));
break;
}
case Worker_event::SCHEMA_DDL_START: {
auto task = static_cast<Worker::Schema_ddl_task *>(
event.worker->current_task());
on_schema_ddl_start(task->schema());
break;
}
case Worker_event::SCHEMA_DDL_END: {
auto task = static_cast<Worker::Schema_ddl_task *>(
event.worker->current_task());
on_schema_ddl_end(task->schema());
break;
}
case Worker_event::TABLE_DDL_START: {
auto task =
static_cast<Worker::Table_ddl_task *>(event.worker->current_task());
on_table_ddl_start(task->schema(), task->table(), task->placeholder());
break;
}
case Worker_event::TABLE_DDL_END: {
auto task =
static_cast<Worker::Table_ddl_task *>(event.worker->current_task());
on_table_ddl_end(task->schema(), task->table(), task->placeholder(),
task->steal_deferred_statements());
break;
}
case Worker_event::INDEX_START: {
const auto task = event.worker->current_task();
on_index_start(task->schema(), task->table());
break;
}
case Worker_event::INDEX_END: {
const auto task = event.worker->current_task();
on_index_end(task->schema(), task->table());
break;
}
case Worker_event::ANALYZE_START: {
const auto task = event.worker->current_task();
on_analyze_start(task->schema(), task->table());
break;
}
case Worker_event::ANALYZE_END: {
const auto task = event.worker->current_task();
on_analyze_end(task->schema(), task->table());
break;
}
case Worker_event::READY:
if (const auto task = event.worker->current_task()) {
m_current_weight -= task->weight();
task->done();
}
break;
case Worker_event::FATAL_ERROR:
if (!m_abort) {
current_console()->print_error("Aborting load...");
}
m_worker_interrupt = true;
m_abort = true;
clear_worker(event.worker);
break;
case Worker_event::EXIT:
clear_worker(event.worker);
break;
}
// schedule more work if the worker became free
if (event.event == Worker_event::READY) {
// no more work to do
if (m_worker_interrupt || (!schedule_next() && m_pending_tasks.empty())) {
idle_workers.push_back(event.worker);
} else {
assert(!m_pending_tasks.empty());
const auto pending_weight = m_pending_tasks.top()->weight();
if (m_current_weight + pending_weight > thread_count) {
// the task is too heavy, wait till more threads are idle
idle_workers.push_back(event.worker);
} else {
event.worker->schedule(m_pending_tasks.pop_top());
m_current_weight += pending_weight;
}
}
}
}
size_t num_idle_workers = idle_workers.size();
// put all idle workers back into the queue, so that they can get assigned
// new tasks if more work becomes available later
for (auto *worker : idle_workers) {
m_worker_events.push({Worker_event::READY, worker, {}});
}
return num_idle_workers;
}
bool Dump_loader::schedule_next_task() {
if (!handle_table_data()) {
std::string schema;
std::string table;
if (m_options.load_deferred_indexes()) {
setup_create_indexes_progress();
if (is_data_load_complete()) {
m_index_count_is_known = true;
}
compatibility::Deferred_statements::Index_info *indexes = nullptr;
if (m_dump->next_deferred_index(&schema, &table, &indexes)) {
assert(indexes != nullptr);
push_pending_task(recreate_indexes(schema, table, indexes));
return true;
}
}
const auto analyze_tables = m_options.analyze_tables();
if (Load_dump_options::Analyze_table_mode::OFF != analyze_tables) {
setup_analyze_tables_progress();
std::vector<Dump_reader::Histogram> histograms;
if (m_dump->next_table_analyze(&schema, &table, &histograms)) {
// if Analyze_table_mode is HISTOGRAM, only analyze tables with
// histogram info in the dump
if (Load_dump_options::Analyze_table_mode::ON == analyze_tables ||
!histograms.empty()) {
++m_tables_to_analyze;
push_pending_task(analyze_table(schema, table, histograms));
return true;
}
} else if (is_data_load_complete()) {
m_all_analyze_tasks_scheduled = true;
}
}
return false;
} else {
return true;
}
}
void Dump_loader::interrupt() {
// the 1st ^C does a soft interrupt (stop new tasks, but let current work
// finish); the 2nd ^C sends a kill to all workers
if (!m_worker_interrupt) {
m_worker_interrupt = true;
current_console()->print_info(
"^C -- Load interrupted. Canceling remaining work. "
"Press ^C again to abort current tasks and rollback active "
"transactions (slow).");
} else {
m_worker_hard_interrupt = true;
current_console()->print_info(
"^C -- Aborting active transactions. This may take a while...");
}
}
void Dump_loader::run() {
try {
m_progress_thread.start();
open_dump();
spawn_workers();
{
shcore::on_leave_scope cleanup([this]() {
join_workers();
m_progress_thread.finish();
});
execute_tasks();
}
} catch (...) {
translate_current_exception(m_progress_thread);
}
show_summary();
if (m_worker_interrupt && !m_abort) {
// If interrupted by the user and not by a fatal error
throw std::runtime_error("Aborted");
}
for (const auto &e : m_thread_exceptions) {
if (e) {
THROW_ERROR0(SHERR_LOAD_WORKER_THREAD_FATAL_ERROR);
}
}
}
void Dump_loader::show_summary() {
using namespace mysqlshdk::utils;
const auto console = current_console();
if (m_num_rows_loaded == 0) {
if (m_resuming)
console->print_info("There was no remaining data left to be loaded.");
else
console->print_info("No data loaded.");
} else {
const auto seconds = m_progress_thread.duration().seconds();
const auto load_seconds = m_load_data_stage->duration().seconds();
console->print_info(shcore::str_format(
"%zi chunks (%s, %s) for %" PRIu64
" tables in %zi schemas were "
"loaded in %s (avg throughput %s)",
m_num_chunks_loaded.load(),
format_items("rows", "rows", m_num_rows_loaded.load()).c_str(),
format_bytes(m_num_bytes_loaded.load()).c_str(),
m_dump->tables_to_load(), m_dump->schemas().size(),
format_seconds(seconds, false).c_str(),
format_throughput_bytes(
m_num_bytes_loaded.load() - m_num_bytes_previously_loaded,
load_seconds)
.c_str()));
}
if (m_options.load_users()) {
std::string msg =
std::to_string(m_loaded_accounts) + " accounts were loaded";
if (m_ignored_grant_errors) {
msg += ", " + std::to_string(m_ignored_grant_errors) +
" GRANT statement errors were ignored";
}
if (m_dropped_accounts) {
msg += ", " + std::to_string(m_dropped_accounts) +
" accounts were dropped due to GRANT statement errors";
}
console->print_info(msg);
}
if (m_num_errors > 0) {
console->print_info(shcore::str_format(
"%zi errors and %zi warnings messages were reported during the load.",
m_num_errors.load(), m_num_warnings.load()));
} else {
console->print_info(shcore::str_format(
"%zi warnings were reported during the load.", m_num_warnings.load()));
}
if (m_num_index_retries) {
console->print_info(
shcore::str_format("There were %zi retries to create indexes.",
m_num_index_retries.load()));
}
}
void Dump_loader::open_dump() { open_dump(m_options.create_dump_handle()); }
void Dump_loader::open_dump(
std::unique_ptr<mysqlshdk::storage::IDirectory> dumpdir) {
auto console = current_console();
m_dump = std::make_unique<Dump_reader>(std::move(dumpdir), m_options);
console->print_status("Opening dump...");
auto status = m_dump->open();
if (m_dump->dump_version().get_major() > k_supported_dump_version_major ||
(m_dump->dump_version().get_major() == k_supported_dump_version_major &&
m_dump->dump_version().get_minor() > k_supported_dump_version_minor)) {
console->print_error(
"Dump format has version " + m_dump->dump_version().get_full() +
" which is not supported by this version of MySQL Shell. "
"Please upgrade MySQL Shell to load it.");
THROW_ERROR0(SHERR_LOAD_UNSUPPORTED_DUMP_VERSION);
}
if (m_dump->dump_version() < Version(dump::Schema_dumper::version())) {
console->print_note(
"Dump format has version " + m_dump->dump_version().get_full() +
" and was created by an older version of MySQL Shell. "
"If you experience problems loading it, please recreate the dump using "
"the current version of MySQL Shell and try again.");
}
std::string missing_capabilities;
// 8.0.27 is the version where capabilities were added
Version minimum_version{8, 0, 27};
for (const auto &capability : m_dump->capabilities()) {
if (!dump::capability::is_supported(capability.id)) {
if (minimum_version < capability.version_required) {
minimum_version = capability.version_required;
}
missing_capabilities += "* ";
missing_capabilities += capability.description;
missing_capabilities += "\n\n";
}
}
if (!missing_capabilities.empty()) {
console->print_error(
"Dump is using capabilities which are not supported by this version of "
"MySQL Shell:\n\n" +
missing_capabilities +
"The minimum required version of MySQL Shell to load this dump is: " +
minimum_version.get_base() + ".");
THROW_ERROR0(SHERR_LOAD_UNSUPPORTED_DUMP_CAPABILITIES);
}
if (status != Dump_reader::Status::COMPLETE) {
if (m_options.dump_wait_timeout_ms() > 0) {
console->print_note(
"Dump is still ongoing, data will be loaded as it becomes "
"available.");
} else {
console->print_error(
"Dump is not yet finished. Use the 'waitDumpTimeout' option to "
"enable concurrent load and set a timeout for how long to wait "
"for new data to become available.");
THROW_ERROR0(SHERR_LOAD_INCOMPLETE_DUMP);
}
}
m_dump->validate_options();
m_dump->show_metadata();
// Pick charset
if (m_character_set.empty()) {
m_character_set = m_dump->default_character_set();
}
}
void Dump_loader::check_server_version() {
const auto console = current_console();
const auto &target_server = m_options.target_server_version();
const auto mds = m_options.is_mds();
mysqlshdk::mysql::Instance session(m_options.base_session());
std::string msg = "Target is MySQL " + target_server.get_full();
if (mds) msg += " (MySQL HeatWave Service)";
msg +=
". Dump was produced from MySQL " + m_dump->server_version().get_full();
console->print_info(msg);
if (target_server < Version(5, 7, 0)) {
THROW_ERROR0(SHERR_LOAD_UNSUPPORTED_SERVER_VERSION);
}
if (mds && !m_dump->mds_compatibility()) {
msg =
"Destination is a MySQL HeatWave Service DB System instance but the "
"dump was produced without the compatibility option. ";
if (m_options.ignore_version()) {
msg +=
"The 'ignoreVersion' option is enabled, so loading anyway. If this "
"operation fails, create the dump once again with the 'ocimds' "
"option enabled.";
console->print_warning(msg);
} else {
msg +=
"Please enable the 'ocimds' option when dumping your database. "
"Alternatively, enable the 'ignoreVersion' option to load anyway.";
console->print_error(msg);
THROW_ERROR0(SHERR_LOAD_DUMP_NOT_MDS_COMPATIBLE);
}
}
if (target_server.get_major() != m_dump->server_version().get_major()) {
if (target_server.get_major() < m_dump->server_version().get_major())
msg =
"Destination MySQL version is older than the one where the dump "
"was created.";
else
msg =
"Destination MySQL version is newer than the one where the dump "
"was created.";
msg +=
" Loading dumps from different major MySQL versions is "
"not fully supported and may not work.";
if (m_options.ignore_version()) {
msg += " The 'ignoreVersion' option is enabled, so loading anyway.";
console->print_warning(msg);
} else {
msg += " Enable the 'ignoreVersion' option to load anyway.";
console->print_error(msg);
THROW_ERROR0(SHERR_LOAD_SERVER_VERSION_MISMATCH);
}
}
if (m_options.analyze_tables() ==
Load_dump_options::Analyze_table_mode::HISTOGRAM &&
!histograms_supported(target_server))
console->print_warning("Histogram creation enabled but MySQL Server " +
target_server.get_base() + " does not support it.");
if (m_options.update_gtid_set() != Load_dump_options::Update_gtid_set::OFF) {
// Check if group replication is running
bool group_replication_running = false;
try {
group_replication_running = session.queryf_one_int(
0, 0,
"select count(*) from performance_schema.replication_group_members "
"where MEMBER_ID = @@server_uuid AND MEMBER_STATE IS NOT NULL AND "
"MEMBER_STATE <> 'OFFLINE';");
} catch (...) {
}
if (group_replication_running) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_GR_IS_RUNNING);
}
if (target_server < Version(8, 0, 0)) {
if (m_options.update_gtid_set() ==
Load_dump_options::Update_gtid_set::APPEND) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_APPEND_NOT_SUPPORTED);
}
if (!m_options.skip_binlog()) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_REQUIRES_SKIP_BINLOG);
}
if (!session.queryf_one_int(0, 0,
"select @@global.gtid_executed = '' and "
"@@global.gtid_purged = ''")) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_REPLACE_REQUIRES_EMPTY_VARIABLES);
}
} else {
const char *g = m_dump->gtid_executed().c_str();
if (m_options.update_gtid_set() ==
Load_dump_options::Update_gtid_set::REPLACE) {
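// REPLACE requires the dump's GTID set to be disjoint from the GTIDs
// executed on this instance and still present in the binary logs
// (gtid_executed minus gtid_purged): subtracting them from the dump's
// set must leave it unchanged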
if (!session.queryf_one_int(
0, 0,
"select GTID_SUBTRACT(?, "
"GTID_SUBTRACT(@@global.gtid_executed, "
"@@global.gtid_purged)) = gtid_subtract(?, '')",
g, g)) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_REPLACE_SETS_INTERSECT);
}
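// additionally, everything already purged from this instance's binary
// logs must be contained in the dump's GTID set, otherwise REPLACE
// would lose those GTIDs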
if (!session.queryf_one_int(
0, 0, "select GTID_SUBSET(@@global.gtid_purged, ?);", g)) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_REPLACE_REQUIRES_SUPERSET);
}
} else if (!session.queryf_one_int(
0, 0,
"select GTID_SUBTRACT(@@global.gtid_executed, ?) = "
"@@global.gtid_executed",
g)) {
THROW_ERROR0(SHERR_LOAD_UPDATE_GTID_APPEND_SETS_INTERSECT);
}
}
}
if (should_create_pks() && target_server < Version(8, 0, 24)) {
THROW_ERROR0(SHERR_LOAD_INVISIBLE_PKS_UNSUPPORTED_SERVER_VERSION);
}
if (m_options.load_users() &&
m_dump->partial_revokes() != m_options.partial_revokes()) {
const auto status = [](bool b) { return b ? "enabled" : "disabled"; };
console->print_warning(shcore::str_format(
"The dump was created on an instance where the 'partial_revokes' "
"system variable was %s, however the target instance has it %s. GRANT "
"statements on object names with wildcard characters (%% or _) will "
"behave differently.",
status(m_dump->partial_revokes()),
status(m_options.partial_revokes())));
}
}
void Dump_loader::check_tables_without_primary_key() {
if (!m_options.load_ddl()) {
return;
}
if (m_options.is_mds() && m_dump->has_tables_without_pk()) {
bool warning = true;
std::string msg =
"The dump contains tables without Primary Keys and it is loaded with "
"the 'createInvisiblePKs' option set to ";
if (should_create_pks()) {
msg +=
"true. Inbound Replication into a MySQL HeatWave Service DB System "
"instance with High Availability can";
if (m_options.target_server_version() < Version(8, 0, 32)) {
msg += "not";
} else {
warning = false;
}
msg += " be used with this dump.";
} else {
msg +=
"false. This dump cannot be loaded into a MySQL HeatWave Service "
"DB System instance with High Availability.";
}
if (warning) {
current_console()->print_warning(msg);
} else {
current_console()->print_note(msg);
}
}
if (m_options.target_server_version() < Version(8, 0, 13) ||
should_create_pks()) {
return;
}
if (query("show variables like 'sql_require_primary_key';")
->fetch_one()
->get_string(1) != "ON")
return;
std::string tbs;
for (const auto &s : m_dump->tables_without_pk())
tbs += "schema " + shcore::quote_identifier(s.first) + ": " +
shcore::str_join(s.second, ", ") + "\n";
if (!tbs.empty()) {
const auto error_msg = shcore::str_format(
"The sql_require_primary_key option is enabled at the destination "
"server and one or more tables without a Primary Key were found in "
"the dump:\n%s\n"
"You must do one of the following to be able to load this dump:\n"
"- Add a Primary Key to the tables where it's missing\n"
"- Use the \"createInvisiblePKs\" option to automatically create "
"Primary Keys on a 8.0.24+ server\n"
"- Use the \"excludeTables\" option to load the dump without those "
"tables\n"
"- Disable the sql_require_primary_key sysvar at the server (note "
"that the underlying reason for the option to be enabled may still "
"prevent your database from functioning properly)",
tbs.c_str());
current_console()->print_error(error_msg);
THROW_ERROR0(SHERR_LOAD_REQUIRE_PRIMARY_KEY_ENABLED);
}
}
namespace {
std::vector<std::string> fetch_names(mysqlshdk::db::IResult *result) {
std::vector<std::string> names;
while (auto row = result->fetch_one()) {
names.push_back(row->get_string(0));
}
return names;
}
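// Runs a query of the form "<query_prefix>('name1','name2',...)", with the
// schema bound to the ? placeholder; returns an empty handle when the name
// list is empty. E.g. (hypothetical names) with the information_schema.tables
// prefix used by check_existing_objects(), tables "t1" and "t2" in schema "s"
// expand to:
//   SELECT table_name FROM information_schema.tables
//    WHERE table_schema = 's' AND table_name in ('t1','t2')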
std::shared_ptr<mysqlshdk::db::IResult> query_names(
mysqlshdk::db::ISession *session, const std::string &schema,
const std::vector<std::string> &names, const std::string &query_prefix) {
std::string set = shcore::str_join(names, ",", [](const std::string &s) {
return shcore::quote_sql_string(s);
});
if (set.empty()) return {};
return session->queryf(query_prefix + "(" + set + ")", schema);
}
} // namespace
bool Dump_loader::report_duplicates(const std::string &what,
const std::string &schema,
mysqlshdk::db::IResult *result) {
bool has_duplicates = false;
while (auto row = result->fetch_one()) {
std::string name = row->get_string(0);
if (m_options.ignore_existing_objects())
current_console()->print_note("Schema `" + schema +
"` already contains a " + what + " named " +
name);
else
current_console()->print_error("Schema `" + schema +
"` already contains a " + what +
" named " + name);
has_duplicates = true;
}
return has_duplicates;
}
void Dump_loader::check_existing_objects() {
auto console = current_console();
console->print_status("Checking for pre-existing objects...");
bool has_duplicates = false;
if (m_options.load_users()) {
std::set<std::string> accounts;
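// account names are compared case-insensitively, hence both the dump's
// accounts and the grantees reported by the server are lowercased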
for (const auto &a : m_dump->accounts()) {
if (m_options.filters().users().is_included(a))
accounts.emplace(shcore::str_lower(
shcore::str_format("'%s'@'%s'", a.user.c_str(), a.host.c_str())));
}
auto result = query(
"SELECT DISTINCT grantee FROM information_schema.user_privileges");
for (auto row = result->fetch_one(); row; row = result->fetch_one()) {
auto grantee = row->get_string(0);
if (accounts.count(shcore::str_lower(grantee))) {
if (m_options.ignore_existing_objects())
current_console()->print_note("Account " + grantee +
" already exists");
else
current_console()->print_error("Account " + grantee +
" already exists");
has_duplicates = true;
}
}
}
// Case handling:
// Partition, subpartition, column, index, stored routine, event, and
// resource group names are not case-sensitive on any platform, nor are
// column aliases. Schema, table and trigger names depend on the value of
// lower_case_table_names
// Get list of schemas being loaded that already exist
std::string set = shcore::str_join(
m_dump->schemas(), ",",
[](const std::string &s) { return shcore::quote_sql_string(s); });
if (set.empty()) return;
auto result = query(
"SELECT schema_name FROM information_schema.schemata"
" WHERE schema_name in (" +
set + ")");
std::vector<std::string> dup_schemas = fetch_names(result.get());
for (const auto &schema : dup_schemas) {
std::vector<std::string> tables;
std::vector<std::string> views;
std::vector<std::string> triggers;
std::vector<std::string> functions;
std::vector<std::string> procedures;
std::vector<std::string> events;
if (!m_dump->schema_objects(schema, &tables, &views, &triggers, &functions,
&procedures, &events))
continue;
result = query_names(m_session.get(), schema, tables,
"SELECT table_name FROM information_schema.tables"
" WHERE table_schema = ? AND table_name in ");
if (result)
has_duplicates |= report_duplicates("table", schema, result.get());
result = query_names(m_session.get(), schema, views,
"SELECT table_name FROM information_schema.views"
" WHERE table_schema = ? AND table_name in ");
if (result)
has_duplicates |= report_duplicates("view", schema, result.get());
result = query_names(m_session.get(), schema, triggers,
"SELECT trigger_name FROM information_schema.triggers"
" WHERE trigger_schema = ? AND trigger_name in ");
if (result)
has_duplicates |= report_duplicates("trigger", schema, result.get());
result =
query_names(m_session.get(), schema, functions,
"SELECT routine_name FROM information_schema.routines"
" WHERE routine_schema = ? AND routine_type = 'FUNCTION'"
" AND routine_name in ");
if (result)
has_duplicates |= report_duplicates("function", schema, result.get());
result =
query_names(m_session.get(), schema, procedures,
"SELECT routine_name FROM information_schema.routines"
" WHERE routine_schema = ? AND routine_type = 'PROCEDURE'"
" AND routine_name in ");
if (result)
has_duplicates |= report_duplicates("procedure", schema, result.get());
result = query_names(m_session.get(), schema, events,
"SELECT event_name FROM information_schema.events"
" WHERE event_schema = ? AND event_name in ");
if (result)
has_duplicates |= report_duplicates("event", schema, result.get());
}
if (has_duplicates) {
if (m_options.ignore_existing_objects()) {
console->print_note(
"One or more objects in the dump already exist in the destination "
"database but will be ignored because the 'ignoreExistingObjects' "
"option was enabled.");
} else {
console->print_error(
"One or more objects in the dump already exist in the destination "
"database. You must either DROP these objects or exclude them from "
"the load.");
THROW_ERROR0(SHERR_LOAD_DUPLICATE_OBJECTS_FOUND);
}
}
}
void Dump_loader::setup_progress_file(bool *out_is_resuming) {
auto console = current_console();
m_load_log = std::make_unique<Load_progress_log>();
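// progress tracking is enabled unless the 'progressFile' option was
// explicitly set to an empty string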
if (!m_options.progress_file().has_value() ||
!m_options.progress_file()->empty()) {
auto progress_file = m_dump->create_progress_file_handle();
const auto path = progress_file->full_path().masked();
bool rewrite_on_flush = !progress_file->is_local();
auto progress = m_load_log->init(std::move(progress_file),
m_options.dry_run(), rewrite_on_flush);
if (progress.status != Load_progress_log::PENDING) {
if (!m_options.reset_progress()) {
console->print_note(
"Load progress file detected. Load will be resumed from where it "
"was left, assuming no external updates were made.");
console->print_info(
"You may enable the 'resetProgress' option to discard progress "
"for this MySQL instance and force it to be completely "
"reloaded.");
*out_is_resuming = true;
log_info("Resuming load, last loaded %s bytes",
std::to_string(progress.bytes_completed).c_str());
// Recall the partial progress that was made before
m_num_bytes_previously_loaded = progress.bytes_completed;
m_num_bytes_loaded.store(progress.bytes_completed);
m_num_raw_bytes_loaded.store(progress.raw_bytes_completed);
} else {
console->print_note(
"Load progress file detected for the instance but "
"'resetProgress' option was enabled. Load progress will be "
"discarded and the whole dump will be reloaded.");
m_load_log->reset_progress();
}
} else {
log_info("Logging load progress to %s", path.c_str());
}
m_load_log->set_server_uuid(m_options.server_uuid());
}
}
void Dump_loader::execute_threaded(const std::function<bool()> &schedule_next) {
do {
// handle events from workers and schedule more chunks when a worker
// becomes available
size_t num_idle_workers = handle_worker_events(schedule_next);
if (num_idle_workers == m_workers.size()) {
// make sure that there's really no more work. schedule_next() is
// supposed to just return false without doing anything. If it does
// something (and returns true), then we have a bug.
assert(!schedule_next() || m_worker_interrupt);
break;
}
} while (!m_worker_interrupt);
}
void Dump_loader::execute_table_ddl_tasks() {
m_ddl_executed = 0;
std::atomic<uint64_t> ddl_to_execute = 0;
std::atomic<bool> all_tasks_scheduled = false;
dump::Progress_thread::Progress_config config;
config.current = [this]() -> uint64_t { return m_ddl_executed; };
config.total = [&ddl_to_execute]() { return ddl_to_execute.load(); };
config.is_total_known = [&all_tasks_scheduled]() {
return all_tasks_scheduled.load();
};
const auto stage =
m_progress_thread.start_stage("Executing DDL", std::move(config));
shcore::on_leave_scope finish_stage([stage]() { stage->finish(); });
// Create all schemas, all tables and all view placeholders.
// Views and other objects must only be created after all
// tables/placeholders from all schemas are created, because there may be
// cross-schema references.
Load_progress_log::Status schema_load_status;
std::string schema;
std::list<Dump_reader::Name_and_file> tables;
std::list<Dump_reader::Name_and_file> view_placeholders;
const auto thread_pool_ptr = m_dump->create_thread_pool();
const auto pool = thread_pool_ptr.get();
shcore::Synchronized_queue<std::unique_ptr<Worker::Task>> worker_tasks;
const auto handle_ddl_files = [this, pool, &worker_tasks, &ddl_to_execute](
const std::string &s,
std::list<Dump_reader::Name_and_file> *list,
bool placeholder,
Load_progress_log::Status schema_status) {
// GCC 12 may warn about a possibly uninitialized usage of IFile in the lambda
// capture
#if __GNUC__ >= 12 && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif
if (should_fetch_table_ddl(placeholder)) {
for (auto &item : *list) {
if (item.second) {
const auto status = placeholder
? schema_status
: m_load_log->table_ddl_status(s, item.first);
++ddl_to_execute;
pool->add_task(
[file = std::move(item.second), s, table = item.first]() {
log_debug("Fetching table DDL for %s.%s", s.c_str(),
table.c_str());
file->open(mysqlshdk::storage::Mode::READ);
auto script = mysqlshdk::storage::read_file(file.get());
file->close();
return script;
},
[s, table = item.first, placeholder, &worker_tasks,
status](std::string &&data) {
worker_tasks.push(std::make_unique<Worker::Table_ddl_task>(
s, table, std::move(data), placeholder, status));
});
}
}
}
#if __GNUC__ >= 12 && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
};
log_debug("Begin loading table DDL");
pool->start_threads();
const auto &pool_status = pool->process_async();
while (!m_worker_interrupt) {
// fetch next schema and process it
if (!m_dump->next_schema_and_tables(&schema, &tables, &view_placeholders)) {
break;
}
schema_load_status = m_load_log->schema_ddl_status(schema);
log_debug("Schema DDL for '%s' (%s)", schema.c_str(),
to_string(schema_load_status).c_str());
bool is_schema_ready = true;
if (schema_load_status == Load_progress_log::DONE ||
!m_options.load_ddl()) {
// we track views together with the schema DDL, so there's no need to
// load placeholders if the schema was already loaded
} else {
handle_ddl_files(schema, &view_placeholders, true, schema_load_status);
++ddl_to_execute;
is_schema_ready = false;
pool->add_task(
[this, schema]() {
log_debug("Fetching schema DDL for %s", schema.c_str());
return m_dump->fetch_schema_script(schema);
},
[&worker_tasks, schema,
resuming =
schema_load_status ==
mysqlsh::Load_progress_log::INTERRUPTED](std::string &&data) {
worker_tasks.push(std::make_unique<Worker::Schema_ddl_task>(
schema, std::move(data), resuming),
shcore::Queue_priority::HIGH);
},
shcore::Thread_pool::Priority::HIGH);
}
m_schema_ddl_ready[schema] = is_schema_ready;
handle_ddl_files(schema, &tables, false, schema_load_status);
}
all_tasks_scheduled = true;
auto pending_tasks = ddl_to_execute.load();
pool->tasks_done();
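// worker_tasks is filled asynchronously by the thread pool; buffer the
// fetched tasks into separate schema and table lists, so that schema DDL
// is scheduled first and table DDL is spread across the ready schemas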
{
std::list<std::unique_ptr<Worker::Task>> schema_tasks;
std::list<std::unique_ptr<Worker::Task>> table_tasks;
execute_threaded([this, &pool_status, &worker_tasks, &pending_tasks,
&schema_tasks, &table_tasks]() {
while (!m_worker_interrupt) {
// if there was an exception in the thread pool, interrupt the process
if (pool_status == shcore::Thread_pool::Async_state::TERMINATED) {
m_worker_interrupt = true;
break;
}
std::unique_ptr<Worker::Task> work;
// don't fetch too many tasks at once, as we may starve worker threads
auto tasks_to_fetch = m_options.threads_count();
// move tasks from the queue in the processing thread to the main thread
while (pending_tasks > 0 && tasks_to_fetch > 0) {
// just have a peek, we may have other tasks to execute
auto work_opt = worker_tasks.try_pop(std::chrono::milliseconds{1});
if (!work_opt) break;
work = std::move(*work_opt);
if (!work) break;
--pending_tasks;
--tasks_to_fetch;
(work->table().empty() ? schema_tasks : table_tasks)
.emplace_back(std::move(work));
}
// schedule schema tasks first
if (!schema_tasks.empty()) {
work = std::move(schema_tasks.front());
schema_tasks.pop_front();
} else if (!table_tasks.empty()) {
// select a table task for a schema which is ready, choosing the one
// with the lowest number of concurrent tasks
const auto end = table_tasks.end();
auto best = end;
auto best_count = std::numeric_limits<uint64_t>::max();
for (auto it = table_tasks.begin(); it != end; ++it) {
const auto &s = (*it)->schema();
if (m_schema_ddl_ready[s]) {
auto count = m_ddl_in_progress_per_schema[s];
if (count < best_count) {
best_count = count;
best = it;
}
// the best possible outcome, exit immediately
if (0 == best_count) {
break;
}
}
}
if (end != best) {
// mark the schema as being loaded
++m_ddl_in_progress_per_schema[(*best)->schema()];
// schedule task for execution
work = std::move(*best);
// remove the task
table_tasks.erase(best);
}
}
// if there's work, schedule it
if (work) {
push_pending_task(std::move(work));
return true;
}
// no more work to do, finish
if (schema_tasks.empty() && table_tasks.empty() && 0 == pending_tasks) {
break;
}
}
return false;
});
// notify the pool in case of an interrupt
if (m_worker_interrupt) {
pool->terminate();
}
pool->wait_for_process();
}
// no need to keep these any more
m_schema_ddl_ready.clear();
m_ddl_in_progress_per_schema.clear();
log_debug("End loading table DDL");
}
void Dump_loader::execute_view_ddl_tasks() {
m_ddl_executed = 0;
std::atomic<uint64_t> ddl_to_execute = 0;
std::atomic<bool> all_tasks_scheduled = false;
dump::Progress_thread::Progress_config config;
config.current = [this]() -> uint64_t { return m_ddl_executed; };
config.total = [&ddl_to_execute]() { return ddl_to_execute.load(); };
config.is_total_known = [&all_tasks_scheduled]() {
return all_tasks_scheduled.load();
};
const auto stage =
m_progress_thread.start_stage("Executing view DDL", std::move(config));
shcore::on_leave_scope finish_stage([stage]() { stage->finish(); });
Load_progress_log::Status schema_load_status;
std::string schema;
std::list<Dump_reader::Name_and_file> views;
std::unordered_map<std::string, uint64_t> views_per_schema;
const auto thread_pool_ptr = m_dump->create_thread_pool();
const auto pool = thread_pool_ptr.get();
log_debug("Begin loading view DDL");
// the DDL is executed in the main thread, in order to avoid concurrency
// issues (e.g. one thread removes the placeholder table for a view, while
// another one tries to create a view which references that placeholder)
pool->start_threads();
while (!m_worker_interrupt) {
if (!m_dump->next_schema_and_views(&schema, &views)) {
break;
}
schema_load_status = m_load_log->schema_ddl_status(schema);
if (schema_load_status != Load_progress_log::DONE) {
if (views.empty()) {
// there are no views, mark schema as ready
m_load_log->end_schema_ddl(schema);
} else {
ddl_to_execute += views.size();
views_per_schema[schema] = views.size();
// GCC 12 may warn about a possibly uninitialized usage of IFile in the
// lambda capture
#if __GNUC__ >= 12 && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif
for (auto &item : views) {
pool->add_task(
[file = std::move(item.second), schema, view = item.first]() {
log_debug("Fetching view DDL for %s.%s", schema.c_str(),
view.c_str());
file->open(mysqlshdk::storage::Mode::READ);
auto script = mysqlshdk::storage::read_file(file.get());
file->close();
return script;
},
[this, schema, view = item.first,
resuming = schema_load_status == Load_progress_log::INTERRUPTED,
pool, &views_per_schema](std::string &&script) {
log_info("%s DDL script for view `%s`.`%s`",
(resuming ? "Re-executing" : "Executing"),
schema.c_str(), view.c_str());
if (!m_options.dry_run()) {
executef("use !", schema.c_str());
// execute sql
execute_script(
m_session, script,
shcore::str_format(
"Error executing DDL script for view `%s`.`%s`",
schema.c_str(), view.c_str()),
m_default_sql_transforms);
}
++m_ddl_executed;
if (0 == --views_per_schema[schema]) {
m_load_log->end_schema_ddl(schema);
}
if (m_worker_interrupt) {
pool->terminate();
}
});
}
#if __GNUC__ >= 12 && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
}
}
}
all_tasks_scheduled = true;
pool->tasks_done();
pool->process();
log_debug("End loading view DDL");
}
void Dump_loader::execute_tasks() {
auto console = current_console();
if (m_options.dry_run())
console->print_info("dryRun enabled, no changes will be made.");
check_server_version();
m_session = create_session();
log_server_version();
setup_progress_file(&m_resuming);
// this is the first potentially slow operation; as many errors as possible
// should be detected before this point
m_dump->rescan(&m_progress_thread);
handle_schema_option();
if (!m_resuming && m_options.load_ddl()) check_existing_objects();
check_tables_without_primary_key();
size_t num_idle_workers = 0;
do {
if (m_dump->status() != Dump_reader::Status::COMPLETE) {
wait_for_more_data();
}
if (!m_init_done) {
// process dump metadata first
on_dump_begin();
// NOTE: this assumes that all DDL files are already present
if (m_options.load_ddl() || m_options.load_deferred_indexes()) {
// exec DDL for all tables in parallel (fetching DDL for
// thousands of tables from remote storage can be slow)
if (!m_worker_interrupt) execute_table_ddl_tasks();
// users have to be loaded after all objects and view placeholders are
// created, because GRANTs on specific objects require the objects to
// exist
// users have to be loaded before view placeholders are replaced with
// views, because creating a view which uses another view with the
// DEFINER clause requires that user to exist
if (!m_worker_interrupt) load_users();
// exec DDL for all views after all tables and users are created
if (!m_worker_interrupt && m_options.load_ddl())
execute_view_ddl_tasks();
}
m_init_done = true;
if (!m_worker_interrupt) {
setup_load_data_progress();
}
}
m_total_tables_with_data = m_dump->tables_with_data();
// handle events from workers and schedule more chunks when a worker
// becomes available
num_idle_workers =
handle_worker_events([this]() { return schedule_next_task(); });
// If the whole dump is already available and there's no more data to be
// loaded and all workers are idle (done loading), then we're done
if (m_dump->status() == Dump_reader::Status::COMPLETE &&
!m_dump->data_available() && !m_dump->work_available() &&
num_idle_workers == m_workers.size()) {
break;
}
} while (!m_worker_interrupt);
if (!m_worker_interrupt) {
on_dump_end();
m_load_log->cleanup();
}
log_debug("Import done");
}
bool Dump_loader::wait_for_more_data() {
const auto start_time = std::chrono::steady_clock::now();
bool waited = false;
auto console = current_console();
bool was_ready = m_dump->ready();
// if there are still idle workers, check whether more dumped data is
// available
while (m_dump->status() != Dump_reader::Status::COMPLETE &&
!m_worker_interrupt) {
m_dump->rescan();
if (m_dump->status() == Dump_reader::Status::DUMPING) {
if (m_dump->data_available() || (!was_ready && m_dump->ready())) {
log_debug("Dump data available");
return true;
}
if (m_options.dump_wait_timeout_ms() > 0) {
const auto current_time = std::chrono::steady_clock::now();
const auto time_diff = std::chrono::duration_cast<std::chrono::seconds>(
current_time - start_time)
.count();
if (static_cast<uint64_t>(time_diff * 1000) >=
m_options.dump_wait_timeout_ms()) {
console->print_warning(
"Timeout while waiting for dump to finish. Imported data "
"may be incomplete.");
THROW_ERROR0(SHERR_LOAD_DUMP_WAIT_TIMEOUT);
}
} else {
// Dump isn't complete yet, but we're not waiting for it
break;
}
if (!waited) {
console->print_status("Waiting for more data to become available...");
}
waited = true;
if (m_options.dump_wait_timeout_ms() < 1000) {
shcore::sleep_ms(m_options.dump_wait_timeout_ms());
} else {
// wait for at most 5s at a time and try again
for (uint64_t j = 0;
j < std::min<uint64_t>(5000, m_options.dump_wait_timeout_ms()) &&
!m_worker_interrupt;
j += 1000) {
shcore::sleep_ms(1000);
}
}
}
}
return false;
}
void Dump_loader::spawn_workers() {
m_thread_exceptions.resize(m_options.threads_count());
for (uint64_t i = 0; i < m_options.threads_count(); i++) {
m_workers.emplace_back(i, this);
Worker &worker = m_workers.back();
m_worker_threads.push_back(mysqlsh::spawn_scoped_thread([&worker]() {
mysqlsh::Mysql_thread mysql_thread;
worker.run();
}));
}
}
void Dump_loader::join_workers() {
log_debug("Waiting on worker threads...");
for (auto &w : m_workers) w.stop();
for (auto &t : m_worker_threads)
if (t.joinable()) t.join();
log_debug("All worker threads stopped");
}
void Dump_loader::clear_worker(Worker *worker) {
const auto wid = worker->id();
m_worker_threads[wid].join();
m_workers.remove_if([wid](const Worker &w) { return w.id() == wid; });
}
void Dump_loader::post_worker_event(Worker *worker, Worker_event::Event event,
shcore::Dictionary_t &&details) {
m_worker_events.push(Worker_event{event, worker, std::move(details)});
}
void Dump_loader::on_schema_ddl_start(const std::string &schema) {
m_load_log->start_schema_ddl(schema);
}
void Dump_loader::on_schema_ddl_end(const std::string &) {}
void Dump_loader::on_table_ddl_start(const std::string &schema,
const std::string &table,
bool placeholder) {
if (!placeholder) m_load_log->start_table_ddl(schema, table);
}
void Dump_loader::on_table_ddl_end(
const std::string &schema, const std::string &table, bool placeholder,
std::unique_ptr<compatibility::Deferred_statements> deferred_indexes) {
if (!placeholder) {
m_load_log->end_table_ddl(schema, table);
if (deferred_indexes && !deferred_indexes->empty()) {
m_indexes_to_recreate += m_dump->add_deferred_statements(
schema, table, std::move(*deferred_indexes));
}
}
on_ddl_done_for_schema(schema);
}
void Dump_loader::on_chunk_load_start(const std::string &schema,
const std::string &table,
const std::string &partition,
ssize_t index) {
m_load_log->start_table_chunk(schema, table, partition, index);
}
void Dump_loader::on_chunk_load_end(const std::string &schema,
const std::string &table,
const std::string &partition, ssize_t index,
size_t bytes_loaded,
size_t raw_bytes_loaded,
size_t rows_loaded) {
m_load_log->end_table_chunk(schema, table, partition, index, bytes_loaded,
raw_bytes_loaded, rows_loaded);
m_dump->on_chunk_loaded(schema, table, partition);
m_unique_tables_loaded.insert(
schema_table_object_key(schema, table, partition));
log_debug("Ended loading chunk %s (%s, %s)",
format_table(schema, table, partition, index).c_str(),
std::to_string(bytes_loaded).c_str(),
std::to_string(raw_bytes_loaded).c_str());
}
void Dump_loader::on_subchunk_load_start(const std::string &schema,
const std::string &table,
const std::string &partition,
ssize_t index, uint64_t subchunk) {
m_load_log->start_table_subchunk(schema, table, partition, index, subchunk);
}
void Dump_loader::on_subchunk_load_end(const std::string &schema,
const std::string &table,
const std::string &partition,
ssize_t index, uint64_t subchunk,
uint64_t bytes) {
m_load_log->end_table_subchunk(schema, table, partition, index, subchunk,
bytes);
}
void Dump_loader::on_index_start(const std::string &schema,
const std::string &table) {
m_load_log->start_table_indexes(schema, table);
}
void Dump_loader::on_index_end(const std::string &schema,
const std::string &table) {
m_dump->on_index_end(schema, table);
m_load_log->end_table_indexes(schema, table);
}
void Dump_loader::on_analyze_start(const std::string &schema,
const std::string &table) {
m_load_log->start_analyze_table(schema, table);
}
void Dump_loader::on_analyze_end(const std::string &schema,
const std::string &table) {
m_dump->on_analyze_end(schema, table);
m_load_log->end_analyze_table(schema, table);
}
void Dump_loader::Sql_transform::add_strip_removed_sql_modes() {
// Remove NO_AUTO_CREATE_USER from sql_mode; it exists in 5.7 but was
// removed in 8.0
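// e.g. a statement like:
//   /*!50003 SET sql_mode='NO_AUTO_CREATE_USER,STRICT_ALL_TABLES' */
// is rewritten to:
//   /*!50003 SET sql_mode='STRICT_ALL_TABLES' */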
add([](std::string_view sql, std::string *out_new_sql) {
static std::regex re(
R"*((\/\*![0-9]+\s+)?(SET\s+sql_mode\s*=\s*')(.*)('.*))*",
std::regex::icase | std::regex::optimize);
std::cmatch m;
if (std::regex_match(sql.data(), sql.data() + sql.size(), m, re)) {
auto modes = shcore::str_split(m[3].str(), ",");
std::string new_modes;
for (const auto &mode : modes) {
if (mode != "NO_AUTO_CREATE_USER")
new_modes.append(mode).append(1, ',');
}
if (!new_modes.empty()) new_modes.pop_back(); // strip last ,
*out_new_sql = m[1].str() + m[2].str() + new_modes + m[4].str();
} else {
*out_new_sql = sql;
}
});
}
void Dump_loader::Sql_transform::add_execute_conditionally(
std::function<bool(std::string_view, const std::string &)> f) {
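// scans the statement for CREATE/ALTER/DROP of an EVENT, FUNCTION,
// PROCEDURE or TRIGGER (skipping an optional DEFINER clause and
// IF [NOT] EXISTS), and clears the statement if the callback returns
// false for that object type and name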
add([f = std::move(f)](std::string_view sql, std::string *out_new_sql) {
*out_new_sql = sql;
std::string sql_str{sql};
mysqlshdk::utils::SQL_iterator it(sql_str, 0, false);
while (it.valid()) {
auto token = it.next_token();
if (!shcore::str_caseeq(token, "CREATE", "ALTER", "DROP")) continue;
auto type = it.next_token();
if (shcore::str_caseeq(type, "DEFINER")) {
// =
it.next_token();
// user
it.next_token();
// type or @
type = it.next_token();
if (shcore::str_caseeq(type, "@")) {
// continuation of an account, host
it.next_token();
// type
type = it.next_token();
}
}
if (shcore::str_caseeq(type, "EVENT", "FUNCTION", "PROCEDURE",
"TRIGGER")) {
auto name = it.next_token();
if (shcore::str_caseeq(name, "IF")) {
// NOT or EXISTS
token = it.next_token();
if (shcore::str_caseeq(token, "NOT")) {
// EXISTS
it.next_token();
}
// name follows
name = it.next_token();
}
// name can be either object_name, `object_name` or
// schema.`object_name`; split_schema_and_table will handle all these
// cases and unquote the object name
std::string object_name;
shcore::split_schema_and_table(std::string{name}, nullptr, &object_name,
true);
if (!f(type, object_name)) {
out_new_sql->clear();
}
}
return;
}
});
}
void Dump_loader::Sql_transform::add_rename_schema(std::string_view new_name) {
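// rewrites "CREATE DATABASE|SCHEMA [IF NOT EXISTS] <name>" statements to
// use the new schema name, and "USE <name>" to "USE <new_name>"; all other
// statements are passed through unchanged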
add([new_name = std::string{new_name}](std::string_view sql,
std::string *out_new_sql) {
std::string sql_str{sql};
mysqlshdk::utils::SQL_iterator it(sql_str, 0, false);
const auto token = it.next_token();
if (shcore::str_caseeq(token, "CREATE")) {
if (shcore::str_caseeq(it.next_token(), "DATABASE", "SCHEMA")) {
auto schema = it.next_token();
if (shcore::str_caseeq(schema, "IF")) {
// NOT
it.next_token();
// EXISTS
it.next_token();
// schema follows
schema = it.next_token();
}
*out_new_sql = sql_str.substr(0, it.position() - schema.length()) +
shcore::quote_identifier(new_name) +
sql_str.substr(it.position());
return;
}
} else if (shcore::str_caseeq(token, "USE")) {
*out_new_sql = "USE " + shcore::quote_identifier(new_name);
return;
}
*out_new_sql = std::move(sql_str);
});
}
void Dump_loader::handle_schema_option() {
if (!m_options.target_schema().empty()) {
m_dump->replace_target_schema(m_options.target_schema());
m_default_sql_transforms.add_rename_schema(m_options.target_schema());
}
}
bool Dump_loader::should_create_pks() const {
return m_dump->should_create_pks();
}
void Dump_loader::setup_load_data_progress() {
// Progress mechanics:
// - if the dump is complete when it's opened, we show progress and
// throughput relative to total uncompressed size
// pct% (current GB / total GB), thrp MB/s
// - if the dump is incomplete when it's opened, we show # of uncompressed
// bytes loaded so far and the throughput
// current GB compressed ready; current GB loaded, thrp MB/s
// - when the dump completes during load, we switch to displaying progress
// relative to the total size
if (m_load_data_stage) {
return;
}
dump::Progress_thread::Throughput_config config;
config.space_before_item = false;
config.initial = [this]() { return m_num_bytes_previously_loaded; };
config.current = [this]() -> uint64_t {
if (is_data_load_complete()) {
// this callback can be called before m_load_data_stage is assigned
if (m_load_data_stage) {
// finish the stage when all data is loaded
m_load_data_stage->finish(false);
}
}
return m_num_bytes_loaded;
};
config.total = [this]() { return m_dump->filtered_data_size(); };
config.left_label = [this]() {
std::string label;
if (m_dump->status() != Dump_reader::Status::COMPLETE) {
label += "Dump still in progress, " +
mysqlshdk::utils::format_bytes(m_dump->dump_size()) +
" ready (compr.) - ";
}
const auto threads_loading = m_num_threads_loading.load();
if (threads_loading) {
label += std::to_string(threads_loading) + " thds loading - ";
}
const auto threads_indexing = m_num_threads_recreating_indexes.load();
if (threads_indexing) {
label += std::to_string(threads_indexing) + " thds indexing - ";
}
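// each status label segment ends with "- "; overwrite that '-' with a
// spinner character, advancing one frame per refresh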
static const char k_progress_spin[] = "-\\|/";
static size_t progress_idx = 0;
if (label.length() > 2) {
label[label.length() - 2] = k_progress_spin[progress_idx++];
if (progress_idx >= shcore::array_size(k_progress_spin) - 1) {
progress_idx = 0;
}
}
return label;
};
config.right_label = [this]() {
return shcore::str_format(
", %zu / %zu tables%s done", m_unique_tables_loaded.size(),
m_total_tables_with_data,
m_dump->has_partitions() ? " and partitions" : "");
};
config.on_display_started = []() {
current_console()->print_status("Starting data load");
};
m_load_data_stage =
m_progress_thread.start_stage("Loading data", std::move(config));
}
void Dump_loader::setup_create_indexes_progress() {
if (m_create_indexes_stage) {
return;
}
m_indexes_recreated = 0;
m_index_count_is_known = false;
dump::Progress_thread::Progress_config config;
config.current = [this]() -> uint64_t { return m_indexes_recreated; };
config.total = [this]() { return m_indexes_to_recreate; };
config.is_total_known = [this]() { return m_index_count_is_known; };
m_create_indexes_stage =
m_progress_thread.start_stage("Recreating indexes", std::move(config));
}
void Dump_loader::setup_analyze_tables_progress() {
if (m_analyze_tables_stage) {
return;
}
m_tables_analyzed = 0;
m_tables_to_analyze = 0;
m_all_analyze_tasks_scheduled = false;
dump::Progress_thread::Progress_config config;
config.current = [this]() -> uint64_t { return m_tables_analyzed; };
config.total = [this]() { return m_tables_to_analyze; };
config.is_total_known = [this]() { return m_all_analyze_tasks_scheduled; };
m_analyze_tables_stage =
m_progress_thread.start_stage("Analyzing tables", std::move(config));
}
void Dump_loader::add_skipped_schema(const std::string &schema) {
std::lock_guard<std::recursive_mutex> lock(m_skip_schemas_mutex);
m_skip_schemas.insert(schema);
}
void Dump_loader::on_ddl_done_for_schema(const std::string &schema) {
// notify that DDL for a schema has finished loading
auto it = m_ddl_in_progress_per_schema.find(schema);
assert(m_ddl_in_progress_per_schema.end() != it);
if (0 == --(it->second)) {
m_ddl_in_progress_per_schema.erase(it);
}
}
bool Dump_loader::is_data_load_complete() const {
return Dump_reader::Status::COMPLETE == m_dump->status() &&
m_num_bytes_loaded >= m_dump->filtered_data_size();
}
void Dump_loader::log_server_version() const {
log_info("Destination server: %s",
query("SELECT CONCAT(@@version, ' ', @@version_comment)")
->fetch_one()
->get_string(0)
.c_str());
}
void Dump_loader::push_pending_task(Task_ptr task) {
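// cap the weight at the number of threads, otherwise a task heavier than
// the whole worker pool could never be scheduled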
size_t weight = std::min(task->weight(), m_options.threads_count());
task->set_weight(weight);
if (weight > 1) {
log_debug("Pushing new task for %s with weight %zu", task->key().c_str(),
weight);
}
m_pending_tasks.emplace(std::move(task));
}
Dump_loader::Task_ptr Dump_loader::load_chunk_file(
const std::string &schema, const std::string &table,
const std::string &partition,
std::unique_ptr<mysqlshdk::storage::IFile> file, ssize_t chunk_index,
size_t chunk_size, const shcore::Dictionary_t &options, bool resuming,
uint64_t bytes_to_skip) const {
log_debug("Loading data for %s",
format_table(schema, table, partition, chunk_index).c_str());
assert(!schema.empty());
assert(!table.empty());
assert(file);
// Work isn't sent to worker threads through a regular queue, because that
// would fix the chunk loading order up front. We need to be able to
// schedule chunks dynamically, based on the conditions at the time each
// new chunk needs to be scheduled.
auto task = std::make_unique<Worker::Load_chunk_task>(
schema, table, partition, chunk_index, std::move(file), options, resuming,
bytes_to_skip);
task->raw_bytes_loaded = chunk_size;
return task;
}
Dump_loader::Task_ptr Dump_loader::recreate_indexes(
const std::string &schema, const std::string &table,
compatibility::Deferred_statements::Index_info *indexes) const {
log_debug("Recreating indexes for `%s`.`%s`", schema.c_str(), table.c_str());
assert(!schema.empty());
assert(!table.empty());
uint64_t weight = m_options.threads_per_add_index();
if (weight > 1) {
if (auto table_size = m_dump->table_data_size(schema, table)) {
// size per each thread
table_size /= weight;
// small tables are assumed to have little impact, use the minimum weight
if (table_size <= 10 * 1024 * 1024) { // 10MiB
weight = 1;
}
} // else, we don't have the size info, just use the default weight
}
DBUG_EXECUTE_IF("dump_loader_force_index_weight", { weight = 4; });
return std::make_unique<Worker::Index_recreation_task>(schema, table, indexes,
weight);
}
Dump_loader::Task_ptr Dump_loader::analyze_table(
const std::string &schema, const std::string &table,
const std::vector<Dump_reader::Histogram> &histograms) const {
log_debug("Analyzing table `%s`.`%s`", schema.c_str(), table.c_str());
assert(!schema.empty());
assert(!table.empty());
return std::make_unique<Worker::Analyze_table_task>(schema, table,
histograms);
}
void Dump_loader::load_users() {
if (!m_options.load_users()) {
return;
}
const auto script = m_dump->users_script();
const auto console = current_console();
console->print_status("Executing user accounts SQL...");
std::function<bool(const std::string &, const std::string &)>
strip_revoked_privilege;
if (m_options.is_mds()) {
strip_revoked_privilege = filter_user_script_for_mds();
}
const auto statements = dump::Schema_dumper::preprocess_users_script(
script,
[this](const std::string &account) {
return m_options.filters().users().is_included(account);
},
strip_revoked_privilege);
std::unordered_set<std::string> all_accounts;
std::unordered_set<std::string> ignored_accounts;
std::string new_stmt;
if (m_options.dry_run()) {
return;
}
for (const auto &group : statements) {
all_accounts.emplace(group.account);
if (ignored_accounts.count(group.account) > 0) {
continue;
}
for (std::string_view stmt : group.statements) {
// checked again inside the loop, to stop executing the remaining
// statements once one of the account's grants fails
if (ignored_accounts.count(group.account) > 0) {
continue;
}
if (m_default_sql_transforms(stmt, &new_stmt)) {
stmt = new_stmt;
}
if (stmt.empty()) {
continue;
}
try {
execute_statement(m_session, stmt, "While executing user accounts SQL");
} catch (const mysqlshdk::db::Error &e) {
if (dump::Schema_dumper::User_statements::Type::GRANT != group.type) {
throw;
}
switch (m_options.on_grant_errors()) {
case Load_dump_options::Handle_grant_errors::ABORT:
throw;
case Load_dump_options::Handle_grant_errors::DROP_ACCOUNT: {
console->print_note(
"Due to the above error the account " + group.account +
" was dropped, the load operation will continue.");
ignored_accounts.emplace(group.account);
const auto drop = "DROP USER IF EXISTS " + group.account;
execute_statement(m_session, drop,
"While dropping the account " + group.account);
++m_dropped_accounts;
break;
}
case Load_dump_options::Handle_grant_errors::IGNORE:
console->print_note(
"The above error was ignored, the load operation will "
"continue.");
++m_ignored_grant_errors;
break;
}
}
}
}
m_loaded_accounts = all_accounts.size() - ignored_accounts.size();
}
} // namespace mysqlsh