in modules/util/import_table/load_data.cc [575:951]
/**
 * Body of a single load worker: prepares the session, builds a
 * `LOAD DATA LOCAL INFILE` statement and executes it for every file (or file
 * range) assigned to this worker, accumulating the results in m_stats.
 *
 * Two modes are supported:
 *  - m_range_queue is set: work items are popped from the queue until a guard
 *    item is received; the `file` argument is not used,
 *  - m_range_queue is not set: only the given `file` is loaded.
 *
 * A single input may be loaded by several consecutive statements
 * ("sub-chunks"); the loop keeps the same file handle for as long as the
 * transaction buffer reports a continuation.
 *
 * This method does not let exceptions escape: whatever is thrown is stored in
 * m_thread_exception[m_thread_id].
 *
 * @param session Session used to execute all the statements; the local infile
 *                callbacks are registered on it.
 * @param file    File to be loaded when there is no range queue.
 * @param options Transaction options handed over to the transaction buffer
 *                when there is no range queue; options.max_trx_size is also
 *                used when reporting the progress and errors.
 */
void Load_data_worker::execute(
    const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
    std::unique_ptr<mysqlshdk::storage::IFile> file,
    const Transaction_options &options) {
  try {
    // state shared with the local infile callbacks (registered below)
    File_info fi;
    fi.worker_id = m_thread_id;
    fi.prog_data_bytes = m_prog_sent_bytes;
    fi.prog_file_bytes = m_prog_file_bytes;
    fi.user_interrupt = &m_interrupt;
    fi.max_rate = m_opt.max_rate();

    // transaction size limit used when files come from the range queue
    uint64_t max_trx_size = 0;

    const auto query = [&session](const auto &sql) {
      return session->query(sql);
    };

    const auto execute = [&session](const auto &sql) { session->execute(sql); };

    const auto executef = [&session](const auto &sql, auto &&...args) {
      session->executef(sql, std::forward<decltype(args)>(args)...);
    };

    // this sets the character_set_database and collation_database server
    // variables to the values the schema has
    executef("USE !;", m_opt.schema());

    // SQL mode:
    //  - no_auto_value_on_zero - normally, 0 generates the next sequence
    //    number, use this mode to prevent this behaviour (solves problems if
    //    dump has 0 stored in an AUTO_INCREMENT column)
    execute("SET SQL_MODE = 'no_auto_value_on_zero';");

    // if user has specified the character set, set the session variables
    // related to the client connection
    if (!m_opt.character_set().empty()) {
      executef("SET NAMES ?;", m_opt.character_set());
    }

    // BUG#34173126, BUG#33360787 - loading when global auto-commit is OFF fails
    execute("SET autocommit = 1");

    // set session variables
    execute("SET unique_checks = 0");
    execute("SET foreign_key_checks = 0");
    execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");

    // the callbacks receive a pointer to `fi` as their user data
    session->set_local_infile_userdata(static_cast<void *>(&fi));
    session->set_local_infile_init(local_infile_init);
    session->set_local_infile_read(local_infile_read);
    session->set_local_infile_end(local_infile_end);
    session->set_local_infile_error(local_infile_error);

    // optional fragments of the LOAD DATA statement
    const std::string on_duplicate_rows =
        m_opt.replace_duplicates() ? std::string{"REPLACE "} : std::string{};

    const std::string character_set =
        m_opt.character_set().empty()
            ? ""
            : "CHARACTER SET " +
                  shcore::quote_sql_string(m_opt.character_set()) + " ";

    const std::string partition =
        m_opt.partition().empty()
            ? ""
            : "PARTITION (" + shcore::quote_identifier(m_opt.partition()) +
                  ") ";

    try {
      for (const auto &s : m_opt.session_init_sql()) {
        log_info("Executing custom session init SQL: %s", s.c_str());
        session->execute(s);
      }
    } catch (const shcore::Error &e) {
      throw shcore::Exception::runtime_error(
          "Error while executing sessionInitSql: " + e.format());
    }

    // part of the statement which is the same for every file
    const std::string query_body =
        on_duplicate_rows + "INTO TABLE " +
        shcore::quote_identifier(m_opt.schema()) + '.' +
        shcore::quote_identifier(m_opt.table()) + ' ' + partition +
        character_set + m_opt.dialect().build_sql();

    const auto &decode_columns = m_opt.decode_columns();

    std::string query_ignore_lines;
    std::string query_columns;

    const auto columns = m_opt.columns().get();

    if (columns && !columns->empty()) {
      const auto placeholders = shcore::str_join(
          *columns, ", ",
          [&decode_columns](const shcore::Value &c) -> std::string {
            if (c.type == shcore::Value_type::UInteger) {
              // user defined variable
              return "@" + c.as_string();
            } else if (c.type == shcore::Value_type::Integer) {
              // We do not want user variable to be negative: `@-1`
              if (c.as_int() < 0) {
                throw shcore::Exception::value_error(
                    "User variable binding in 'columns' option must be "
                    "non-negative integer value");
              }

              // user defined variable
              return "@" + c.as_string();
            } else if (c.type == shcore::Value_type::String) {
              const auto column_name = c.as_string();
              std::string prefix;

              // a column which is going to be decoded is first read into a
              // user variable of the same name
              if (decode_columns.find(column_name) != decode_columns.end()) {
                prefix = "@";
              }

              return prefix + shcore::quote_identifier(column_name);
            } else {
              throw shcore::Exception::type_error(
                  "Option 'columns' " + type_name(shcore::Value_type::String) +
                  " (column name) or non-negative " +
                  type_name(shcore::Value_type::Integer) +
                  " (user variable binding) expected, but value is " +
                  type_name(c.type));
            }
          });

      query_columns = " (" + placeholders + ")";
    }

    if (!decode_columns.empty()) {
      // Assignments are collected first, the SET clause is emitted only if
      // there is at least one of them: entries with an empty transformation
      // produce no assignment, and a lone SET keyword is not a valid clause.
      std::string assignments;

      for (const auto &it : decode_columns) {
        if (it.second == "UNHEX" || it.second == "FROM_BASE64") {
          assignments += shcore::sqlformat("! = " + it.second + "(@!),",
                                           it.first, it.first);
        } else if (!it.second.empty()) {
          assignments += shcore::sqlformat("! = ", it.first);
          // Append "as is".
          assignments += "(" + it.second + "),";
        }
      }

      if (!assignments.empty()) {
        assignments.pop_back();  // strip the last ,
        query_columns += " SET " + assignments;
      }
    }

    char worker_name[64];
    snprintf(worker_name, sizeof(worker_name), "[Worker%03u] ",
             static_cast<unsigned int>(m_thread_id));

    // number of the statement executed for the current file, starts at 1
    uint64_t subchunk = 0;

    while (true) {
      if (!fi.continuation) {
        // new file, reset the counter
        subchunk = 0;
      }

      ++subchunk;

      mysqlsh::import_table::File_import_info r;

      // builds: "<worker> <file>: <error>[ @ file bytes range [a, b)][: extra]"
      const auto format_error_message = [&worker_name, &fi, &r](
                                            const std::string &error,
                                            const std::string &extra = {}) {
        const auto task = fi.filehandler ? fi.filehandler->filename() : "";
        auto msg = worker_name + task + ": " + error;

        if (fi.range_read) {
          msg += " @ file bytes range [" + std::to_string(r.range.first) +
                 ", " + std::to_string(r.range.second) + ")";
        }

        if (!extra.empty()) {
          msg += ": " + extra;
        }

        return msg;
      };

      try {
        if (m_range_queue) {
          // when continuing with the next sub-chunk, the current file handle
          // and buffer are reused
          if (!fi.continuation) {
            r = m_range_queue->pop();

            if (r.is_guard) {
              // no more work
              break;
            }

            fi.filehandler = m_opt.create_file_handle(r.file_path);
            fi.range_read = r.range_read;

            if (r.range_read) {
              fi.bytes_left = r.range.second - r.range.first;
              // TODO(pawel): maxBytesPerTransaction should not be ignored in
              //              case of a single uncompressed file imported in
              //              chunks
              max_trx_size = 0;
              // if fi.range_read == true, we're importing in chunks from a
              // single uncompressed file, rows were already skipped
              query_ignore_lines.clear();
            } else {
              fi.bytes_left = 0;
              max_trx_size = m_opt.max_transaction_size();

              if (m_opt.skip_rows_count() > 0) {
                // this handles the case when a single compressed file or
                // multiple files are being imported and skipRows is set
                query_ignore_lines = " IGNORE " +
                                     std::to_string(m_opt.skip_rows_count()) +
                                     " LINES";
              }
            }

            fi.buffer =
                Transaction_buffer(m_opt.dialect(), fi.filehandler.get(),
                                   max_trx_size, r.range.first);
          }
        } else {
          // `file` is consumed by the first iteration, subsequent sub-chunks
          // keep using the handle stored in `fi`
          if (file != nullptr) {
            fi.filehandler = std::move(file);
            file.reset(nullptr);
            fi.buffer = Transaction_buffer(m_opt.dialect(),
                                           fi.filehandler.get(), options);
          }

          fi.range_read = false;
          fi.bytes_left = 0;
        }
      } catch (const std::exception &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        mysqlsh::current_console()->print_error(format_error_message(e.what()));
        // re-throw the original exception; throwing a copy constructed as
        // std::exception would drop both its dynamic type and its message
        throw;
      }

      if (!fi.filehandler) {
        // there is no range queue and no file was given, nothing below can
        // work without a file handle
        throw std::logic_error("Load data worker has no input file to process");
      }

      const std::string task = fi.filehandler->filename();
      std::shared_ptr<mysqlshdk::db::IResult> load_result = nullptr;

      const std::string query_prefix = shcore::sqlformat(
          "LOAD DATA LOCAL INFILE ? ", fi.filehandler->full_path().masked());
      const std::string full_query =
          query_prefix + query_body + query_ignore_lines + query_columns;

#ifndef NDEBUG
      log_debug("%s %s %i", worker_name, full_query.c_str(),
                m_range_queue != nullptr);
#endif

      fi.buffer.on_oversized_row([&format_error_message](uint64_t i) {
        // this is only printed once
        if (1 == i) {
          mysqlsh::current_console()->print_warning(format_error_message(
              "Attempting to load a row longer than maxBytesPerTransaction."));
        }
      });

      try {
        fi.buffer.before_query();
        load_result = query(m_query_comment + full_query);
        // updates fi.continuation, which decides if the same file is going to
        // be used by the next iteration
        fi.buffer.flush_done(&fi.continuation);

        m_stats.total_data_bytes += fi.data_bytes;
        m_stats.total_file_bytes += fi.file_bytes;

        if (!fi.continuation) {
          // increase the counter only when there are no more subchunks
          ++m_stats.total_files_processed;
        }
      } catch (const mysqlshdk::db::Error &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.format(), full_query);
        mysqlsh::current_console()->print_error(error_msg);

        if (fi.buffer.oversized_rows()) {
          // NOTE(review): the limit reported here comes from `options`, while
          // files taken from the range queue use the local max_trx_size -
          // confirm that this is the intended value
          mysqlsh::current_console()->print_note(format_error_message(
              "This error has been reported for a sub-chunk which has at least "
              "one row longer than maxBytesPerTransaction (" +
              std::to_string(options.max_trx_size) + " bytes)."));
        }

        // the exception reported by the worker carries the formatted message
        throw std::runtime_error(error_msg);
      } catch (const mysqlshdk::rest::Connection_error &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.what());
        mysqlsh::current_console()->print_error(error_msg);
        throw std::runtime_error(error_msg);
      } catch (const std::exception &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.what());
        mysqlsh::current_console()->print_error(error_msg);
        // re-throw the original exception; throwing a copy constructed as
        // std::exception would drop both its dynamic type and its message
        throw;
      }

      const auto warnings_num =
          load_result ? load_result->get_warning_count() : 0;

      {
        const char *mysql_info = session->get_mysql_info();
        // the sub-chunk information is appended only if a transaction size
        // limit is in use
        const auto status =
            worker_name + task + ": " + (mysql_info ? mysql_info : "ERROR") +
            ((options.max_trx_size == 0 && max_trx_size == 0)
                 ? ""
                 : (fi.continuation
                        ? " - flushed sub-chunk " + std::to_string(subchunk)
                        : " - loading finished in " + std::to_string(subchunk) +
                              " sub-chunks"));

        if (m_opt.verbose()) {
          mysqlsh::current_console()->print_info(status);
        } else {
          log_info("%s", status.c_str());
        }

        if (mysql_info) {
          // counters which are not matched by sscanf() remain zero
          size_t records = 0;
          size_t deleted = 0;
          size_t skipped = 0;
          size_t warnings = 0;

          sscanf(mysql_info,
                 "Records: %zu Deleted: %zu Skipped: %zu Warnings: %zu\n",
                 &records, &deleted, &skipped, &warnings);

          m_stats.total_records += records;
          m_stats.total_deleted += deleted;
          m_stats.total_skipped += skipped;
          m_stats.total_warnings += warnings;
        }

        if (warnings_num > 0) {
          // show first k warnings, where k = warnings_to_show
          constexpr int warnings_to_show = 5;

          auto w = load_result->fetch_one_warning();

          for (int i = 0; w && i < warnings_to_show;
               w = load_result->fetch_one_warning(), i++) {
            const std::string msg =
                task + " error " + std::to_string(w->code) + ": " + w->msg;

            switch (w->level) {
              case mysqlshdk::db::Warning::Level::Error:
                mysqlsh::current_console()->print_error(msg);
                break;
              case mysqlshdk::db::Warning::Level::Warn:
                mysqlsh::current_console()->print_warning(msg);
                break;
              case mysqlshdk::db::Warning::Level::Note:
                mysqlsh::current_console()->print_note(msg);
                break;
            }
          }

          // log remaining warnings; `w` already holds the first warning which
          // was not printed above (if there is any)
          size_t remaining_warnings_count = 0;

          for (; w; w = load_result->fetch_one_warning()) {
            remaining_warnings_count++;

            const std::string msg =
                task + " error " + std::to_string(w->code) + ": " + w->msg;

            switch (w->level) {
              case mysqlshdk::db::Warning::Level::Error:
                log_error("%s", msg.c_str());
                break;
              case mysqlshdk::db::Warning::Level::Warn:
                log_warning("%s", msg.c_str());
                break;
              case mysqlshdk::db::Warning::Level::Note:
                log_info("%s", msg.c_str());
                break;
            }
          }

          if (remaining_warnings_count > 0) {
            mysqlsh::current_console()->print_info(
                "Check mysqlsh.log for " +
                std::to_string(remaining_warnings_count) + " more warning" +
                (remaining_warnings_count == 1 ? "" : "s") + ".");
          }
        }
      }

      // without the range queue there is only one file to load, we're done
      // once its last sub-chunk has been flushed
      if (!m_range_queue && !fi.continuation) break;
    }
  } catch (...) {
    // this is the exception which is ultimately reported for this worker, it
    // replaces anything stored by the handlers above
    m_thread_exception[m_thread_id] = std::current_exception();
  }
}