mysqlshdk/libs/db/mysqlx/xsession.cc (703 lines of code) (raw):
/*
* Copyright (c) 2017, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <mysqlx_version.h>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include "mysqlshdk/include/shellcore/console.h"
#include "mysqlshdk/libs/db/mysqlx/session.h"
#include "mysqlshdk/libs/db/mysqlx/xpl_error.h"
#include "mysqlshdk/libs/utils/debug.h"
#include "mysqlshdk/libs/utils/fault_injection.h"
#include "mysqlshdk/libs/utils/log_sql.h"
#include "mysqlshdk/libs/utils/profiling.h"
#include "mysqlshdk/libs/utils/utils_general.h"
namespace mysqlshdk {
namespace db {
namespace mysqlx {
FI_DEFINE(mysqlx, [](const mysqlshdk::utils::FI::Args &args) {
if (args.get_int("abort", 0)) {
abort();
}
if (args.get_int("code", -1) < 0) {
throw std::logic_error(args.get_string("msg"));
}
throw mysqlshdk::db::Error(args.get_string("msg").c_str(),
args.get_int("code"),
args.get_string("state", {""}).c_str());
});
namespace {
template <typename Message_type>
std::string message_to_text(const std::string &binary_message) {
std::string result;
Message_type msg;
msg.ParseFromString(binary_message);
google::protobuf::TextFormat::PrintToString(msg, &result);
return msg.GetDescriptor()->full_name() + " { " + result + " }";
}
std::ostream &operator<<(std::ostream &os,
const xcl::XProtocol::Message &message) {
std::string output;
std::string name;
google::protobuf::TextFormat::Printer printer;
printer.SetInitialIndentLevel(1);
// special handling for nested messages (at least for Notices)
if (message.GetDescriptor()->full_name() == "Mysqlx.Notice.Frame") {
Mysqlx::Notice::Frame frame =
*static_cast<const Mysqlx::Notice::Frame *>(&message);
switch (frame.type()) {
case ::Mysqlx::Notice::Frame_Type_WARNING: {
const auto payload_as_text =
message_to_text<Mysqlx::Notice::Warning>(frame.payload());
frame.set_payload(payload_as_text);
break;
}
case ::Mysqlx::Notice::Frame_Type_SESSION_VARIABLE_CHANGED: {
const auto payload_as_text =
message_to_text<Mysqlx::Notice::SessionVariableChanged>(
frame.payload());
frame.set_payload(payload_as_text);
break;
}
case ::Mysqlx::Notice::Frame_Type_SESSION_STATE_CHANGED: {
const auto payload_as_text =
message_to_text<Mysqlx::Notice::SessionStateChanged>(
frame.payload());
frame.set_payload(payload_as_text);
break;
}
}
printer.PrintToString(frame, &output);
} else {
printer.PrintToString(message, &output);
}
return os << message.GetDescriptor()->full_name() << " {\n"
<< output << "}\n";
}
std::pair<xcl::XProtocol::Handler_id, xcl::XProtocol::Handler_id>
do_enable_trace(xcl::XSession *session) {
xcl::XProtocol::Handler_id rh, sh;
auto &protocol = session->get_protocol();
rh = protocol.add_received_message_handler(
[](xcl::XProtocol *, const xcl::XProtocol::Server_message_type_id,
const xcl::XProtocol::Message &msg) -> xcl::Handler_result {
std::cout << "<<<< RECEIVE " << msg << "\n";
return xcl::Handler_result::Continue;
});
sh = protocol.add_send_message_handler(
[](xcl::XProtocol *, const xcl::XProtocol::Client_message_type_id,
const xcl::XProtocol::Message &msg) -> xcl::Handler_result {
std::cout << ">>>> SEND " << msg << "\n";
return xcl::Handler_result::Continue;
});
return {sh, rh};
}
} // namespace
//-------------------------- Session Implementation ----------------------------
DEBUG_OBJ_ENABLE(db_mysqlx_Session);
XSession_impl::XSession_impl() : m_prep_stmt_count(0) {
_enable_trace = false;
DEBUG_OBJ_ALLOC(db_mysqlx_Session);
}
void XSession_impl::connect(const mysqlshdk::db::Connection_options &data) {
_mysql = ::xcl::create_session();
if (_enable_trace) _trace_handler = do_enable_trace(_mysql.get());
_connection_options = data;
if (_connection_options.has(mysqlshdk::db::kGetServerPublicKey)) {
_mysql.reset();
throw std::runtime_error(
"X Protocol: Option get-server-public-key is not supported.");
}
if (_connection_options.has(mysqlshdk::db::kServerPublicKeyPath)) {
_mysql.reset();
throw std::runtime_error(
"X Protocol: Option server-public-key-path is not supported.");
}
auto &ssl_options(_connection_options.get_ssl_options());
std::string ssl_mode;
// In shell, the default ssl-mode is preferred, but in the case of DevAPI
// the default mode is required and it is set at mysqlx.getSession()
if (ssl_options.has_default(mysqlshdk::db::kSslMode))
ssl_mode = ssl_options.get_default(mysqlshdk::db::kSslMode);
else
ssl_mode = mysqlshdk::db::kSslModePreferred;
if (ssl_options.has_data()) {
// If no mode is specified and either ssl-ca or ssl-capath are specified
// then it uses VERIFY_CA
if (!ssl_options.has_mode()) {
if (ssl_options.has_value(mysqlshdk::db::kSslCa) ||
ssl_options.has_value(mysqlshdk::db::kSslCaPath)) {
ssl_mode = mysqlshdk::db::kSslModeVerifyCA;
}
// If ssl-mode is specified, then it is used
} else {
ssl_mode = ssl_options.get_mode_name();
}
ssl_options.validate();
if (ssl_options.has_ca())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca,
ssl_options.get_ca());
if (ssl_options.has_cert())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cert,
ssl_options.get_cert());
if (ssl_options.has_key())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_key,
ssl_options.get_key());
if (ssl_options.has_capath())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca_path,
ssl_options.get_capath());
if (ssl_options.has_crl())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl,
ssl_options.get_crl());
if (ssl_options.has_crlpath())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl_path,
ssl_options.get_crlpath());
if (ssl_options.has_tls_version())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Allowed_tls,
ssl_options.get_tls_version());
if (ssl_options.has_cipher())
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cipher,
ssl_options.get_cipher());
}
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_mode, ssl_mode);
_mysql->set_capability(xcl::XSession::Capability_can_handle_expired_password,
true);
auto algs =
_connection_options.has_compression_algorithms()
? shcore::str_lower(_connection_options.get_compression_algorithms())
: "";
if (_connection_options.has_compression()) {
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
_connection_options.get_compression());
} else {
if (algs == "uncompressed")
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
kCompressionDisabled);
else if (algs.empty() || algs.find("uncompressed") != std::string::npos)
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
kCompressionPreferred);
else
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
kCompressionRequired);
}
// default ["deflate_stream","lz4_message","zstd_stream"]
if (!algs.empty()) {
std::vector<std::string> av;
for (const auto &a : shcore::split_string(algs, ",")) {
if (a == "zlib")
av.emplace_back("deflate_stream");
else if (a == "zstd")
av.emplace_back("zstd_stream");
else if (a == "lz4")
av.emplace_back("lz4_message");
else if (a != "uncompressed")
av.emplace_back(a);
}
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_algorithms, av);
}
if (_connection_options.has_compression_level()) {
auto level = _connection_options.get_compression_level();
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_level_server, level);
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Compression_level_client, level);
}
bool user_defined_connection_attributes = false;
if (_connection_options.is_connection_attributes_enabled()) {
auto attrs = _mysql->get_connect_attrs();
attrs.emplace_back("program_name", xcl::Argument_value{"mysqlsh"});
if (!_connection_options.get_connection_attributes().empty()) {
user_defined_connection_attributes = true;
for (const auto &att : _connection_options.get_connection_attributes()) {
std::string attribute = att.first;
std::string value;
if (att.second.has_value()) {
value = *att.second;
}
attrs.emplace_back(attribute, xcl::Argument_value{value});
}
}
_mysql->set_capability(xcl::XSession::Capability_session_connect_attrs,
attrs);
}
// If a specific authentication type was given, it is used
if (_connection_options.has(mysqlshdk::db::kAuthMethod)) {
const auto error = _mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Authentication_method,
_connection_options.get(mysqlshdk::db::kAuthMethod));
if (error) {
_mysql.reset();
store_error_and_throw(
Error(std::string{"Failed to set the authentication method: "} +
error.what(),
error.error()));
}
} else {
// In 8.0.4, trying to connect without SSL to a caching_sha2_password
// account will not work. The error message that's given is also confusing,
// because there's no hint the error is because of no SSL instead of wrong
// password So as a workaround, we force the PLAIN auth type to be always
// attempted at last, at least until libmysqlxclient is fixed to produce a
// specific error msg. In addition, in servers < 8.0.4 the plugin kicks the
// user after the frst authentication attempt, so it is required that
// MYSQL41 is used as the first authentication attempt in order the
// connection to suceed on those servers.
_mysql->set_mysql_option(
xcl::XSession::Mysqlx_option::Authentication_method,
std::vector<std::string>{"MYSQL41", "SHA256_MEMORY", "PLAIN"});
}
// Sets the connection timeout
int64_t connect_timeout = mysqlshdk::db::default_connect_timeout();
if (_connection_options.has(kConnectTimeout)) {
connect_timeout = std::stoi(_connection_options.get(kConnectTimeout));
}
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Connect_timeout,
connect_timeout);
// Set read timeout
if (_connection_options.has(kNetReadTimeout)) {
int64_t read_timeout = std::stoi(_connection_options.get(kNetReadTimeout));
_mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Read_timeout,
read_timeout);
}
auto handler_id = _mysql->get_protocol().add_notice_handler(
[this](xcl::XProtocol *, const bool,
const Mysqlx::Notice::Frame::Type type, const char *payload,
const uint32_t payload_len) {
if (type == Mysqlx::Notice::Frame_Type_SESSION_STATE_CHANGED) {
Mysqlx::Notice::SessionStateChanged change;
change.ParseFromArray(payload, payload_len);
if (!change.IsInitialized()) {
log_error("Protocol error: Invalid notice received from server: %s",
change.InitializationErrorString().c_str());
} else if (change.param() ==
Mysqlx::Notice::SessionStateChanged::ACCOUNT_EXPIRED) {
_expired_account = true;
}
}
return xcl::Handler_result::Continue;
});
auto unregister_handler_id = shcore::Scoped_callback([this, &handler_id]() {
if (_mysql) _mysql->get_protocol().remove_notice_handler(handler_id);
});
std::vector<Mysqlx::Error> xproto_errors;
auto xproto_errors_handler_id =
_mysql->get_protocol().add_received_message_handler(
[&xproto_errors](
xcl::XProtocol *,
const xcl::XProtocol::Server_message_type_id type_id,
const xcl::XProtocol::Message &msg) -> xcl::Handler_result {
if (type_id == Mysqlx::ServerMessages::ERROR) {
const Mysqlx::Error e = *static_cast<const Mysqlx::Error *>(&msg);
xproto_errors.push_back(e);
}
return xcl::Handler_result::Continue;
});
auto unregister_xproto_errors_handler_id =
shcore::Scoped_callback([this, &xproto_errors_handler_id]() {
if (_mysql)
_mysql->get_protocol().remove_received_message_handler(
xproto_errors_handler_id);
});
xcl::XError err;
std::string host = _connection_options.has_host() &&
!_connection_options.get_ssh_options().has_data()
? _connection_options.get_host()
: std::string{"localhost"};
DBUG_LOG("sqlall", "CONNECT: " << data.uri_endpoint());
if (!_connection_options.has_transport_type() ||
_connection_options.get_transport_type() != mysqlshdk::db::Tcp) {
err = _mysql->connect(
data.has_socket()
? (data.get_socket().empty() ? MYSQLX_UNIX_ADDR
: data.get_socket().c_str())
: nullptr,
data.has_user() ? data.get_user().c_str() : "",
data.has_password() ? data.get_password().c_str() : "",
data.has_schema() ? data.get_schema().c_str() : "");
#ifdef _WIN32
_connection_info = "Localhost via Named pipe";
#else
_connection_info = "Localhost via UNIX socket";
#endif
} else {
err =
_mysql->connect(host.c_str(), _connection_options.get_target_port(),
data.has_user() ? data.get_user().c_str() : "",
data.has_password() ? data.get_password().c_str() : "",
data.has_schema() ? data.get_schema().c_str() : "");
_connection_info = host + " via TCP/IP";
// When neither port or socket were specified on the connection data
// it means it was able to use default connection data
if (!_connection_options.has_port())
_connection_options.set_port(MYSQLX_TCP_PORT);
}
if (err) {
_mysql.reset();
if (err.error() == CR_MALFORMED_PACKET &&
strstr(err.what(), "Unexpected response received from server")) {
std::string message = "Requested session assumes MySQL X Protocol but '" +
data.as_uri(uri::formats::only_transport()) +
"' seems to speak the classic MySQL protocol";
message.append(" (").append(err.what()).append(")");
store_error_and_throw(Error(message.c_str(), CR_MALFORMED_PACKET));
} else if (!user_defined_connection_attributes &&
err.error() == ER_X_CAPABILITY_NOT_FOUND &&
strstr(err.what(), "session_connect_attrs")) {
log_warning(
"Server does not support connection attributes, retrying with then "
"disabled: (%d) %s",
err.error(), err.what());
// When connection attributes is not supported, and the user did not
// explicitly request the registration of connection attributes, a second
// connection attempt with them disabled will be done.
mysqlshdk::db::Connection_options connection_data(data);
connection_data.set(kConnectionAttributes, "false");
connect(connection_data);
return;
} else if (err.error() == CR_SERVER_GONE_ERROR &&
_connection_options.get_ssh_options().has_data()) {
// When the connection is done through SSH tunnel and the tunnel fails to
// open, this error is received from the server
store_error_and_throw(
Error(shcore::str_format("Error MySQL Session through SSH tunnel: %s",
err.what()),
err.error()));
} else {
if (!xproto_errors.empty()) {
auto e = xproto_errors.back();
store_error_and_throw(Error(e.msg().c_str(), e.code()));
}
store_error_and_throw(Error(err.what(), err.error()));
}
}
if (ssl_options.has_tls_ciphersuites())
mysqlsh::current_console()->print_warning(
"X Protocol: tls-ciphersuites option is not yet supported and will "
"be ignored.");
// If the account is not expired, retrieves additional session information
if (!_expired_account) load_session_info();
DBUG_LOG("sql", get_thread_id() << ": CONNECTED: " << data.uri_endpoint());
{
auto log_sql_handler = shcore::current_log_sql();
log_sql_handler->log_connect(data.uri_endpoint(), get_thread_id());
}
// fill in defaults
if (!_connection_options.has_scheme())
_connection_options.set_scheme("mysqlx");
}
void XSession_impl::close() {
// This should be logged, for now commenting to
// avoid having unneeded output on the script mode
if (auto result = _prev_result.lock()) {
while (result->next_resultset()) {
}
}
auto prep_ids = m_prepared_statements;
for (auto stmt_id : prep_ids) {
deallocate_prep_stmt(stmt_id);
}
if (_enable_trace) {
enable_trace(false);
_enable_trace = true;
}
DBUG_LOG("sql", get_thread_id() << ": DISCONNECT");
m_prep_stmt_count = 0;
_connection_id = 0;
_connection_info.clear();
_ssl_cipher.clear();
_mysql.reset();
_version = utils::Version();
_expired_account = false;
_case_sensitive_table_names = false;
_prev_result.reset();
_connection_options = Connection_options();
}
void XSession_impl::enable_trace(bool flag) {
_enable_trace = flag;
if (_mysql) {
if (flag) {
if (_trace_handler.first == 0 && _trace_handler.second == 0)
_trace_handler = do_enable_trace(_mysql.get());
} else {
_mysql->get_protocol().remove_received_message_handler(
_trace_handler.first);
_mysql->get_protocol().remove_received_message_handler(
_trace_handler.second);
}
}
}
void XSession_impl::load_session_info() {
static constexpr char sql[] =
"select @@lower_case_table_names, @@version, connection_id(), "
"variable_value from performance_schema.session_status where "
"variable_name = 'mysqlx_ssl_cipher'";
std::shared_ptr<IResult> result(query(sql, sizeof(sql) - 1));
const IRow *row = result->fetch_one();
if (!row) throw std::logic_error("Unexpected empty result");
_case_sensitive_table_names = row->get_uint(0) == 0;
if (!row->is_null(1)) {
_version = mysqlshdk::utils::Version(row->get_string(1));
}
if (!row->is_null(2)) {
_connection_id = row->get_uint(2);
}
if (!row->is_null(3)) {
_ssl_cipher = row->get_string(3);
}
}
XSession_impl::~XSession_impl() {
DEBUG_OBJ_DEALLOC(db_mysqlx_Session);
_prev_result.reset();
close();
}
void XSession_impl::before_query() {
if (!_mysql) throw std::logic_error("Not connected");
if (auto result = _prev_result.lock()) {
if (result->has_resultset()) {
// buffer the previous result to remove it from the connection
// in case it's still active
result->pre_fetch_rows(false);
}
// if there's no data, we need to call next_resultset() anyway
// (bug#26581651 was filed for this)
// if there are multiple resultsets, they need to drained as well
// (BUG#30825330)
result->drain_resultset();
}
}
std::shared_ptr<IResult> XSession_impl::after_query(
std::unique_ptr<xcl::XQuery_result> result, bool buffered) {
std::shared_ptr<Result> res(new Result(std::move(result)));
res->fetch_metadata();
_prev_result = res;
if (buffered) res->pre_fetch_rows(buffered);
return std::static_pointer_cast<IResult>(res);
}
std::shared_ptr<IResult> XSession_impl::query(const char *sql, size_t len,
bool buffered) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("query");
before_query();
FI_TRIGGER_TRAP(mysqlx, mysqlshdk::utils::FI::Trigger_options(
{{"sql", std::string(sql, len)},
{"uri", _connection_options.uri_endpoint()}}));
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult;
{
::Mysqlx::Sql::StmtExecute stmt;
stmt.set_stmt(sql, len);
auto log_sql_handler = shcore::current_log_sql();
log_sql_handler->log(get_thread_id(), std::string_view{sql, len});
DBUG_LOG("sqlall", get_thread_id() << ": QUERY: " << stmt.stmt());
xresult = _mysql->get_protocol().execute_stmt(stmt, &error);
if (error) {
auto err = Error(error.what(), error.error());
log_sql_handler->log(get_thread_id(), std::string_view{sql, len}, err);
}
check_error_and_throw(error, stmt.stmt().c_str());
}
auto result = after_query(std::move(xresult), buffered);
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
void XSession_impl::execute(const char *sql, size_t len) {
std::shared_ptr<IResult> result = query(sql, len, false);
while (result->next_resultset()) {
}
}
std::shared_ptr<IResult> XSession_impl::execute_stmt(
const std::string &ns, const std::string &stmt,
const xcl::Argument_array &args) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("execute_stmt");
before_query();
FI_TRIGGER_TRAP(mysqlx, mysqlshdk::utils::FI::Trigger_options(
{{"sql", stmt},
{"uri", _connection_options.uri_endpoint()}}));
xcl::XError error;
if (ns.empty() || ns == "sql") {
auto log_sql_handler = shcore::current_log_sql();
log_sql_handler->log(get_thread_id(), stmt);
DBUG_LOG("sqlall", get_thread_id() << ": QUERY: " << stmt);
}
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->execute_stmt(ns, stmt, args, &error));
if (error) {
auto log_sql_handler = shcore::current_log_sql();
auto err = Error(error.what(), error.error());
log_sql_handler->log(get_thread_id(), stmt, err);
}
check_error_and_throw(error, stmt.c_str());
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
std::shared_ptr<IResult> XSession_impl::execute_crud(
const ::Mysqlx::Crud::Insert &msg) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("Mysqlx::Crud::Insert");
before_query();
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->get_protocol().execute_insert(msg, &error));
check_error_and_throw(error);
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
std::shared_ptr<IResult> XSession_impl::execute_crud(
const ::Mysqlx::Crud::Update &msg) {
before_query();
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("Mysqlx::Crud::Update");
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->get_protocol().execute_update(msg, &error));
check_error_and_throw(error);
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
std::shared_ptr<IResult> XSession_impl::execute_crud(
const ::Mysqlx::Crud::Delete &msg) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("Mysqlx::Crud::Delete");
before_query();
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->get_protocol().execute_delete(msg, &error));
check_error_and_throw(error);
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
std::shared_ptr<IResult> XSession_impl::execute_crud(
const ::Mysqlx::Crud::Find &msg) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("Mysqlx::Crud::Find");
before_query();
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->get_protocol().execute_find(msg, &error));
check_error_and_throw(error);
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
void XSession_impl::prepare_stmt(const ::Mysqlx::Prepare::Prepare &msg) {
before_query();
xcl::XError error = _mysql->get_protocol().send(msg);
check_error_and_throw(error);
error = _mysql->get_protocol().recv_ok();
check_error_and_throw(error);
m_prepared_statements.insert(msg.stmt_id());
}
std::shared_ptr<IResult> XSession_impl::execute_prep_stmt(
const ::Mysqlx::Prepare::Execute &msg) {
mysqlshdk::utils::Profile_timer timer;
timer.stage_begin("execute_prep_stmt");
before_query();
xcl::XError error;
std::unique_ptr<xcl::XQuery_result> xresult(
_mysql->get_protocol().execute_prep_stmt(msg, &error));
check_error_and_throw(error);
auto result = after_query(std::move(xresult));
timer.stage_end();
result->set_execution_time(timer.total_seconds_elapsed());
return result;
}
void XSession_impl::deallocate_prep_stmt(uint32_t stmt_id) {
before_query();
::Mysqlx::Prepare::Deallocate deallocate;
deallocate.set_stmt_id(stmt_id);
xcl::XError error = _mysql->get_protocol().send(deallocate);
check_error_and_throw(error);
error = _mysql->get_protocol().recv_ok();
check_error_and_throw(error);
// Removes the prepared statement from the list
m_prepared_statements.erase(stmt_id);
}
void XSession_impl::enable_notices(
const std::vector<GlobalNotice::Type> &types) {
if (!m_handler_installed) {
m_handler_installed = true;
_mysql->get_protocol().add_notice_handler(
[this](const xcl::XProtocol *protocol, const bool is_global,
const Mysqlx::Notice::Frame::Type type, const char *data,
const uint32_t data_length) -> xcl::Handler_result {
return global_notice_handler(protocol, is_global, type, data,
data_length);
});
}
xcl::XError err;
xcl::Argument_value::Object arg_obj;
xcl::Argument_value arg_value;
arg_value = arg_obj;
for (auto type : types) {
switch (type) {
case GlobalNotice::GRViewChanged:
arg_obj["notice"] = xcl::Argument_value::Arguments{
xcl::Argument_value("group_replication/membership/quorum_loss",
xcl::Argument_value::String_type::k_string),
xcl::Argument_value("group_replication/membership/view",
xcl::Argument_value::String_type::k_string),
xcl::Argument_value("group_replication/status/role_change",
xcl::Argument_value::String_type::k_string),
xcl::Argument_value("group_replication/status/state_change",
xcl::Argument_value::String_type::k_string)};
_mysql->execute_stmt("mysqlx", "enable_notices",
{xcl::Argument_value(arg_obj)}, &err);
check_error_and_throw(err);
break;
}
}
}
xcl::Handler_result XSession_impl::global_notice_handler(
const xcl::XProtocol *, const bool is_global,
const Mysqlx::Notice::Frame::Type type, const char *payload,
const uint32_t payload_size) {
if (is_global) {
bool notify = false;
GlobalNotice notice;
if (type == Mysqlx::Notice::Frame::GROUP_REPLICATION_STATE_CHANGED) {
Mysqlx::Notice::GroupReplicationStateChanged change;
change.ParseFromArray(payload, static_cast<int>(payload_size));
notice.type = GlobalNotice::GRViewChanged;
notice.info.gr_view_change.view_id = change.view_id();
notify = true;
}
if (notify) {
for (auto l = m_notice_listeners.begin();
l != m_notice_listeners.end();) {
auto current = l++;
if (!(*current)(notice)) {
m_notice_listeners.erase(current);
}
}
}
}
return xcl::Handler_result::Continue;
}
void XSession_impl::add_notice_listener(
const std::function<bool(const GlobalNotice &)> &listener) {
m_notice_listeners.push_back(listener);
}
void XSession_impl::check_error_and_throw(const xcl::XError &error,
const char *context) {
if (error) {
store_error_and_throw(Error(error.what(), error.error()), context);
} else {
m_last_error.reset(nullptr);
}
}
void XSession_impl::store_error_and_throw(const Error &error,
const char *context) {
if (DBUG_EVALUATE_IF("sqlall", 1, 0) || !context) {
DBUG_LOG("sql", get_thread_id() << ": ERROR: " << error.format());
} else {
DBUG_LOG("sql", get_thread_id() << ": ERROR: " << error.format()
<< "\n\twhile executing: " << context);
}
m_last_error.reset(new Error(error));
throw error;
}
void XSession_impl::setup_default_character_set() {
const auto version = get_server_version();
if (version >= mysqlshdk::utils::Version(8, 0, 0) &&
version < mysqlshdk::utils::Version(8, 0, 20)) {
execute("SET @@SESSION.collation_connection = 'utf8mb4_0900_ai_ci';");
}
}
std::function<std::shared_ptr<Session>()> g_session_factory;
std::string Session::escape_string(std::string_view s) const {
std::string res;
res.resize(s.size() * 2 + 1);
size_t l = mysql_escape_string(&res[0], s.data(), s.size());
res.resize(l);
return res;
}
std::function<std::shared_ptr<Session>()> Session::set_factory_function(
std::function<std::shared_ptr<Session>()> factory) {
auto old = g_session_factory;
g_session_factory = factory;
return old;
}
std::shared_ptr<Session> Session::create() {
if (g_session_factory) return g_session_factory();
return std::shared_ptr<Session>(new Session());
}
} // namespace mysqlx
} // namespace db
} // namespace mysqlshdk