in mysqlshdk/libs/db/mysqlx/xsession.cc [153:488]
/**
 * Opens an X Protocol session using the given connection options.
 *
 * The function creates a fresh xcl session, copies @p data into
 * _connection_options, translates the SSL, compression, connection
 * attribute, authentication and timeout options into the corresponding
 * xcl options/capabilities and then connects either through a socket/pipe
 * or through TCP, depending on the configured transport type.
 *
 * While the connection is being established two temporary handlers are
 * registered on the protocol object: one that flags an expired account
 * (_expired_account) and one that collects the Mysqlx::Error messages sent by
 * the server, so the last of them can be reported instead of the generic
 * client error.
 *
 * If the server reports that the session_connect_attrs capability is unknown
 * and the user did not define custom connection attributes, the connection is
 * retried once with connection attributes disabled.
 *
 * On success, unless the account is expired, load_session_info() is called;
 * the connection is logged and the scheme defaults to "mysqlx" when it was
 * not given.
 *
 * @param data The connection options describing the target and credentials.
 *
 * @throws std::runtime_error if get-server-public-key or
 *         server-public-key-path were given (not supported on X Protocol).
 *
 * Any other failure (invalid authentication method, connection error) is
 * reported through store_error_and_throw() after releasing the session.
 */
void XSession_impl::connect(const mysqlshdk::db::Connection_options &data) {
  _mysql = ::xcl::create_session();
  if (_enable_trace) _trace_handler = do_enable_trace(_mysql.get());
  _connection_options = data;

  // These options are rejected up-front; the just created session is released
  // so no half-initialized session is left behind.
  if (_connection_options.has(mysqlshdk::db::kGetServerPublicKey)) {
    _mysql.reset();
    throw std::runtime_error(
        "X Protocol: Option get-server-public-key is not supported.");
  }
  if (_connection_options.has(mysqlshdk::db::kServerPublicKeyPath)) {
    _mysql.reset();
    throw std::runtime_error(
        "X Protocol: Option server-public-key-path is not supported.");
  }

  auto &ssl_options(_connection_options.get_ssl_options());
  std::string ssl_mode;
  // In shell, the default ssl-mode is preferred, but in the case of DevAPI
  // the default mode is required and it is set at mysqlx.getSession()
  if (ssl_options.has_default(mysqlshdk::db::kSslMode))
    ssl_mode = ssl_options.get_default(mysqlshdk::db::kSslMode);
  else
    ssl_mode = mysqlshdk::db::kSslModePreferred;

  if (ssl_options.has_data()) {
    // If no mode is specified and either ssl-ca or ssl-capath are specified
    // then it uses VERIFY_CA
    if (!ssl_options.has_mode()) {
      if (ssl_options.has_value(mysqlshdk::db::kSslCa) ||
          ssl_options.has_value(mysqlshdk::db::kSslCaPath)) {
        ssl_mode = mysqlshdk::db::kSslModeVerifyCA;
      }
      // If ssl-mode is specified, then it is used
    } else {
      ssl_mode = ssl_options.get_mode_name();
    }

    ssl_options.validate();

    // Forward every SSL option that was explicitly provided
    if (ssl_options.has_ca())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca,
                               ssl_options.get_ca());
    if (ssl_options.has_cert())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cert,
                               ssl_options.get_cert());
    if (ssl_options.has_key())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_key,
                               ssl_options.get_key());
    if (ssl_options.has_capath())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca_path,
                               ssl_options.get_capath());
    if (ssl_options.has_crl())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl,
                               ssl_options.get_crl());
    if (ssl_options.has_crlpath())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl_path,
                               ssl_options.get_crlpath());
    if (ssl_options.has_tls_version())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Allowed_tls,
                               ssl_options.get_tls_version());
    if (ssl_options.has_cipher())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cipher,
                               ssl_options.get_cipher());
  }
  _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_mode, ssl_mode);

  _mysql->set_capability(xcl::XSession::Capability_can_handle_expired_password,
                         true);

  // Lower-cased list of requested compression algorithms (empty if not given)
  auto algs =
      _connection_options.has_compression_algorithms()
          ? shcore::str_lower(_connection_options.get_compression_algorithms())
          : "";

  if (_connection_options.has_compression()) {
    // An explicit compression mode always wins
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
        _connection_options.get_compression());
  } else {
    // Otherwise the mode is derived from the algorithm list:
    //  - only "uncompressed"              -> disabled
    //  - empty or contains "uncompressed" -> preferred
    //  - anything else                    -> required
    if (algs == "uncompressed")
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionDisabled);
    else if (algs.empty() || algs.find("uncompressed") != std::string::npos)
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionPreferred);
    else
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionRequired);
  }

  // default ["deflate_stream","lz4_message","zstd_stream"]
  if (!algs.empty()) {
    // Translate the short algorithm names into the names used by the X
    // protocol; "uncompressed" is dropped, unknown names are passed as is.
    std::vector<std::string> av;
    for (const auto &a : shcore::split_string(algs, ",")) {
      if (a == "zlib")
        av.emplace_back("deflate_stream");
      else if (a == "zstd")
        av.emplace_back("zstd_stream");
      else if (a == "lz4")
        av.emplace_back("lz4_message");
      else if (a != "uncompressed")
        av.emplace_back(a);
    }
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_algorithms, av);
  }

  if (_connection_options.has_compression_level()) {
    // The same level is requested for both directions
    auto level = _connection_options.get_compression_level();
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_level_server, level);
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_level_client, level);
  }

  // Tracks whether the user supplied custom attributes: in that case a server
  // that does not support them is an error instead of triggering a retry.
  bool user_defined_connection_attributes = false;
  if (_connection_options.is_connection_attributes_enabled()) {
    auto attrs = _mysql->get_connect_attrs();
    attrs.emplace_back("program_name", xcl::Argument_value{"mysqlsh"});

    if (!_connection_options.get_connection_attributes().empty()) {
      user_defined_connection_attributes = true;
      for (const auto &att : _connection_options.get_connection_attributes()) {
        std::string attribute = att.first;
        std::string value;
        // Attributes without a value are sent with an empty string
        if (att.second.has_value()) {
          value = *att.second;
        }
        attrs.emplace_back(attribute, xcl::Argument_value{value});
      }
    }
    _mysql->set_capability(xcl::XSession::Capability_session_connect_attrs,
                           attrs);
  }

  // If a specific authentication type was given, it is used
  if (_connection_options.has(mysqlshdk::db::kAuthMethod)) {
    const auto error = _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Authentication_method,
        _connection_options.get(mysqlshdk::db::kAuthMethod));
    if (error) {
      _mysql.reset();
      store_error_and_throw(
          Error(std::string{"Failed to set the authentication method: "} +
                    error.what(),
                error.error()));
    }
  } else {
    // In 8.0.4, trying to connect without SSL to a caching_sha2_password
    // account will not work. The error message that's given is also confusing,
    // because there's no hint the error is because of no SSL instead of wrong
    // password. So as a workaround, we force the PLAIN auth type to be always
    // attempted at last, at least until libmysqlxclient is fixed to produce a
    // specific error msg. In addition, in servers < 8.0.4 the plugin kicks the
    // user after the first authentication attempt, so it is required that
    // MYSQL41 is used as the first authentication attempt in order for the
    // connection to succeed on those servers.
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Authentication_method,
        std::vector<std::string>{"MYSQL41", "SHA256_MEMORY", "PLAIN"});
  }

  // Sets the connection timeout
  int64_t connect_timeout = mysqlshdk::db::default_connect_timeout();
  if (_connection_options.has(kConnectTimeout)) {
    connect_timeout = std::stoi(_connection_options.get(kConnectTimeout));
  }
  _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Connect_timeout,
                           connect_timeout);

  // Set read timeout
  if (_connection_options.has(kNetReadTimeout)) {
    int64_t read_timeout = std::stoi(_connection_options.get(kNetReadTimeout));
    _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Read_timeout,
                             read_timeout);
  }

  // Set when the session the handlers below were registered on is discarded
  // and replaced by a new one (retry path). The handler ids were issued by the
  // discarded session's protocol object, so they must not be removed from the
  // protocol object of the replacement session.
  bool session_replaced = false;

  // Temporary notice handler used to detect that the account is expired
  auto handler_id = _mysql->get_protocol().add_notice_handler(
      [this](xcl::XProtocol *, const bool,
             const Mysqlx::Notice::Frame::Type type, const char *payload,
             const uint32_t payload_len) {
        if (type == Mysqlx::Notice::Frame_Type_SESSION_STATE_CHANGED) {
          Mysqlx::Notice::SessionStateChanged change;
          change.ParseFromArray(payload, payload_len);
          if (!change.IsInitialized()) {
            log_error("Protocol error: Invalid notice received from server: %s",
                      change.InitializationErrorString().c_str());
          } else if (change.param() ==
                     Mysqlx::Notice::SessionStateChanged::ACCOUNT_EXPIRED) {
            _expired_account = true;
          }
        }
        return xcl::Handler_result::Continue;
      });
  auto unregister_handler_id =
      shcore::Scoped_callback([this, &handler_id, &session_replaced]() {
        if (_mysql && !session_replaced)
          _mysql->get_protocol().remove_notice_handler(handler_id);
      });

  // Temporary handler collecting the errors sent by the server, the last one
  // is preferred over the client side error when reporting a failure.
  std::vector<Mysqlx::Error> xproto_errors;
  auto xproto_errors_handler_id =
      _mysql->get_protocol().add_received_message_handler(
          [&xproto_errors](
              xcl::XProtocol *,
              const xcl::XProtocol::Server_message_type_id type_id,
              const xcl::XProtocol::Message &msg) -> xcl::Handler_result {
            if (type_id == Mysqlx::ServerMessages::ERROR) {
              xproto_errors.push_back(static_cast<const Mysqlx::Error &>(msg));
            }
            return xcl::Handler_result::Continue;
          });
  auto unregister_xproto_errors_handler_id = shcore::Scoped_callback(
      [this, &xproto_errors_handler_id, &session_replaced]() {
        if (_mysql && !session_replaced)
          _mysql->get_protocol().remove_received_message_handler(
              xproto_errors_handler_id);
      });

  xcl::XError err;

  // When SSH options are present, "localhost" is used instead of the
  // configured host.
  std::string host = _connection_options.has_host() &&
                             !_connection_options.get_ssh_options().has_data()
                         ? _connection_options.get_host()
                         : std::string{"localhost"};

  DBUG_LOG("sqlall", "CONNECT: " << data.uri_endpoint());

  if (!_connection_options.has_transport_type() ||
      _connection_options.get_transport_type() != mysqlshdk::db::Tcp) {
    // Socket/pipe connection: an empty socket path selects MYSQLX_UNIX_ADDR
    err = _mysql->connect(
        data.has_socket()
            ? (data.get_socket().empty() ? MYSQLX_UNIX_ADDR
                                         : data.get_socket().c_str())
            : nullptr,
        data.has_user() ? data.get_user().c_str() : "",
        data.has_password() ? data.get_password().c_str() : "",
        data.has_schema() ? data.get_schema().c_str() : "");
#ifdef _WIN32
    _connection_info = "Localhost via Named pipe";
#else
    _connection_info = "Localhost via UNIX socket";
#endif
  } else {
    err =
        _mysql->connect(host.c_str(), _connection_options.get_target_port(),
                        data.has_user() ? data.get_user().c_str() : "",
                        data.has_password() ? data.get_password().c_str() : "",
                        data.has_schema() ? data.get_schema().c_str() : "");
    _connection_info = host + " via TCP/IP";

    // When neither port or socket were specified on the connection data
    // it means it was able to use default connection data
    if (!_connection_options.has_port())
      _connection_options.set_port(MYSQLX_TCP_PORT);
  }

  if (err) {
    // The session is released first; the cleanup callbacks above become
    // no-ops because _mysql is empty (or the session was replaced).
    _mysql.reset();

    if (err.error() == CR_MALFORMED_PACKET &&
        strstr(err.what(), "Unexpected response received from server")) {
      std::string message = "Requested session assumes MySQL X Protocol but '" +
                            data.as_uri(uri::formats::only_transport()) +
                            "' seems to speak the classic MySQL protocol";
      message.append(" (").append(err.what()).append(")");
      store_error_and_throw(Error(message.c_str(), CR_MALFORMED_PACKET));
    } else if (!user_defined_connection_attributes &&
               err.error() == ER_X_CAPABILITY_NOT_FOUND &&
               strstr(err.what(), "session_connect_attrs")) {
      log_warning(
          "Server does not support connection attributes, retrying with them "
          "disabled: (%d) %s",
          err.error(), err.what());

      // When connection attributes is not supported, and the user did not
      // explicitly request the registration of connection attributes, a second
      // connection attempt with them disabled will be done.
      mysqlshdk::db::Connection_options connection_data(data);
      connection_data.set(kConnectionAttributes, "false");

      // The nested call installs a new session in _mysql, the handler ids
      // held by this frame do not belong to it.
      session_replaced = true;
      connect(connection_data);
      return;
    } else if (err.error() == CR_SERVER_GONE_ERROR &&
               _connection_options.get_ssh_options().has_data()) {
      // When the connection is done through SSH tunnel and the tunnel fails to
      // open, this error is received from the server
      store_error_and_throw(
          Error(shcore::str_format("Error MySQL Session through SSH tunnel: %s",
                                   err.what()),
                err.error()));
    } else {
      // Prefer the last error sent by the server, if any was collected
      if (!xproto_errors.empty()) {
        const auto &e = xproto_errors.back();
        store_error_and_throw(Error(e.msg().c_str(), e.code()));
      }
      store_error_and_throw(Error(err.what(), err.error()));
    }
  }

  if (ssl_options.has_tls_ciphersuites())
    mysqlsh::current_console()->print_warning(
        "X Protocol: tls-ciphersuites option is not yet supported and will "
        "be ignored.");

  // If the account is not expired, retrieves additional session information
  if (!_expired_account) load_session_info();

  DBUG_LOG("sql", get_thread_id() << ": CONNECTED: " << data.uri_endpoint());
  {
    auto log_sql_handler = shcore::current_log_sql();
    log_sql_handler->log_connect(data.uri_endpoint(), get_thread_id());
  }

  // fill in defaults
  if (!_connection_options.has_scheme())
    _connection_options.set_scheme("mysqlx");
}