void Load_data_worker::execute()

in modules/util/import_table/load_data.cc [575:951]


// Runs the LOAD DATA LOCAL INFILE loop of this worker thread.
//
// Prepares the session (schema, SQL mode, optional character set, session
// variables, local-infile callbacks, user supplied sessionInitSql), builds the
// invariant part of the LOAD DATA statement once and then issues it
// repeatedly:
//  - with a range queue, tasks are popped from it (a whole file or a byte
//    range of a file) and each one is loaded, possibly in several sub-chunks,
//    until a guard task is popped;
//  - without a range queue, `file` is loaded, possibly in several sub-chunks,
//    and the loop ends after the last sub-chunk has been flushed.
//
// Per-statement counters reported by the server (records, deleted, skipped,
// warnings) are accumulated in m_stats. No exception escapes this function:
// any failure is stored in m_thread_exception[m_thread_id].
//
// session - connection used to run all statements
// file    - file to load when there is no range queue (unused otherwise)
// options - transaction options used to buffer `file` when there is no range
//           queue
void Load_data_worker::execute(
    const std::shared_ptr<mysqlshdk::db::mysql::Session> &session,
    std::unique_ptr<mysqlshdk::storage::IFile> file,
    const Transaction_options &options) {
  try {
    File_info fi;
    fi.worker_id = m_thread_id;
    fi.prog_data_bytes = m_prog_sent_bytes;
    fi.prog_file_bytes = m_prog_file_bytes;
    fi.user_interrupt = &m_interrupt;
    fi.max_rate = m_opt.max_rate();
    // transaction size limit used for the current range queue task, 0 means
    // that maxBytesPerTransaction is not applied; unused without a range queue
    uint64_t max_trx_size = 0;
    const auto query = [&session](const auto &sql) {
      return session->query(sql);
    };
    const auto execute = [&session](const auto &sql) { session->execute(sql); };
    const auto executef = [&session](const auto &sql, auto &&...args) {
      session->executef(sql, std::forward<decltype(args)>(args)...);
    };

    // this sets the character_set_database and collation_database server
    // variables to the values the schema has
    executef("USE !;", m_opt.schema());

    // SQL mode:
    //  - no_auto_value_on_zero - normally, 0 generates the next sequence
    //    number, use this mode to prevent this behaviour (solves problems if
    //    dump has 0 stored in an AUTO_INCREMENT column)
    execute("SET SQL_MODE = 'no_auto_value_on_zero';");

    // if user has specified the character set, set the session variables
    // related to the client connection
    if (!m_opt.character_set().empty()) {
      executef("SET NAMES ?;", m_opt.character_set());
    }

    // BUG#34173126, BUG#33360787 - loading when global auto-commit is OFF fails
    execute("SET autocommit = 1");
    // set session variables
    execute("SET unique_checks = 0");
    execute("SET foreign_key_checks = 0");
    execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");

    // the local-infile callbacks receive `fi` as their user data
    session->set_local_infile_userdata(static_cast<void *>(&fi));
    session->set_local_infile_init(local_infile_init);
    session->set_local_infile_read(local_infile_read);
    session->set_local_infile_end(local_infile_end);
    session->set_local_infile_error(local_infile_error);

    const std::string on_duplicate_rows =
        m_opt.replace_duplicates() ? std::string{"REPLACE "} : std::string{};

    const std::string character_set =
        m_opt.character_set().empty()
            ? ""
            : "CHARACTER SET " +
                  shcore::quote_sql_string(m_opt.character_set()) + " ";

    const std::string partition =
        m_opt.partition().empty()
            ? ""
            : "PARTITION (" + shcore::quote_identifier(m_opt.partition()) +
                  ") ";

    try {
      for (const auto &s : m_opt.session_init_sql()) {
        log_info("Executing custom session init SQL: %s", s.c_str());
        session->execute(s);
      }
    } catch (const shcore::Error &e) {
      throw shcore::Exception::runtime_error(
          "Error while executing sessionInitSql: " + e.format());
    }

    // part of the statement which is the same for every file/chunk
    const std::string query_body =
        on_duplicate_rows + "INTO TABLE " +
        shcore::quote_identifier(m_opt.schema()) + '.' +
        shcore::quote_identifier(m_opt.table()) + ' ' + partition +
        character_set + m_opt.dialect().build_sql();

    const auto &decode_columns = m_opt.decode_columns();

    std::string query_ignore_lines;
    std::string query_columns;
    const auto columns = m_opt.columns().get();

    if (columns && !columns->empty()) {
      const auto placeholders = shcore::str_join(
          *columns, ", ",
          [&decode_columns](const shcore::Value &c) -> std::string {
            if (c.type == shcore::Value_type::UInteger) {
              // user defined variable
              return "@" + c.as_string();
            } else if (c.type == shcore::Value_type::Integer) {
              // We do not want user variable to be negative: `@-1`
              if (c.as_int() < 0) {
                throw shcore::Exception::value_error(
                    "User variable binding in 'columns' option must be "
                    "non-negative integer value");
              }
              // user defined variable
              return "@" + c.as_string();
            } else if (c.type == shcore::Value_type::String) {
              const auto column_name = c.as_string();
              std::string prefix;

              // columns to be decoded are read into a user variable of the
              // same name, the SET clause below assigns the decoded value
              if (decode_columns.find(column_name) != decode_columns.end()) {
                prefix = "@";
              }

              return prefix + shcore::quote_identifier(column_name);
            } else {
              throw shcore::Exception::type_error(
                  "Option 'columns' " + type_name(shcore::Value_type::String) +
                  " (column name) or non-negative " +
                  type_name(shcore::Value_type::Integer) +
                  " (user variable binding) expected, but value is " +
                  type_name(c.type));
            }
          });

      query_columns = " (" + placeholders + ")";
    }

    if (!decode_columns.empty()) {
      // Build the comma separated assignments first: entries with an empty
      // transformation produce no assignment, and if none is produced at all
      // the SET keyword must be omitted, otherwise the statement is invalid.
      std::string assignments;

      for (const auto &it : decode_columns) {
        std::string assignment;

        if (it.second == "UNHEX" || it.second == "FROM_BASE64") {
          assignment = shcore::sqlformat("! = " + it.second + "(@!)",
                                         it.first, it.first);
        } else if (!it.second.empty()) {
          // Append "as is".
          assignment =
              shcore::sqlformat("! = ", it.first) + "(" + it.second + ")";
        }

        if (!assignment.empty()) {
          if (!assignments.empty()) {
            assignments += ',';
          }

          assignments += assignment;
        }
      }

      if (!assignments.empty()) {
        query_columns += " SET " + assignments;
      }
    }

    char worker_name[64];
    snprintf(worker_name, sizeof(worker_name), "[Worker%03u] ",
             static_cast<unsigned int>(m_thread_id));

    uint64_t subchunk = 0;
    while (true) {
      if (!fi.continuation) {
        // new file, reset the counter
        subchunk = 0;
      }

      ++subchunk;
      mysqlsh::import_table::File_import_info r;

      const auto format_error_message = [&worker_name, &fi, &r](
                                            const std::string &error,
                                            const std::string &extra = {}) {
        const auto task = fi.filehandler ? fi.filehandler->filename() : "";
        auto msg = worker_name + task + ": " + error;

        if (fi.range_read) {
          msg += " @ file bytes range [" + std::to_string(r.range.first) +
                 ", " + std::to_string(r.range.second) + ")";
        }

        if (!extra.empty()) {
          msg += ": " + extra;
        }

        return msg;
      };

      try {
        if (m_range_queue) {
          // a continuation keeps loading the current task, otherwise fetch
          // the next one
          if (!fi.continuation) {
            r = m_range_queue->pop();

            if (r.is_guard) {
              break;
            }

            fi.filehandler = m_opt.create_file_handle(r.file_path);
            fi.range_read = r.range_read;

            if (r.range_read) {
              fi.bytes_left = r.range.second - r.range.first;
              // TODO(pawel): maxBytesPerTransaction should not be ignored in
              // case of a single uncompressed file imported in chunks
              max_trx_size = 0;

              // if fi.range_read == true, we're importing in chunks from a
              // single uncompressed file, rows were already skipped
              query_ignore_lines.clear();
            } else {
              fi.bytes_left = 0;
              max_trx_size = m_opt.max_transaction_size();

              if (m_opt.skip_rows_count() > 0) {
                // this handles the case when a single compressed file or
                // multiple files are being imported and skipRows is set
                query_ignore_lines = " IGNORE " +
                                     std::to_string(m_opt.skip_rows_count()) +
                                     " LINES";
              }
            }

            fi.buffer =
                Transaction_buffer(m_opt.dialect(), fi.filehandler.get(),
                                   max_trx_size, r.range.first);
          }
        } else {
          if (file != nullptr) {
            fi.filehandler = std::move(file);
            file.reset(nullptr);
            fi.buffer = Transaction_buffer(m_opt.dialect(),
                                           fi.filehandler.get(), options);
          }
          fi.range_read = false;
          fi.bytes_left = 0;
        }
      } catch (const std::exception &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        mysqlsh::current_console()->print_error(format_error_message(e.what()));
        // rethrow the original exception: `throw std::exception(e)` would
        // slice it and lose its message
        throw;
      }

      const std::string task = fi.filehandler->filename();
      std::shared_ptr<mysqlshdk::db::IResult> load_result = nullptr;
      const std::string query_prefix = shcore::sqlformat(
          "LOAD DATA LOCAL INFILE ? ", fi.filehandler->full_path().masked());
      const std::string full_query =
          query_prefix + query_body + query_ignore_lines + query_columns;

#ifndef NDEBUG
      log_debug("%s %s %i", worker_name, full_query.c_str(),
                m_range_queue != nullptr);
#endif

      fi.buffer.on_oversized_row([&format_error_message](uint64_t i) {
        // this is only printed once
        if (1 == i) {
          mysqlsh::current_console()->print_warning(format_error_message(
              "Attempting to load a row longer than maxBytesPerTransaction."));
        }
      });

      try {
        fi.buffer.before_query();
        load_result = query(m_query_comment + full_query);
        fi.buffer.flush_done(&fi.continuation);
        m_stats.total_data_bytes += fi.data_bytes;
        m_stats.total_file_bytes += fi.file_bytes;

        if (!fi.continuation) {
          // increase the counter only when there are no more subchunks
          ++m_stats.total_files_processed;
        }
      } catch (const mysqlshdk::db::Error &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.format(), full_query);
        mysqlsh::current_console()->print_error(error_msg);

        if (fi.buffer.oversized_rows()) {
          // report the limit the buffer was created with: with a range queue
          // it is max_trx_size, otherwise it comes from `options`
          const uint64_t trx_size_limit =
              m_range_queue ? max_trx_size : options.max_trx_size;
          mysqlsh::current_console()->print_note(format_error_message(
              "This error has been reported for a sub-chunk which has at least "
              "one row longer than maxBytesPerTransaction (" +
              std::to_string(trx_size_limit) + " bytes)."));
        }

        throw std::runtime_error(error_msg);
      } catch (const mysqlshdk::rest::Connection_error &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.what());
        mysqlsh::current_console()->print_error(error_msg);
        throw std::runtime_error(error_msg);
      } catch (const std::exception &e) {
        m_thread_exception[m_thread_id] = std::current_exception();
        const auto error_msg = format_error_message(e.what());
        mysqlsh::current_console()->print_error(error_msg);
        // rethrow the original exception instead of a sliced std::exception
        throw;
      }

      const auto warnings_num =
          load_result ? load_result->get_warning_count() : 0;

      {
        const char *mysql_info = session->get_mysql_info();
        const auto status =
            worker_name + task + ": " + (mysql_info ? mysql_info : "ERROR") +
            ((options.max_trx_size == 0 && max_trx_size == 0)
                 ? ""
                 : (fi.continuation
                        ? " - flushed sub-chunk " + std::to_string(subchunk)
                        : " - loading finished in " + std::to_string(subchunk) +
                              " sub-chunks"));

        if (m_opt.verbose()) {
          mysqlsh::current_console()->print_info(status);
        } else {
          log_info("%s", status.c_str());
        }

        if (mysql_info) {
          size_t records = 0;
          size_t deleted = 0;
          size_t skipped = 0;
          size_t warnings = 0;

          sscanf(mysql_info,
                 "Records: %zu  Deleted: %zu  Skipped: %zu  Warnings: %zu\n",
                 &records, &deleted, &skipped, &warnings);
          m_stats.total_records += records;
          m_stats.total_deleted += deleted;
          m_stats.total_skipped += skipped;
          m_stats.total_warnings += warnings;
        }

        if (warnings_num > 0) {
          // show first k warnings, where k = warnings_to_show
          constexpr int warnings_to_show = 5;
          auto w = load_result->fetch_one_warning();

          for (int i = 0; w && i < warnings_to_show;
               w = load_result->fetch_one_warning(), i++) {
            const std::string msg =
                task + " error " + std::to_string(w->code) + ": " + w->msg;

            switch (w->level) {
              case mysqlshdk::db::Warning::Level::Error:
                mysqlsh::current_console()->print_error(msg);
                break;
              case mysqlshdk::db::Warning::Level::Warn:
                mysqlsh::current_console()->print_warning(msg);
                break;
              case mysqlshdk::db::Warning::Level::Note:
                mysqlsh::current_console()->print_note(msg);
                break;
            }
          }

          // log remaining warnings
          size_t remaining_warnings_count = 0;
          for (; w; w = load_result->fetch_one_warning()) {
            remaining_warnings_count++;
            const std::string msg =
                task + " error " + std::to_string(w->code) + ": " + w->msg;

            switch (w->level) {
              case mysqlshdk::db::Warning::Level::Error:
                log_error("%s", msg.c_str());
                break;
              case mysqlshdk::db::Warning::Level::Warn:
                log_warning("%s", msg.c_str());
                break;
              case mysqlshdk::db::Warning::Level::Note:
                log_info("%s", msg.c_str());
                break;
            }
          }

          if (remaining_warnings_count > 0) {
            mysqlsh::current_console()->print_info(
                "Check mysqlsh.log for " +
                std::to_string(remaining_warnings_count) + " more warning" +
                (remaining_warnings_count == 1 ? "" : "s") + ".");
          }
        }
      }

      // without a range queue there is a single file, stop after its last
      // sub-chunk
      if (!m_range_queue && !fi.continuation) break;
    }
  } catch (...) {
    m_thread_exception[m_thread_id] = std::current_exception();
  }
}