void XSession_impl::connect()

in mysqlshdk/libs/db/mysqlx/xsession.cc [153:488]


void XSession_impl::connect(const mysqlshdk::db::Connection_options &data) {
  _mysql = ::xcl::create_session();
  if (_enable_trace) _trace_handler = do_enable_trace(_mysql.get());

  _connection_options = data;

  if (_connection_options.has(mysqlshdk::db::kGetServerPublicKey)) {
    _mysql.reset();
    throw std::runtime_error(
        "X Protocol: Option get-server-public-key is not supported.");
  }

  if (_connection_options.has(mysqlshdk::db::kServerPublicKeyPath)) {
    _mysql.reset();
    throw std::runtime_error(
        "X Protocol: Option server-public-key-path is not supported.");
  }

  auto &ssl_options(_connection_options.get_ssl_options());

  std::string ssl_mode;

  // In shell, the default ssl-mode is preferred, but in the case of DevAPI
  // the default mode is required and it is set at mysqlx.getSession()
  if (ssl_options.has_default(mysqlshdk::db::kSslMode))
    ssl_mode = ssl_options.get_default(mysqlshdk::db::kSslMode);
  else
    ssl_mode = mysqlshdk::db::kSslModePreferred;

  if (ssl_options.has_data()) {
    // If no mode is specified and either ssl-ca or ssl-capath are specified
    // then it uses VERIFY_CA
    if (!ssl_options.has_mode()) {
      if (ssl_options.has_value(mysqlshdk::db::kSslCa) ||
          ssl_options.has_value(mysqlshdk::db::kSslCaPath)) {
        ssl_mode = mysqlshdk::db::kSslModeVerifyCA;
      }
      // If ssl-mode is specified, then it is used
    } else {
      ssl_mode = ssl_options.get_mode_name();
    }

    ssl_options.validate();

    if (ssl_options.has_ca())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca,
                               ssl_options.get_ca());

    if (ssl_options.has_cert())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cert,
                               ssl_options.get_cert());

    if (ssl_options.has_key())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_key,
                               ssl_options.get_key());

    if (ssl_options.has_capath())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_ca_path,
                               ssl_options.get_capath());

    if (ssl_options.has_crl())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl,
                               ssl_options.get_crl());

    if (ssl_options.has_crlpath())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_crl_path,
                               ssl_options.get_crlpath());

    if (ssl_options.has_tls_version())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Allowed_tls,
                               ssl_options.get_tls_version());

    if (ssl_options.has_cipher())
      _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_cipher,
                               ssl_options.get_cipher());
  }

  _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Ssl_mode, ssl_mode);

  _mysql->set_capability(xcl::XSession::Capability_can_handle_expired_password,
                         true);

  auto algs =
      _connection_options.has_compression_algorithms()
          ? shcore::str_lower(_connection_options.get_compression_algorithms())
          : "";
  if (_connection_options.has_compression()) {
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
        _connection_options.get_compression());
  } else {
    if (algs == "uncompressed")
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionDisabled);
    else if (algs.empty() || algs.find("uncompressed") != std::string::npos)
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionPreferred);
    else
      _mysql->set_mysql_option(
          xcl::XSession::Mysqlx_option::Compression_negotiation_mode,
          kCompressionRequired);
  }

  // default ["deflate_stream","lz4_message","zstd_stream"]
  if (!algs.empty()) {
    std::vector<std::string> av;
    for (const auto &a : shcore::split_string(algs, ",")) {
      if (a == "zlib")
        av.emplace_back("deflate_stream");
      else if (a == "zstd")
        av.emplace_back("zstd_stream");
      else if (a == "lz4")
        av.emplace_back("lz4_message");
      else if (a != "uncompressed")
        av.emplace_back(a);
    }
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_algorithms, av);
  }

  if (_connection_options.has_compression_level()) {
    auto level = _connection_options.get_compression_level();
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_level_server, level);
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Compression_level_client, level);
  }

  bool user_defined_connection_attributes = false;
  if (_connection_options.is_connection_attributes_enabled()) {
    auto attrs = _mysql->get_connect_attrs();
    attrs.emplace_back("program_name", xcl::Argument_value{"mysqlsh"});

    if (!_connection_options.get_connection_attributes().empty()) {
      user_defined_connection_attributes = true;

      for (const auto &att : _connection_options.get_connection_attributes()) {
        std::string attribute = att.first;
        std::string value;
        if (att.second.has_value()) {
          value = *att.second;
        }

        attrs.emplace_back(attribute, xcl::Argument_value{value});
      }
    }
    _mysql->set_capability(xcl::XSession::Capability_session_connect_attrs,
                           attrs);
  }

  // If a specific authentication type was given, it is used
  if (_connection_options.has(mysqlshdk::db::kAuthMethod)) {
    const auto error = _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Authentication_method,
        _connection_options.get(mysqlshdk::db::kAuthMethod));

    if (error) {
      _mysql.reset();
      store_error_and_throw(
          Error(std::string{"Failed to set the authentication method: "} +
                    error.what(),
                error.error()));
    }
  } else {
    // In 8.0.4, trying to connect without SSL to a caching_sha2_password
    // account will not work. The error message that's given is also confusing,
    // because there's no hint the error is because of no SSL instead of wrong
    // password So as a workaround, we force the PLAIN auth type to be always
    // attempted at last, at least until libmysqlxclient is fixed to produce a
    // specific error msg. In addition, in servers < 8.0.4 the plugin kicks the
    // user after the frst authentication attempt, so it is required that
    // MYSQL41 is used as the first authentication attempt in order the
    // connection to suceed on those servers.
    _mysql->set_mysql_option(
        xcl::XSession::Mysqlx_option::Authentication_method,
        std::vector<std::string>{"MYSQL41", "SHA256_MEMORY", "PLAIN"});
  }

  // Sets the connection timeout
  int64_t connect_timeout = mysqlshdk::db::default_connect_timeout();
  if (_connection_options.has(kConnectTimeout)) {
    connect_timeout = std::stoi(_connection_options.get(kConnectTimeout));
  }
  _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Connect_timeout,
                           connect_timeout);
  // Set read timeout
  if (_connection_options.has(kNetReadTimeout)) {
    int64_t read_timeout = std::stoi(_connection_options.get(kNetReadTimeout));
    _mysql->set_mysql_option(xcl::XSession::Mysqlx_option::Read_timeout,
                             read_timeout);
  }

  auto handler_id = _mysql->get_protocol().add_notice_handler(
      [this](xcl::XProtocol *, const bool,
             const Mysqlx::Notice::Frame::Type type, const char *payload,
             const uint32_t payload_len) {
        if (type == Mysqlx::Notice::Frame_Type_SESSION_STATE_CHANGED) {
          Mysqlx::Notice::SessionStateChanged change;
          change.ParseFromArray(payload, payload_len);
          if (!change.IsInitialized()) {
            log_error("Protocol error: Invalid notice received from server: %s",
                      change.InitializationErrorString().c_str());
          } else if (change.param() ==
                     Mysqlx::Notice::SessionStateChanged::ACCOUNT_EXPIRED) {
            _expired_account = true;
          }
        }
        return xcl::Handler_result::Continue;
      });

  auto unregister_handler_id = shcore::Scoped_callback([this, &handler_id]() {
    if (_mysql) _mysql->get_protocol().remove_notice_handler(handler_id);
  });

  std::vector<Mysqlx::Error> xproto_errors;

  auto xproto_errors_handler_id =
      _mysql->get_protocol().add_received_message_handler(
          [&xproto_errors](
              xcl::XProtocol *,
              const xcl::XProtocol::Server_message_type_id type_id,
              const xcl::XProtocol::Message &msg) -> xcl::Handler_result {
            if (type_id == Mysqlx::ServerMessages::ERROR) {
              const Mysqlx::Error e = *static_cast<const Mysqlx::Error *>(&msg);
              xproto_errors.push_back(e);
            }

            return xcl::Handler_result::Continue;
          });

  auto unregister_xproto_errors_handler_id =
      shcore::Scoped_callback([this, &xproto_errors_handler_id]() {
        if (_mysql)
          _mysql->get_protocol().remove_received_message_handler(
              xproto_errors_handler_id);
      });

  xcl::XError err;

  std::string host = _connection_options.has_host() &&
                             !_connection_options.get_ssh_options().has_data()
                         ? _connection_options.get_host()
                         : std::string{"localhost"};

  DBUG_LOG("sqlall", "CONNECT: " << data.uri_endpoint());

  if (!_connection_options.has_transport_type() ||
      _connection_options.get_transport_type() != mysqlshdk::db::Tcp) {
    err = _mysql->connect(
        data.has_socket()
            ? (data.get_socket().empty() ? MYSQLX_UNIX_ADDR
                                         : data.get_socket().c_str())
            : nullptr,
        data.has_user() ? data.get_user().c_str() : "",
        data.has_password() ? data.get_password().c_str() : "",
        data.has_schema() ? data.get_schema().c_str() : "");
#ifdef _WIN32
    _connection_info = "Localhost via Named pipe";
#else
    _connection_info = "Localhost via UNIX socket";
#endif
  } else {
    err =
        _mysql->connect(host.c_str(), _connection_options.get_target_port(),
                        data.has_user() ? data.get_user().c_str() : "",
                        data.has_password() ? data.get_password().c_str() : "",
                        data.has_schema() ? data.get_schema().c_str() : "");
    _connection_info = host + " via TCP/IP";

    // When neither port or socket were specified on the connection data
    // it means it was able to use default connection data
    if (!_connection_options.has_port())
      _connection_options.set_port(MYSQLX_TCP_PORT);
  }
  if (err) {
    _mysql.reset();

    if (err.error() == CR_MALFORMED_PACKET &&
        strstr(err.what(), "Unexpected response received from server")) {
      std::string message = "Requested session assumes MySQL X Protocol but '" +
                            data.as_uri(uri::formats::only_transport()) +
                            "' seems to speak the classic MySQL protocol";
      message.append(" (").append(err.what()).append(")");
      store_error_and_throw(Error(message.c_str(), CR_MALFORMED_PACKET));
    } else if (!user_defined_connection_attributes &&
               err.error() == ER_X_CAPABILITY_NOT_FOUND &&
               strstr(err.what(), "session_connect_attrs")) {
      log_warning(
          "Server does not support connection attributes, retrying with then "
          "disabled: (%d) %s",
          err.error(), err.what());

      // When connection attributes is not supported, and the user did not
      // explicitly request the registration of connection attributes, a second
      // connection attempt with them disabled will be done.
      mysqlshdk::db::Connection_options connection_data(data);
      connection_data.set(kConnectionAttributes, "false");
      connect(connection_data);
      return;
    } else if (err.error() == CR_SERVER_GONE_ERROR &&
               _connection_options.get_ssh_options().has_data()) {
      // When the connection is done through SSH tunnel and the tunnel fails to
      // open, this error is received from the server
      store_error_and_throw(
          Error(shcore::str_format("Error MySQL Session through SSH tunnel: %s",
                                   err.what()),
                err.error()));
    } else {
      if (!xproto_errors.empty()) {
        auto e = xproto_errors.back();
        store_error_and_throw(Error(e.msg().c_str(), e.code()));
      }
      store_error_and_throw(Error(err.what(), err.error()));
    }
  }

  if (ssl_options.has_tls_ciphersuites())
    mysqlsh::current_console()->print_warning(
        "X Protocol: tls-ciphersuites option is not yet supported and will "
        "be ignored.");

  // If the account is not expired, retrieves additional session information
  if (!_expired_account) load_session_info();

  DBUG_LOG("sql", get_thread_id() << ": CONNECTED: " << data.uri_endpoint());
  {
    auto log_sql_handler = shcore::current_log_sql();
    log_sql_handler->log_connect(data.uri_endpoint(), get_thread_id());
  }

  // fill in defaults
  if (!_connection_options.has_scheme())
    _connection_options.set_scheme("mysqlx");
}