grt::StringRef SqlEditorForm::do_exec_sql()

in backend/wbprivate/sqlide/wb_sql_editor_form.cpp [1943:2386]


grt::StringRef SqlEditorForm::do_exec_sql(Ptr self_ptr, std::shared_ptr<std::string> sql, SqlEditorPanel *editor,
                                          ExecFlags flags, RecordsetsRef result_list) {

  logDebug("Background task for sql execution started\n");

  bool use_non_std_delimiter = (flags & NeedNonStdDelimiter) != 0;
  bool dont_add_limit_clause = (flags & DontAddLimitClause) != 0;
  std::map<std::string, std::int64_t> ps_stats;
  std::vector<PSStage> ps_stages;
  std::vector<PSWait> ps_waits;
  bool query_ps_stats = collect_ps_statement_events();
  std::string query_ps_statement_events_error;
  std::string statement;
  int max_query_size_to_log = (int)bec::GRTManager::get()->get_app_option_int("DbSqlEditor:MaxQuerySizeToHistory", 0);
  int limit_rows = 0;
  if (bec::GRTManager::get()->get_app_option_int("SqlEditor:LimitRows") != 0)
    limit_rows = (int)bec::GRTManager::get()->get_app_option_int("SqlEditor:LimitRowsCount", 0);

  bec::GRTManager::get()->replace_status_text(_("Executing Query..."));

  std::shared_ptr<SqlEditorForm> self_ref = (self_ptr).lock();
  SqlEditorForm *self = (self_ref).get();
  if (!self) {
    logError("Couldn't aquire lock for SQL editor form\n");
    return grt::StringRef("");
  }

  // add_log_message() will increment this variable on errors or warnings
  _exec_sql_error_count = 0;

  bool interrupted = true;
  sql::Driver *dbc_driver = nullptr;
  try {
    RecMutexLock use_dbc_conn_mutex(ensure_valid_usr_connection());

    dbc_driver = _usr_dbc_conn->ref->getDriver();
    dbc_driver->threadInit();

    bool is_running_query = true;
    AutoSwap<bool> is_running_query_keeper(_is_running_query, is_running_query);
    update_menu_and_toolbar();

    _has_pending_log_messages = false;
    base::ScopeExitTrigger schedule_log_messages_refresh(std::bind(&SqlEditorForm::refresh_log_messages, this, true));

    SqlFacade::Ref sql_facade = SqlFacade::instance_for_rdbms(rdbms());
    Sql_syntax_check::Ref sql_syntax_check = sql_facade->sqlSyntaxCheck();
    Sql_specifics::Ref sql_specifics = sql_facade->sqlSpecifics();

    bool ran_set_sql_mode = false;
    bool logging_queries;
    std::vector<std::pair<std::size_t, std::size_t>> statement_ranges;
    sql_facade->splitSqlScript(sql->c_str(), sql->size(),
                               use_non_std_delimiter ? sql_specifics->non_std_sql_delimiter() : ";", statement_ranges);

    if (statement_ranges.size() > 1) {
      query_ps_stats = false;
      query_ps_statement_events_error = "Query stats can only be fetched when a single statement is executed.";
    }

    if (!max_query_size_to_log || max_query_size_to_log >= (int)sql->size()) {
      logging_queries = true;
    } else {
      std::list<std::string> warning;

      warning.push_back(base::strfmt("Skipping history entries for %li statements, total %li bytes",
                                     (long)statement_ranges.size(), (long)sql->size()));
      _history->add_entry(warning);
      logging_queries = false;
    }

    // Intentionally allow any value. For values <= 0 show no result set at all.
    ssize_t max_resultset_count = bec::GRTManager::get()->get_app_option_int("DbSqlEditor::MaxResultsets", 50);
    ssize_t total_result_count = (editor != nullptr) ? editor->resultset_count() : 0; // Consider pinned result sets.

    bool results_left = false;
    for (auto &statement_range : statement_ranges) {
      logDebug3("Executing statement range: %lu, %lu...\n", statement_range.first, statement_range.second);

      statement = sql->substr(statement_range.first, statement_range.second);
      std::list<std::string> sub_statements;
      sql_facade->splitSqlScript(statement, sub_statements);
      std::size_t multiple_statement_count = sub_statements.size();
      bool is_multiple_statement = (1 < multiple_statement_count);

      {
        statement = strip_text(statement, false, true);
        if (statement.empty())
          continue;

        Sql_syntax_check::Statement_type statement_type = sql_syntax_check->determine_statement_type(statement);

        logDebug3("Determined statement type: %u\n", statement_type);
        if (Sql_syntax_check::sql_empty == statement_type)
          continue;

        std::string schema_name;
        std::string table_name;

        if (logging_queries) {
          std::list<std::string> statements;
          statements.push_back(statement);
          _history->add_entry(statements);
        }

        Recordset_cdbc_storage::Ref data_storage;

        // for select queries add limit clause if specified by global option
        if (!is_multiple_statement && (Sql_syntax_check::sql_select == statement_type)) {
          data_storage = Recordset_cdbc_storage::create();
          data_storage->set_gather_field_info(true);
          data_storage->rdbms(rdbms());
          data_storage->setUserConnectionGetter(
            std::bind(&SqlEditorForm::getUserConnection, this, std::placeholders::_1, std::placeholders::_2));
          data_storage->setAuxConnectionGetter(
            std::bind(&SqlEditorForm::getAuxConnection, this, std::placeholders::_1, std::placeholders::_2));

          SqlFacade::String_tuple_list column_names;

          if (!table_name.empty() ||
              sql_facade->parseSelectStatementForEdit(statement, schema_name, table_name, column_names)) {
            data_storage->schema_name(schema_name.empty() ? _usr_dbc_conn->active_schema : schema_name);
            data_storage->table_name(table_name);
            logDebug3("Result will be editable\n");
          } else {
            data_storage->readonly_reason(
              "Statement must be a SELECT for columns of a single table with a primary key for its results to be "
              "editable.");
            logDebug3("Result will not be editable\n");
          }

          data_storage->sql_query(statement);

          {
            bool do_limit = !dont_add_limit_clause && limit_rows > 0;
            data_storage->limit_rows(do_limit);

            if (limit_rows > 0)
              data_storage->limit_rows_count(limit_rows);
          }
          statement = data_storage->decorated_sql_query();
        }

        {
          RowId log_message_index = add_log_message(DbSqlEditorLog::BusyMsg, _("Running..."), statement,
                                                    ((Sql_syntax_check::sql_select == statement_type) ? "? / ?" : "?"));

          bool statement_failed = false;
          long long updated_rows_count = -1;
          Timer statement_exec_timer(false);
          Timer statement_fetch_timer(false);
          std::shared_ptr<sql::Statement> dbc_statement(_usr_dbc_conn->ref->createStatement());
          bool is_result_set_first = false;

          if (_usr_dbc_conn->is_stop_query_requested)
            throw std::runtime_error(
              _("Query execution has been stopped, the connection to the DB server was not restarted, any open "
                "transaction remains open"));

          try {
            {
              base::ScopeExitTrigger schedule_statement_exec_timer_stop(std::bind(&Timer::stop, &statement_exec_timer));
              statement_exec_timer.run();
              is_result_set_first = dbc_statement->execute(statement);
            }
            logDebug3("Query executed successfully\n");

            updated_rows_count = dbc_statement->getUpdateCount();

            // XXX: coalesce all the special queries here and act on them *after* all queries have run.
            // Especially the drop command is redirected twice to idle tasks, kicking so in totally asynchronously
            // and killing any intermittent USE commands.
            // Updating the UI during a run of many commands is not useful either.
            if (Sql_syntax_check::sql_use == statement_type)
              cache_active_schema_name();
            if (Sql_syntax_check::sql_set == statement_type && statement.find("@sql_mode") != std::string::npos)
              ran_set_sql_mode = true;
            if (Sql_syntax_check::sql_drop == statement_type)
              update_live_schema_tree(statement);
          } catch (sql::SQLException &e) {
            std::string err_msg;
            // safe mode
            switch (e.getErrorCode()) {
              case 1046: // not default DB selected
                err_msg = strfmt(_("Error Code: %i. %s\nSelect the default DB to be used by double-clicking its name "
                                   "in the SCHEMAS list in the sidebar."),
                                 e.getErrorCode(), e.what());
                break;
              case 1175: // safe mode
                err_msg = strfmt(_("Error Code: %i. %s\nTo disable safe mode, toggle the option in Preferences -> SQL "
                                   "Editor and reconnect."),
                                 e.getErrorCode(), e.what());
                break;
              default:
                err_msg = strfmt(_("Error Code: %i. %s"), e.getErrorCode(), e.what());
                break;
            }
            set_log_message(log_message_index, DbSqlEditorLog::ErrorMsg, err_msg, statement,
                            statement_exec_timer.duration_formatted());
            statement_failed = true;
          } catch (std::exception &e) {
            std::string err_msg = strfmt(_("Error: %s"), e.what());
            set_log_message(log_message_index, DbSqlEditorLog::ErrorMsg, err_msg, statement,
                            statement_exec_timer.duration_formatted());
            statement_failed = true;
          }
          if (statement_failed) {
            if (_continueOnError)
              continue; // goto next statement
            else
              goto stop_processing_sql_script;
          }

          sql::mysql::MySQL_Connection *mysql_connection =
            dynamic_cast<sql::mysql::MySQL_Connection *>(dbc_statement->getConnection());
          sql::SQLString last_statement_info;
          if (mysql_connection != nullptr)
            last_statement_info = mysql_connection->getLastStatementInfo();
          if (updated_rows_count >= 0) {
            std::string message = strfmt(_("%lli row(s) affected"), updated_rows_count);
            bool has_warning = false;
            if (flags & ShowWarnings) {
              std::string warnings_message;
              const sql::SQLWarning *warnings = dbc_statement->getWarnings();
              if (warnings) {
                int count = 0;
                const sql::SQLWarning *w = warnings;
                while (w) {
                  warnings_message.append(strfmt("\n%i %s", w->getErrorCode(), w->getMessage().c_str()));
                  count++;
                  w = w->getNextWarning();
                }
                message.append(strfmt(_(", %i warning(s):"), count));
                has_warning = true;
              }
              if (!warnings_message.empty())
                message.append(warnings_message);
            }
            if (!last_statement_info->empty())
              message.append("\n").append(last_statement_info);
            set_log_message(log_message_index, has_warning ? DbSqlEditorLog::WarningMsg : DbSqlEditorLog::OKMsg,
                            message, statement, statement_exec_timer.duration_formatted());
          }

          logDebug2("Processing result sets\n");
          int resultset_count = 0;
          bool more_results = is_result_set_first;
          bool reuse_log_msg = false;
          if ((updated_rows_count < 0) || is_multiple_statement) {
            for (std::size_t processed_substatements_count = 0;
                 processed_substatements_count < multiple_statement_count; ++processed_substatements_count) {
              do {
                if (more_results) {
                  if (total_result_count == max_resultset_count) {
                    int result = mforms::Utilities::show_warning(
                      _("Maximum result count reached"),
                      "No further result tabs will be displayed as the maximm number has been reached. \nYou may stop "
                      "the operation, leaving the connection out of sync. You'll have to got o 'Query->Reconnect to "
                      "server' menu item to reset the state.\n\n Do you want to cancel the operation?",
                      "Yes", "No");
                    if (result == mforms::ResultOk) {
                      add_log_message(DbSqlEditorLog::ErrorMsg,
                                      "No more results could be displayed. Operation canceled by user.", statement,
                                      "");
                      dbc_statement->cancel();
                      dbc_statement->close();
                      return grt::StringRef("");
                    }
                    add_log_message(
                      DbSqlEditorLog::WarningMsg,
                      "No more results will be displayed because the maximum number of result sets was reached.",
                      statement, "");
                  }

                  if (!reuse_log_msg && ((updated_rows_count >= 0) || (resultset_count)))
                    log_message_index = add_log_message(DbSqlEditorLog::BusyMsg, _("Fetching..."), statement, "- / ?");
                  else
                    set_log_message(log_message_index, DbSqlEditorLog::BusyMsg, _("Fetching..."), statement,
                                    statement_exec_timer.duration_formatted() + " / ?");
                  reuse_log_msg = false;
                  std::shared_ptr<sql::ResultSet> dbc_resultset;
                  {
                    base::ScopeExitTrigger schedule_statement_fetch_timer_stop(
                      std::bind(&Timer::stop, &statement_fetch_timer));
                    statement_fetch_timer.run();

                    // need a separate exception catcher here, because sometimes a query error
                    // will only throw an exception after fetching starts, which causes the busy spinner
                    // to be active forever, since the exception is logged in a new log_id/row
                    // XXX this could also be caused by a bug in Connector/C++
                    try {
                      dbc_resultset.reset(dbc_statement->getResultSet());
                    } catch (sql::SQLException &e) {
                      std::string err_msg;
                      // safe mode
                      switch (e.getErrorCode()) {
                        case 1046: // not default DB selected
                          err_msg = strfmt(_("Error Code: %i. %s\nSelect the default DB to be used by double-clicking "
                                             "its name in the SCHEMAS list in the sidebar."),
                                           e.getErrorCode(), e.what());
                          break;
                        case 1175: // safe mode
                          err_msg = strfmt(_("Error Code: %i. %s\nTo disable safe mode, toggle the option in "
                                             "Preferences -> SQL Editor and reconnect."),
                                           e.getErrorCode(), e.what());
                          break;
                        default:
                          err_msg = strfmt(_("Error Code: %i. %s"), e.getErrorCode(), e.what());
                          break;
                      }

                      set_log_message(log_message_index, DbSqlEditorLog::ErrorMsg, err_msg, statement,
                                      statement_exec_timer.duration_formatted());

                      if (_continueOnError)
                        continue; // goto next statement
                      else
                        goto stop_processing_sql_script;
                    }
                  }

                  std::string exec_and_fetch_durations =
                    (((updated_rows_count >= 0) || (resultset_count)) ? std::string("-")
                                                                      : statement_exec_timer.duration_formatted()) +
                    " / " + statement_fetch_timer.duration_formatted();
                  if (total_result_count >= max_resultset_count)
                    set_log_message(log_message_index, DbSqlEditorLog::OKMsg, "Row count could not be verified",
                                    statement, exec_and_fetch_durations);
                  else if (dbc_resultset) {
                    if (!data_storage) {
                      data_storage = Recordset_cdbc_storage::create();
                      data_storage->set_gather_field_info(true);
                      data_storage->rdbms(rdbms());
                      data_storage->setUserConnectionGetter(std::bind(&SqlEditorForm::getUserConnection, this,
                                                                      std::placeholders::_1, std::placeholders::_2));
                      data_storage->setAuxConnectionGetter(std::bind(&SqlEditorForm::getAuxConnection, this,
                                                                     std::placeholders::_1, std::placeholders::_2));
                      if (table_name.empty())
                        data_storage->sql_query(statement);
                      data_storage->schema_name(schema_name);
                      data_storage->table_name(table_name);
                    }

                    data_storage->dbc_statement(dbc_statement);
                    data_storage->dbc_resultset(dbc_resultset);
                    data_storage->reloadable(!is_multiple_statement &&
                                             (Sql_syntax_check::sql_select == statement_type));

                    logDebug3("Creation and setup of a new result set...\n");

                    Recordset::Ref rs = Recordset::create(exec_sql_task);
                    rs->is_field_value_truncation_enabled(true);
                    rs->setPreserveRowFilter(
                      bec::GRTManager::get()->get_app_option_int("SqlEditor:PreserveRowFilter") == 1);
                    rs->apply_changes_cb =
                      std::bind(&SqlEditorForm::apply_changes_to_recordset, this, Recordset::Ptr(rs));
                    rs->generator_query(statement);

                    {
                      if (query_ps_stats) {
                        query_ps_statistics(_usr_dbc_conn->id, ps_stats);
                        ps_stages = query_ps_stages(ps_stats["EVENT_ID"]);
                        ps_waits = query_ps_waits(ps_stats["EVENT_ID"]);
                        query_ps_stats = false;
                      }
                      RecordsetData *rdata = new RecordsetData();
                      rdata->duration = statement_exec_timer.duration();
                      rdata->ps_stat_error = query_ps_statement_events_error;
                      rdata->ps_stat_info = ps_stats;
                      rdata->ps_stage_info = ps_stages;
                      rdata->ps_wait_info = ps_waits;
                      rs->set_client_data(rdata);
                    }

                    rs->data_storage(data_storage);
                    rs->reset(true);

                    if (data_storage->valid()) // query statement
                    {
                      if (result_list)
                        result_list->push_back(rs);

                      if (editor)
                        editor->add_panel_for_recordset_from_main(rs);

                      std::string statement_res_msg = std::to_string(rs->row_count()) + _(" row(s) returned");
                      if (!last_statement_info->empty())
                        statement_res_msg.append("\n").append(last_statement_info);

                      set_log_message(log_message_index, DbSqlEditorLog::OKMsg, statement_res_msg, statement,
                                      exec_and_fetch_durations);
                    }
                    ++resultset_count;
                  } else {
                    reuse_log_msg = true;
                  }
                  ++total_result_count;
                  data_storage.reset();
                }
              } while ((more_results = dbc_statement->getMoreResults()));
            }
          }

          if ((updated_rows_count < 0) && !(resultset_count))
            set_log_message(log_message_index, DbSqlEditorLog::OKMsg, _("OK"), statement,
                            statement_exec_timer.duration_formatted());
        }
      }
    } // statement range loop

    if (results_left) {
      exec_sql_task->execute_in_main_thread(
        std::bind(&mforms::Utilities::show_warning, _("Result set limit reached"),
                  _("There were more results than "
                    "result tabs could be opened, because the set maximum limit was reached. You can change this "
                    "limit in the preferences."),
                  _("OK"), "", ""),
        true, false);
    }

    bec::GRTManager::get()->replace_status_text(_("Query Completed"));
    interrupted = false;

  stop_processing_sql_script:
    if (interrupted)
      bec::GRTManager::get()->replace_status_text(_("Query interrupted"));
    // try to minimize the times this is called, since this will change the state of the connection
    // after a user query is ran (eg, it will reset all warnings)
    if (ran_set_sql_mode)
      cache_sql_mode();
  }
  CATCH_ANY_EXCEPTION_AND_DISPATCH(statement)

  if (dbc_driver)
    dbc_driver->threadEnd();

  logDebug("SQL execution finished\n");

  update_menu_and_toolbar();

  _usr_dbc_conn->is_stop_query_requested = false;

  return grt::StringRef("");
}