/*
* Copyright (c) 2020, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "modules/util/load/dump_reader.h"
#include <algorithm>
#include <atomic>
#include <numeric>
#include <utility>
#include "modules/util/common/dump/utils.h"
#include "modules/util/dump/schema_dumper.h"
#include "modules/util/load/load_errors.h"
#include "mysqlshdk/libs/utils/utils_lexing.h"
#include "mysqlshdk/libs/utils/utils_path.h"
#include "mysqlshdk/libs/utils/utils_sqlstring.h"
#include "mysqlshdk/libs/utils/utils_string.h"
namespace mysqlsh {
namespace {
using mysqlshdk::utils::Version;
std::string fetch_file(mysqlshdk::storage::IDirectory *dir,
const std::string &fn) {
auto file = dir->file(fn);
file->open(mysqlshdk::storage::Mode::READ);
std::string data = mysqlshdk::storage::read_file(file.get());
file->close();
return data;
}
shcore::Dictionary_t parse_metadata(const std::string &data,
const std::string &fn) {
try {
auto metadata = shcore::Value::parse(data);
if (metadata.type != shcore::Map) {
THROW_ERROR(SHERR_LOAD_INVALID_METADATA_FILE, fn.c_str());
}
return metadata.as_map();
} catch (const shcore::Exception &e) {
THROW_ERROR(SHERR_LOAD_PARSING_METADATA_FILE_FAILED, fn.c_str(),
e.format().c_str());
}
}
shcore::Dictionary_t fetch_metadata(mysqlshdk::storage::IDirectory *dir,
const std::string &fn) {
return parse_metadata(fetch_file(dir, fn), fn);
}
auto to_vector_of_strings(const shcore::Array_t &arr) {
std::vector<std::string> res;
res.reserve(arr->size());
for (const auto &s : *arr) {
res.emplace_back(s.as_string());
}
return res;
}
} // namespace
Dump_reader::Dump_reader(
std::unique_ptr<mysqlshdk::storage::IDirectory> dump_dir,
const Load_dump_options &options)
: m_dir(std::move(dump_dir)), m_options(options) {}
Dump_reader::Status Dump_reader::open() {
shcore::Dictionary_t md(fetch_metadata(m_dir.get(), "@.json"));
shcore::Dictionary_t basenames(md->get_map("basenames"));
for (const auto &schema : *md->get_array("schemas")) {
if (include_schema(schema.as_string())) {
auto info = std::make_shared<Schema_info>();
info->schema = schema.get_string();
if (basenames->has_key(info->schema)) {
info->basename = basenames->get_string(info->schema);
} else {
info->basename = info->schema;
}
m_contents.schemas.emplace(info->schema, info);
} else {
log_debug("Skipping schema '%s'", schema.as_string().c_str());
}
}
if (md->has_key("version"))
m_contents.dump_version = Version(md->get_string("version"));
if (md->has_key("serverVersion"))
m_contents.server_version = Version(md->get_string("serverVersion"));
if (md->has_key("origin")) m_contents.origin = md->get_string("origin");
if (md->has_key("defaultCharacterSet"))
m_contents.default_charset = md->get_string("defaultCharacterSet");
if (md->has_key("binlogFile"))
m_contents.binlog_file = md->get_string("binlogFile");
if (md->has_key("binlogPosition"))
m_contents.binlog_position = md->get_uint("binlogPosition");
if (md->has_key("gtidExecuted"))
m_contents.gtid_executed = md->get_string("gtidExecuted");
if (md->has_key("gtidExecutedInconsistent")) {
m_contents.gtid_executed_inconsistent =
md->get_bool("gtidExecutedInconsistent");
}
if (md->has_key("tzUtc")) m_contents.tz_utc = md->get_bool("tzUtc");
if (md->has_key("mdsCompatibility"))
m_contents.mds_compatibility = md->get_bool("mdsCompatibility");
if (md->has_key("partialRevokes"))
m_contents.partial_revokes = md->get_bool("partialRevokes");
if (md->has_key("compatibilityOptions")) {
const auto options = md->at("compatibilityOptions")
.to_string_container<std::vector<std::string>>();
m_contents.create_invisible_pks =
std::find(options.begin(), options.end(), "create_invisible_pks") !=
options.end();
}
if (md->has_key("tableOnly"))
m_contents.table_only = md->get_bool("tableOnly");
if (md->has_key("bytesPerChunk"))
m_contents.bytes_per_chunk = md->get_uint("bytesPerChunk");
m_contents.has_users = md->has_key("users");
if (md->has_key("capabilities")) {
const auto capabilities = md->at("capabilities").as_array();
for (const auto &entry : *capabilities) {
const auto capability = entry.as_map();
m_contents.capabilities.emplace_back(Capability_info{
capability->at("id").as_string(),
capability->at("description").as_string(),
Version(capability->at("versionRequired").as_string())});
}
}
try {
m_contents.parse_done_metadata(m_dir.get());
m_dump_status = Status::COMPLETE;
} catch (const std::exception &e) {
log_info("@.done.json: %s", e.what());
m_dump_status = Status::DUMPING;
}
return m_dump_status;
}
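// For reference, an abbreviated @.json of the kind open() above consumes
// might look like this (hypothetical values; only keys that open() actually
// reads are shown):
//
//   {
//     "version": "2.0.0",
//     "serverVersion": "8.0.36",
//     "origin": "dumpSchemas",
//     "defaultCharacterSet": "utf8mb4",
//     "schemas": ["sakila"],
//     "basenames": {"sakila": "sakila"},
//     "gtidExecuted": "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
//     "compatibilityOptions": ["create_invisible_pks"],
//     "capabilities": [{"id": "...", "description": "...",
//                       "versionRequired": "8.0.27"}]
//   }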
std::string Dump_reader::begin_script() const {
return m_contents.sql ? *m_contents.sql : "";
}
std::string Dump_reader::end_script() const {
return m_contents.post_sql ? *m_contents.post_sql : "";
}
std::string Dump_reader::users_script() const {
return m_contents.users_sql ? *m_contents.users_sql : "";
}
bool Dump_reader::next_schema_and_tables(
std::string *out_schema, std::list<Name_and_file> *out_tables,
std::list<Name_and_file> *out_view_placeholders) {
out_tables->clear();
out_view_placeholders->clear();
// find the first schema that is ready
for (auto &it : m_contents.schemas) {
auto &s = it.second;
if (!s->table_sql_done && s->ready()) {
*out_schema = s->schema;
for (const auto &t : s->tables) {
out_tables->emplace_back(
t.first,
t.second->has_sql ? m_dir->file(t.second->script_name()) : nullptr);
}
if (s->has_view_sql) {
for (const auto &v : s->views) {
out_view_placeholders->emplace_back(v.table,
m_dir->file(v.pre_script_name()));
}
}
s->table_sql_done = true;
return true;
}
}
return false;
}
bool Dump_reader::next_schema_and_views(std::string *out_schema,
std::list<Name_and_file> *out_views) {
out_views->clear();
// find the first schema that is ready
for (auto &it : m_contents.schemas) {
auto &s = it.second;
// return true once for every ready schema, even if it has no views
if (!s->view_sql_done && s->ready()) {
*out_schema = s->schema;
if (s->has_view_sql) {
for (const auto &v : s->views) {
out_views->emplace_back(v.table, m_dir->file(v.script_name()));
}
}
s->view_sql_done = true;
return true;
}
}
return false;
}
std::vector<shcore::Account> Dump_reader::accounts() const {
std::vector<shcore::Account> account_list;
std::string script = users_script();
// parse the script to extract user list
dump::Schema_dumper::preprocess_users_script(
script, [&account_list](const std::string &account) {
account_list.emplace_back(shcore::split_account(account));
return true;
});
return account_list;
}
std::vector<std::string> Dump_reader::schemas() const {
std::vector<std::string> slist;
for (const auto &s : m_contents.schemas) {
slist.push_back(s.second->schema);
}
return slist;
}
bool Dump_reader::schema_objects(const std::string &schema,
std::vector<std::string> *out_tables,
std::vector<std::string> *out_views,
std::vector<std::string> *out_triggers,
std::vector<std::string> *out_functions,
std::vector<std::string> *out_procedures,
std::vector<std::string> *out_events) {
auto schema_it = m_contents.schemas.find(schema);
if (schema_it == m_contents.schemas.end()) return false;
auto schema_info = schema_it->second;
out_tables->clear();
for (const auto &t : schema_info->tables) {
out_tables->push_back(t.first);
}
out_views->clear();
for (const auto &v : schema_info->views) {
out_views->push_back(v.table);
}
*out_triggers = schema_info->trigger_names;
*out_functions = schema_info->function_names;
*out_procedures = schema_info->procedure_names;
*out_events = schema_info->event_names;
return true;
}
void Dump_reader::schema_table_triggers(
const std::string &schema, std::list<Name_and_file> *out_table_triggers) {
out_table_triggers->clear();
const auto &s = m_contents.schemas.at(schema);
for (const auto &t : s->tables) {
if (t.second->has_triggers)
out_table_triggers->emplace_back(
t.first, m_dir->file(t.second->triggers_script_name()));
}
}
const std::vector<std::string> &Dump_reader::deferred_schema_fks(
const std::string &schema) const {
const auto &s = m_contents.schemas.at(schema);
return s->foreign_key_queries;
}
const std::vector<std::string> &Dump_reader::queries_on_schema_end(
const std::string &schema) const {
const auto &s = m_contents.schemas.at(schema);
return s->queries_on_schema_end;
}
const std::map<std::string, std::vector<std::string>>
Dump_reader::tables_without_pk() const {
std::map<std::string, std::vector<std::string>> res;
for (const auto &s : m_contents.schemas) {
std::vector<std::string> tables;
for (const auto &t : s.second->tables)
if (t.second->primary_index.empty())
tables.emplace_back(shcore::quote_identifier(t.first));
if (!tables.empty()) {
std::sort(tables.begin(), tables.end());
res.emplace(s.first, tables);
}
}
return res;
}
bool Dump_reader::has_tables_without_pk() const {
for (const auto &s : m_contents.schemas) {
for (const auto &t : s.second->tables) {
if (t.second->primary_index.empty()) {
return true;
}
}
}
return false;
}
bool Dump_reader::has_primary_key(const std::string &schema,
const std::string &table) const {
return !m_contents.schemas.at(schema)
->tables.at(table)
->primary_index.empty();
}
std::string Dump_reader::fetch_schema_script(const std::string &schema) const {
std::string script;
const auto &s = m_contents.schemas.at(schema);
if (s->has_sql) {
// Get the base script for the schema
script = fetch_file(m_dir.get(), s->script_name());
}
return script;
}
// Proportional chunk scheduling
//
// Multiple sessions writing to the same table compete for locks when
// updating indexes, so to optimize performance we try to have as many
// different tables loading at the same time as possible.
// If there are more tables than threads, there will be a single session per
// table.
// But if there are more threads than tables, we need to distribute the
// threads among the tables. If we distributed them equally, smaller tables
// would finish sooner and their released threads would pile onto the bigger
// tables, increasing the contention there. Our optimization goal is
// therefore to keep as many different tables loading concurrently for as
// long as possible, while giving tables that are currently being loaded
// preference over others, which could otherwise cause tables to get flushed
// out of the buffer pool.
// Thus, smaller tables get fewer threads allocated, so they take longer to
// load, while bigger tables get more, with the hope that the total time to
// load all tables is minimized.
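//
// A worked example with hypothetical numbers: two tables are in progress,
// A with 600 MB of chunk data still available and B with 200 MB, so their
// shares of the available data are 0.75 and 0.25. Workers are currently
// loading 900 MB of A and 100 MB of B, i.e. shares of 0.9 and 0.1. The
// differences computed below are 0.75 - 0.9 = -0.15 for A and
// 0.25 - 0.1 = 0.15 for B, so the next chunk is taken from B, nudging the
// worker distribution towards the data distribution.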
Dump_reader::Candidate Dump_reader::schedule_chunk_proportionally(
const std::unordered_multimap<std::string, size_t> &tables_being_loaded,
std::unordered_set<Dump_reader::Table_data_info *> *tables_with_data,
uint64_t max_concurrent_tables) {
if (tables_with_data->empty()) return tables_with_data->end();
std::vector<Candidate> tables_in_progress;
// first check if there's any table that's not being loaded
{
const auto end = tables_with_data->end();
auto best = end;
for (auto it = tables_with_data->begin(); it != end; ++it) {
if ((*it)->chunks_consumed) {
tables_in_progress.emplace_back(it);
}
if (tables_being_loaded.find((*it)->key()) == tables_being_loaded.end()) {
// table is better if it's bigger and in the same state as the current
// best, or if it was previously scheduled and current best was not
if (best == end ||
((*it)->bytes_available() > (*best)->bytes_available() &&
!(*it)->chunks_consumed == !(*best)->chunks_consumed) ||
((*it)->chunks_consumed && !(*best)->chunks_consumed))
best = it;
}
}
// schedule a new table only if we're not exceeding the maximum number of
// concurrent tables that can be loaded at the same time
if (best != end && (tables_in_progress.size() < max_concurrent_tables ||
(*best)->chunks_consumed)) {
return best;
}
}
// if all tables with available data are already being loaded, schedule
// proportionally
std::unordered_map<std::string, double> worker_weights;
// sum the bytes being loaded per table (side effect of the lambda below) and
// compute each table's share of the total data currently being loaded
double total_bytes_loading = std::accumulate(
tables_being_loaded.begin(), tables_being_loaded.end(),
static_cast<size_t>(0),
[&worker_weights](size_t size,
const std::pair<std::string, size_t> &table_size) {
worker_weights[table_size.first] += table_size.second;
return size + table_size.second;
});
if (total_bytes_loading > 0) {
for (auto &it : worker_weights) {
it.second = it.second / total_bytes_loading;
}
}
std::vector<std::pair<Candidate, double>> candidate_weights;
// compute each table's share of the total data still available
double total_bytes_available = std::accumulate(
tables_in_progress.begin(), tables_in_progress.end(),
static_cast<size_t>(0),
[](size_t size, auto it) { return size + (*it)->bytes_available(); });
if (total_bytes_available > 0) {
for (auto it = tables_in_progress.begin(); it != tables_in_progress.end();
++it) {
candidate_weights.emplace_back(
*it, static_cast<double>((**it)->bytes_available()) /
total_bytes_available);
}
} else {
assert(0);
return tables_in_progress.front();
}
// pick a chunk from the table with the biggest surplus of available-data
// share over its share of the workers
double best_diff = 0;
Candidate best = tables_in_progress.front();
for (const auto &cand : candidate_weights) {
const auto it = worker_weights.find((*cand.first)->key());
const auto weight = it == worker_weights.end() ? 0.0 : it->second;
const auto d = cand.second - weight;
if (d > best_diff) {
best_diff = d;
best = cand.first;
}
}
return best;
}
bool Dump_reader::next_table_chunk(
const std::unordered_multimap<std::string, size_t> &tables_being_loaded,
std::string *out_schema, std::string *out_table, std::string *out_partition,
bool *out_chunked, size_t *out_chunk_index, size_t *out_chunks_total,
std::unique_ptr<mysqlshdk::storage::IFile> *out_file,
size_t *out_chunk_size, shcore::Dictionary_t *out_options) {
auto iter = schedule_chunk_proportionally(
tables_being_loaded, &m_tables_with_data, m_options.threads_count());
if (iter != m_tables_with_data.end()) {
*out_schema = (*iter)->owner->schema;
*out_table = (*iter)->owner->table;
*out_partition = (*iter)->partition;
*out_chunked = (*iter)->chunked;
*out_chunk_index = (*iter)->chunks_consumed;
if ((*iter)->last_chunk_seen) {
*out_chunks_total = (*iter)->available_chunks.size();
} else {
*out_chunks_total = 0;
}
const auto &info = (*iter)->available_chunks[*out_chunk_index];
if (!info.has_value()) {
throw std::logic_error(
"Trying to use chunk " + std::to_string(*out_chunk_index) + " of " +
schema_table_object_key(*out_schema, *out_table, *out_partition) +
" which is not yet available");
}
*out_file = m_dir->file(info->name());
*out_chunk_size = info->size();
*out_options = (*iter)->owner->options;
(*iter)->consume_chunk();
if (!(*iter)->has_data_available()) m_tables_with_data.erase(iter);
return true;
}
return false;
}
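// Once a table's data is fully loaded (or loading data was disabled), the
// two functions below hand out the remaining per-table work in order: first
// the deferred index recreation, then the table analysis (note the
// indexes_created precondition in next_table_analyze()).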
bool Dump_reader::next_deferred_index(
std::string *out_schema, std::string *out_table,
compatibility::Deferred_statements::Index_info **out_indexes) {
for (auto &schema : m_contents.schemas) {
for (auto &table : schema.second->tables) {
if ((!m_options.load_data() || table.second->all_data_loaded()) &&
!table.second->indexes_scheduled) {
table.second->indexes_scheduled = true;
*out_schema = schema.second->schema;
*out_table = table.second->table;
*out_indexes = &table.second->indexes;
return true;
}
}
}
return false;
}
bool Dump_reader::next_table_analyze(std::string *out_schema,
std::string *out_table,
std::vector<Histogram> *out_histograms) {
for (auto &schema : m_contents.schemas) {
for (auto &table : schema.second->tables) {
if ((!m_options.load_data() || table.second->all_data_loaded()) &&
table.second->indexes_created && !table.second->analyze_scheduled) {
table.second->analyze_scheduled = true;
*out_schema = schema.second->schema;
*out_table = table.second->table;
*out_histograms = table.second->histograms;
return true;
}
}
}
return false;
}
bool Dump_reader::data_available() const { return !m_tables_with_data.empty(); }
bool Dump_reader::work_available() const {
for (auto &schema : m_contents.schemas) {
for (auto &table : schema.second->tables) {
if ((m_options.load_data() && !table.second->all_data_scheduled()) ||
!table.second->indexes_scheduled ||
!table.second->analyze_scheduled) {
return true;
}
}
}
return false;
}
size_t Dump_reader::filtered_data_size() const {
if (m_dump_status == Status::COMPLETE) {
return m_filtered_data_size;
} else {
return total_data_size();
}
}
size_t Dump_reader::table_data_size(const std::string &schema,
const std::string &table) const {
if (const auto s = m_contents.table_data_size.find(schema);
s != m_contents.table_data_size.end()) {
if (const auto t = s->second.find(table); t != s->second.end()) {
return t->second;
}
}
return 0;
}
void Dump_reader::compute_filtered_data_size() {
m_filtered_data_size = 0;
for (const auto &schema : m_contents.schemas) {
const auto s = m_contents.table_data_size.find(schema.first);
if (s != m_contents.table_data_size.end()) {
for (const auto &table : schema.second->tables) {
const auto t = s->second.find(table.second->table);
if (t != s->second.end()) {
m_filtered_data_size += t->second;
}
}
}
}
}
// Scans the directory for new files and adds them to the pending file list
void Dump_reader::rescan(dump::Progress_thread *progress_thread) {
Files files;
{
dump::Progress_thread::Stage *stage = nullptr;
shcore::on_leave_scope finish_stage([&stage]() {
if (stage) {
stage->finish();
}
});
if (!m_dir->is_local()) {
current_console()->print_status(
"Fetching dump data from remote location...");
if (progress_thread) {
stage = progress_thread->start_stage("Listing files");
}
}
files = m_dir->list_files();
}
log_debug("Finished listing files, starting rescan");
m_contents.rescan(m_dir.get(), files, this, progress_thread);
log_debug("Rescan done");
if (files.find({"@.done.json"}) != files.end() &&
m_dump_status != Status::COMPLETE) {
m_contents.parse_done_metadata(m_dir.get());
m_dump_status = Status::COMPLETE;
}
compute_filtered_data_size();
}
uint64_t Dump_reader::add_deferred_statements(
const std::string &schema, const std::string &table,
compatibility::Deferred_statements &&stmts) {
const auto s = m_contents.schemas.find(schema);
if (s == m_contents.schemas.end()) {
throw std::logic_error("Unable to find schema " + schema +
" for adding index");
}
const auto t = s->second->tables.find(table);
if (t == s->second->tables.end()) {
throw std::logic_error("Unable to find table " + table + " in schema " +
schema + " for adding index");
}
// if indexes are not going to be recreated, we're marking them as already
// created
t->second->indexes_scheduled = t->second->indexes_created =
!m_options.load_deferred_indexes() || stmts.index_info.empty();
t->second->indexes = std::move(stmts.index_info);
const auto table_name = schema_object_key(schema, table);
for (const auto &fk : stmts.foreign_keys) {
s->second->foreign_key_queries.emplace_back("ALTER TABLE " + table_name +
" ADD " + fk);
}
if (!stmts.secondary_engine.empty()) {
s->second->queries_on_schema_end.emplace_back(
std::move(stmts.secondary_engine));
}
return t->second->indexes.size();
}
void Dump_reader::replace_target_schema(const std::string &schema) {
if (1 != m_contents.schemas.size()) {
current_console()->print_error(
"The 'schema' option can only be used when loading a single schema, "
"but " +
std::to_string(m_contents.schemas.size()) + " will be loaded.");
throw std::invalid_argument("Invalid option: schema.");
}
const auto info = m_contents.schemas.begin()->second;
m_contents.schemas.clear();
m_contents.schemas.emplace(schema, info);
m_schema_override = {schema, info->schema};
info->schema = schema;
for (const auto &table : info->tables) {
table.second->schema = schema;
}
for (auto &view : info->views) {
view.schema = schema;
}
for (const auto &table : m_tables_with_data) {
table->owner->schema = schema;
table->owner->options->set("schema", shcore::Value(schema));
}
}
void Dump_reader::validate_options() {
if (m_options.load_users() && !m_contents.has_users) {
current_console()->print_warning(
"The 'loadUsers' option is set to true, but the dump does not contain "
"the user data.");
}
if (table_only()) {
// old version of dumpTables() - no schema SQL is available
if (m_options.target_schema().empty()) {
// user didn't provide the new schema name, we cannot proceed
current_console()->print_error(
"The dump was created by an older version of the util." +
shcore::get_member_name("dumpTables",
shcore::current_naming_style()) +
"() function and needs to be loaded into an existing schema. "
"Please set the target schema using the 'schema' option.");
throw std::invalid_argument("The target schema was not specified.");
} else {
const auto result = m_options.base_session()->queryf(
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE "
"SCHEMA_NAME=?",
m_options.target_schema());
if (nullptr == result->fetch_one()) {
throw std::invalid_argument("The specified schema does not exist.");
}
}
}
if (should_create_pks() && !m_options.load_ddl()) {
current_console()->print_warning(
"The 'createInvisiblePKs' option is set to true, but the 'loadDdl' "
"option is false, Primary Keys are not going to be created.");
}
}
std::string Dump_reader::Table_info::script_name() const {
return dump::common::get_table_filename(basename);
}
std::string Dump_reader::Table_info::triggers_script_name() const {
return dump::common::get_table_data_filename(basename, "triggers.sql");
}
bool Dump_reader::Table_info::ready() const {
return md_done && (!has_sql || sql_seen);
}
std::string Dump_reader::Table_info::metadata_name() const {
// schema@table.json
return dump::common::get_table_data_filename(basename, "json");
}
bool Dump_reader::Table_info::should_fetch_metadata_file(
const Files &files) const {
if (!md_done && files.find(metadata_name()) != files.end()) {
return true;
}
return false;
}
void Dump_reader::Table_info::update_metadata(const std::string &data,
Dump_reader *reader) {
auto md = parse_metadata(data, metadata_name());
Table_data_info di;
di.owner = this;
has_sql = md->get_bool("includesDdl", true);
di.has_data = md->get_bool("includesData", true);
options = md->get_map("options");
if (options) {
// "compression" and "primaryIndex" are not used by the chunk importer
// and need to be removed
// these were misplaced in the options dictionary, code is kept for
// backward compatibility
options->erase("compression");
if (options->has_key("primaryIndex")) {
auto index = options->get_string("primaryIndex");
if (!index.empty()) {
primary_index.emplace_back(std::move(index));
}
options->erase("primaryIndex");
}
// chunk importer uses characterSet instead of defaultCharacterSet
if (options->has_key("defaultCharacterSet")) {
options->set("characterSet", options->at("defaultCharacterSet"));
options->erase("defaultCharacterSet");
} else {
// By default, we use the character set from the source DB
options->set("characterSet",
shcore::Value(reader->default_character_set()));
}
}
options->set("showProgress",
shcore::Value(reader->m_options.show_progress()));
// Override characterSet if given in options
if (!reader->m_options.character_set().empty()) {
options->set("characterSet",
shcore::Value(reader->m_options.character_set()));
}
di.extension = md->get_string("extension", "tsv");
di.chunked = md->get_bool("chunking", false);
if (md->has_key("primaryIndex")) {
primary_index = to_vector_of_strings(md->get_array("primaryIndex"));
}
{
const auto trigger_list = md->get_array("triggers");
has_triggers = trigger_list && !trigger_list->empty();
}
auto histogram_list = md->get_array("histograms");
if (histogram_list) {
for (const auto &h : *histogram_list) {
auto histogram = h.as_map();
if (histogram) {
histograms.emplace_back(
Histogram{histogram->get_string("column"),
static_cast<size_t>(histogram->get_int("buckets"))});
}
}
}
{
// maps partition names to basenames
const auto basenames = md->get_map("basenames");
if (basenames && !basenames->empty()) {
for (const auto &p : *basenames) {
auto copy = di;
copy.partition = p.first;
copy.basename = p.second.as_string();
data_info.emplace_back(std::move(copy));
}
} else {
di.basename = basename;
data_info.emplace_back(std::move(di));
}
}
reader->on_table_metadata_parsed(*this);
md_done = true;
}
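// For reference, an abbreviated schema@table.json of the kind
// update_metadata() above consumes might look like this (hypothetical
// values; only keys that are actually read here are shown):
//
//   {
//     "includesDdl": true,
//     "includesData": true,
//     "options": {"defaultCharacterSet": "utf8mb4"},
//     "extension": "tsv.zst",
//     "chunking": true,
//     "primaryIndex": ["id"],
//     "triggers": [],
//     "histograms": [{"column": "id", "buckets": 100}],
//     "basenames": {"p0": "sakila@actor@p0"}
//   }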
void Dump_reader::Table_info::rescan(const Files &files) {
// metadata is not included for tables if their data was not dumped
// check for the SQL file of the table
if (!sql_seen) {
if (files.find(script_name()) != files.end()) {
sql_seen = true;
}
}
if (!has_triggers) {
if (files.find(triggers_script_name()) != files.end()) {
has_triggers = true;
}
}
}
bool Dump_reader::Table_info::all_data_scheduled() const {
for (const auto &di : data_info) {
if (!di.data_scheduled()) return false;
}
return true;
}
bool Dump_reader::Table_info::all_data_loaded() const {
for (const auto &di : data_info) {
if (!di.data_loaded()) return false;
}
return true;
}
void Dump_reader::Table_data_info::rescan_data(const Files &files,
Dump_reader *reader) {
bool found_data = false;
const auto try_to_add_chunk = [&files, reader, &found_data,
this](auto &&...params) {
static_assert(sizeof...(params) == 0 || sizeof...(params) == 2);
// default values for non-chunked case
size_t idx = 0;
bool last_chunk = true;
if constexpr (sizeof...(params) == 2) {
std::tie(idx, last_chunk) = std::forward_as_tuple(params...);
}
const auto it = files.find(
dump::common::get_table_data_filename(basename, extension, params...));
if (it == files.end()) {
return false;
}
if (idx >= available_chunks.size()) {
available_chunks.resize(idx + 1, {});
}
available_chunks[idx] = *it;
reader->m_contents.dump_size += it->size();
++chunks_seen;
found_data = true;
if (last_chunk) {
last_chunk_seen = true;
}
return true;
};
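// Assuming get_table_data_filename() composes names as
// "<basename>[@<index>|@@<index>].<extension>" (with "@@" marking the final
// chunk, which is what last_chunk above toggles), a chunked table with
// basename "sakila@actor" and extension "tsv.zst" would produce
// "sakila@actor@0.tsv.zst", "sakila@actor@1.tsv.zst", ..., ending with
// "sakila@actor@@<n>.tsv.zst".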
// in older versions of Shell, empty tables would create a single data file
// using the non-chunked name format, even if the file was supposed to be
// chunked
try_to_add_chunk();
if (chunked) {
if (chunks_seen < available_chunks.size()) {
// search for chunks that we're still missing
for (size_t i = 0, s = available_chunks.size(); i < s; ++i) {
// If we have found the last chunk, then the last element in the array
// will already be set. If we didn't find the last chunk, then the last
// element in the array is going to be a regular chunk.
if (!available_chunks[i].has_value()) {
try_to_add_chunk(i, false);
}
}
}
if (!last_chunk_seen) {
// We don't know how many chunks there are, so try to extend the consecutive
// sequence of chunk IDs. If the sequence stops, check whether the final
// chunk follows, then break either way, as we don't want to probe too many
// file names.
for (size_t i = available_chunks.size(); i < files.size(); ++i) {
if (!try_to_add_chunk(i, false)) {
try_to_add_chunk(i, true);
break;
}
}
}
}
if (found_data) reader->m_tables_with_data.insert(this);
}
std::string Dump_reader::View_info::script_name() const {
return dump::common::get_table_filename(basename);
}
std::string Dump_reader::View_info::pre_script_name() const {
return dump::common::get_table_data_filename(basename, "pre.sql");
}
void Dump_reader::View_info::rescan(const Files &files) {
// check for the SQL file of the view
if (!sql_seen) {
if (files.find(script_name()) != files.end()) {
sql_seen = true;
}
}
if (!sql_pre_seen) {
if (files.find(pre_script_name()) != files.end()) {
sql_pre_seen = true;
}
}
}
bool Dump_reader::Schema_info::ready() const {
if (md_done && (!has_sql || sql_seen)) {
return true;
}
return false;
}
std::string Dump_reader::Schema_info::script_name() const {
return dump::common::get_schema_filename(basename);
}
std::string Dump_reader::Schema_info::metadata_name() const {
// schema.json
return dump::common::get_schema_filename(basename, "json");
}
bool Dump_reader::Schema_info::should_fetch_metadata_file(
const Files &files) const {
if (!md_loaded && files.find(metadata_name()) != files.end()) {
return true;
}
return false;
}
void Dump_reader::Schema_info::update_metadata(const std::string &data,
Dump_reader *reader) {
auto md = parse_metadata(data, metadata_name());
has_sql = md->get_bool("includesDdl", true);
has_view_sql = md->get_bool("includesViewsDdl", has_sql);
has_data = md->get_bool("includesData", true);
shcore::Dictionary_t basenames = md->get_map("basenames");
if (md->has_key("tables")) {
for (const auto &t : *md->get_array("tables")) {
if (reader->include_table(schema, t.as_string())) {
auto info = std::make_shared<Table_info>();
info->schema = schema;
info->table = t.as_string();
if (basenames->has_key(info->table))
info->basename = basenames->get_string(info->table);
else
info->basename = basename + "@" + info->table;
// if tables are not going to be analyzed, we're marking them as already
// analyzed
info->analyze_scheduled = info->analyze_finished =
reader->m_options.analyze_tables() ==
Load_dump_options::Analyze_table_mode::OFF;
tables.emplace(info->table, std::move(info));
}
}
log_debug("%s has %zi tables", schema.c_str(), tables.size());
}
if (md->has_key("views")) {
for (const auto &v : *md->get_array("views")) {
if (reader->include_table(schema, v.as_string())) {
View_info info;
info.schema = schema;
info.table = v.as_string();
if (basenames->has_key(info.table))
info.basename = basenames->get_string(info.table);
else
info.basename = basename + "@" + info.table;
views.emplace_back(std::move(info));
}
}
log_debug("%s has %zi views", schema.c_str(), views.size());
}
if (md->has_key("functions"))
function_names = to_vector_of_strings(md->get_array("functions"));
if (md->has_key("procedures"))
procedure_names = to_vector_of_strings(md->get_array("procedures"));
if (md->has_key("events"))
event_names = to_vector_of_strings(md->get_array("events"));
md_loaded = true;
reader->on_metadata_parsed();
}
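// For reference, an abbreviated schema.json of the kind update_metadata()
// above consumes might look like this (hypothetical values; only keys that
// are actually read here are shown):
//
//   {
//     "includesDdl": true,
//     "includesViewsDdl": true,
//     "includesData": true,
//     "tables": ["actor"],
//     "views": ["actor_info"],
//     "functions": [],
//     "procedures": [],
//     "events": [],
//     "basenames": {"actor": "sakila@actor"}
//   }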
void Dump_reader::Schema_info::rescan(mysqlshdk::storage::IDirectory *dir,
const Files &files, Dump_reader *reader,
shcore::Thread_pool *pool) {
log_debug("Scanning contents of schema '%s'", schema.c_str());
if (md_loaded && !md_done) {
// we have the list of tables, so check for their metadata and data files
std::size_t files_to_fetch = 0;
for (auto &t : tables) {
if (!t.second->ready()) {
if (t.second->should_fetch_metadata_file(files)) {
// metadata available, fetch it, parse it, then rescan asynchronously
reader->on_metadata_available();
++files_to_fetch;
pool->add_task(
[dir, mdpath = t.second->metadata_name()]() {
return fetch_file(dir, mdpath);
},
[table = t.second.get(), &files, reader](std::string &&data) {
table->update_metadata(data, reader);
table->rescan(files);
});
} else {
// metadata already parsed, scan synchronously
t.second->rescan(files);
}
}
}
if (files_to_fetch > 0 && !dir->is_local()) {
log_info("Fetching %zu table metadata files for schema `%s`...",
files_to_fetch, schema.c_str());
}
for (auto &v : views) {
if (!v.ready()) {
v.rescan(files);
}
}
}
// check for the sql file for the schema
if (!sql_seen) {
if (files.find(script_name()) != files.end()) {
sql_seen = true;
}
}
}
void Dump_reader::Schema_info::check_if_ready() {
if (md_loaded && !md_done) {
bool children_done = true;
for (const auto &t : tables) {
if (!t.second->ready()) {
children_done = false;
break;
}
}
if (children_done) {
for (const auto &v : views) {
if (!v.ready()) {
children_done = false;
break;
}
}
}
md_done = children_done;
if (md_done) {
log_debug("All metadata for schema `%s` was scanned", schema.c_str());
}
}
}
void Dump_reader::Schema_info::rescan_data(const Files &files,
Dump_reader *reader) {
for (auto &t : tables) {
for (auto &di : t.second->data_info) {
if (!di.data_dumped()) {
di.rescan_data(files, reader);
}
}
}
}
bool Dump_reader::Dump_info::ready() const {
return sql && post_sql && has_users == !!users_sql && md_done;
}
void Dump_reader::Dump_info::rescan(mysqlshdk::storage::IDirectory *dir,
const Files &files, Dump_reader *reader,
dump::Progress_thread *progress_thread) {
if (!sql && files.find({"@.sql"}) != files.end()) {
sql = std::make_unique<std::string>(fetch_file(dir, "@.sql"));
}
if (!post_sql && files.find({"@.post.sql"}) != files.end()) {
post_sql = std::make_unique<std::string>(fetch_file(dir, "@.post.sql"));
}
if (has_users && !users_sql && files.find({"@.users.sql"}) != files.end()) {
users_sql = std::make_unique<std::string>(fetch_file(dir, "@.users.sql"));
}
if (!md_done) {
rescan_metadata(dir, files, reader, progress_thread);
}
rescan_data(files, reader);
}
void Dump_reader::Dump_info::rescan_metadata(
mysqlshdk::storage::IDirectory *dir, const Files &files,
Dump_reader *reader, dump::Progress_thread *progress_thread) {
const auto thread_pool_ptr = reader->create_thread_pool();
const auto pool = thread_pool_ptr.get();
std::atomic<uint64_t> task_producers{0};
const auto maybe_shutdown = [&task_producers, pool]() {
if (0 == --task_producers) {
pool->tasks_done();
}
};
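// task_producers starts at one for this function itself; every scheduled
// metadata fetch below adds one more. When the counter drops back to zero,
// no new tasks can be queued and the pool is notified that all tasks have
// been submitted.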
++task_producers;
dump::Progress_thread::Stage *stage = nullptr;
shcore::on_leave_scope finish_stage([&stage]() {
if (stage) {
stage->finish();
}
});
if (progress_thread) {
dump::Progress_thread::Progress_config config;
config.current = [reader]() { return reader->metadata_parsed(); };
config.total = [reader]() { return reader->metadata_available(); };
config.is_total_known = [&task_producers]() { return 0 == task_producers; };
stage =
progress_thread->start_stage("Scanning metadata", std::move(config));
}
pool->start_threads();
for (const auto &s : schemas) {
if (!s.second->ready()) {
if (s.second->should_fetch_metadata_file(files)) {
// metadata available, fetch it, parse it, then rescan asynchronously
reader->on_metadata_available();
++task_producers;
pool->add_task(
[dir, mdpath = s.second->metadata_name()]() {
return fetch_file(dir, mdpath);
},
[&maybe_shutdown, schema = s.second.get(), dir, &files, reader,
pool](std::string &&data) {
shcore::on_leave_scope cleanup(
[&maybe_shutdown]() { maybe_shutdown(); });
schema->update_metadata(data, reader);
schema->rescan(dir, files, reader, pool);
});
} else {
// metadata already parsed, scan synchronously
s.second->rescan(dir, files, reader, pool);
}
}
}
maybe_shutdown();
pool->process();
check_if_ready();
}
void Dump_reader::Dump_info::check_if_ready() {
if (!md_done) {
bool children_done = true;
for (const auto &s : schemas) {
s.second->check_if_ready();
if (!s.second->ready()) {
children_done = false;
}
}
md_done = children_done;
if (md_done) log_debug("All metadata for dump was scanned");
}
}
void Dump_reader::Dump_info::rescan_data(const Files &files,
Dump_reader *reader) {
for (const auto &s : schemas) {
if (s.second->ready()) {
log_debug("Scanning data of schema '%s'", s.second->schema.c_str());
s.second->rescan_data(files, reader);
}
}
}
void Dump_reader::Dump_info::parse_done_metadata(
mysqlshdk::storage::IDirectory *dir) {
shcore::Dictionary_t metadata = fetch_metadata(dir, "@.done.json");
log_info("Dump %s is complete", dir->full_path().masked().c_str());
if (metadata) {
if (metadata->has_key("dataBytes")) {
data_size = metadata->get_uint("dataBytes");
} else {
log_warning(
"Dump metadata file @.done.json does not contain dataBytes "
"information");
}
if (metadata->has_key("tableDataBytes")) {
for (const auto &schema : *metadata->get_map("tableDataBytes")) {
for (const auto &table : *schema.second.as_map()) {
table_data_size[schema.first][table.first] = table.second.as_uint();
}
}
} else {
log_warning(
"Dump metadata file @.done.json does not contain tableDataBytes "
"information");
}
// only exists in 1.0.1+
if (metadata->has_key("chunkFileBytes")) {
for (const auto &file : *metadata->get_map("chunkFileBytes")) {
chunk_sizes[file.first] = file.second.as_uint();
}
}
} else {
log_warning("Dump metadata file @.done.json is invalid");
}
}
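// For reference, an abbreviated @.done.json of the kind
// parse_done_metadata() above consumes might look like this (hypothetical
// values; only keys that are actually read here are shown):
//
//   {
//     "dataBytes": 1048576,
//     "tableDataBytes": {"sakila": {"actor": 1048576}},
//     "chunkFileBytes": {"sakila@actor@@0.tsv.zst": 262144}
//   }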
std::unique_ptr<mysqlshdk::storage::IFile>
Dump_reader::create_progress_file_handle() const {
return m_options.create_progress_file_handle();
}
void Dump_reader::show_metadata() const {
if (m_options.show_metadata()) {
const auto metadata = shcore::make_dict(
"Dump_metadata",
shcore::make_dict("Binlog_file", binlog_file(), "Binlog_position",
binlog_position(), "Executed_GTID_set",
gtid_executed()));
const auto console = current_console();
console->println();
console->println(shcore::Value(metadata).yaml());
}
}
bool Dump_reader::should_create_pks() const {
return m_options.should_create_pks(m_contents.create_invisible_pks);
}
std::unique_ptr<shcore::Thread_pool> Dump_reader::create_thread_pool() const {
auto threads = m_options.threads_count();
if (!m_dir->is_local()) {
// for remote dumps we use more threads, to compensate for download latency
threads *= 4;
}
return std::make_unique<shcore::Thread_pool>(
m_options.background_threads_count(threads));
}
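// Example with hypothetical numbers: with threads_count() == 4, a remote
// dump requests 4 * 4 = 16 threads; the final pool size is whatever
// background_threads_count() turns that into (e.g. it may apply a cap).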
void Dump_reader::on_table_metadata_parsed(const Table_info &info) {
const auto partitions = info.data_info.size();
assert(partitions > 0);
if (info.data_info[0].has_data) {
++m_tables_to_load;
m_tables_and_partitions_to_load += partitions;
if (partitions > 1) {
m_dump_has_partitions = true;
}
}
on_metadata_parsed();
}
bool Dump_reader::include_schema(const std::string &schema) const {
return m_options.filters().schemas().is_included(override_schema(schema));
}
bool Dump_reader::include_table(const std::string &schema,
const std::string &table) const {
return m_options.filters().tables().is_included(override_schema(schema),
table);
}
bool Dump_reader::include_event(const std::string &schema,
const std::string &event) const {
return m_options.filters().events().is_included(override_schema(schema),
event);
}
bool Dump_reader::include_routine(const std::string &schema,
const std::string &routine) const {
return m_options.filters().routines().is_included(override_schema(schema),
routine);
}
bool Dump_reader::include_trigger(const std::string &schema,
const std::string &table,
const std::string &trigger) const {
return m_options.filters().triggers().is_included(override_schema(schema),
table, trigger);
}
const std::string &Dump_reader::override_schema(const std::string &s) const {
if (!m_schema_override.has_value()) return s;
const auto &value = *m_schema_override;
return value.first == s ? value.second : s;
}
void Dump_reader::on_chunk_loaded(std::string_view schema,
std::string_view table,
std::string_view partition) {
const auto t = find_table(schema, table, "chunk was loaded");
for (auto &tdi : t->data_info) {
if (tdi.partition == partition) {
++tdi.chunks_loaded;
return;
}
}
throw std::logic_error(
shcore::str_format("Unable to find partition %.*s of table %.*s in "
"schema %.*s whose chunk was loaded",
static_cast<int>(partition.length()), partition.data(),
static_cast<int>(table.length()), table.data(),
static_cast<int>(schema.length()), schema.data()));
}
void Dump_reader::on_index_end(std::string_view schema,
std::string_view table) {
find_table(schema, table, "indexes were created")->indexes_created = true;
}
void Dump_reader::on_analyze_end(std::string_view schema,
std::string_view table) {
find_table(schema, table, "analysis was finished")->analyze_finished = true;
}
Dump_reader::Table_info *Dump_reader::find_table(std::string_view schema,
std::string_view table,
std::string_view context) {
const auto s = m_contents.schemas.find(std::string{schema});
if (s == m_contents.schemas.end()) {
throw std::logic_error(
shcore::str_format("Unable to find schema %.*s whose %.*s",
static_cast<int>(schema.length()), schema.data(),
static_cast<int>(context.length()), context.data()));
}
const auto t = s->second->tables.find(std::string{table});
if (t == s->second->tables.end()) {
throw std::logic_error(shcore::str_format(
"Unable to find table %.*s in schema %.*s whose %.*s",
static_cast<int>(table.length()), table.data(),
static_cast<int>(schema.length()), schema.data(),
static_cast<int>(context.length()), context.data()));
}
return t->second.get();
}
} // namespace mysqlsh