modules/devapi/mod_mysqlx_session.cc (933 lines of code) (raw):
/*
* Copyright (c) 2014, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "mod_mysqlx_session.h"
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "modules/devapi/mod_mysqlx_constants.h"
#include "modules/devapi/mod_mysqlx_expression.h"
#include "modules/devapi/mod_mysqlx_schema.h"
#include "modules/devapi/mod_mysqlx_session.h"
#include "modules/devapi/mod_mysqlx_session_sql.h"
#include "modules/mod_utils.h"
#include "modules/mysqlxtest_utils.h"
#include "mysqlshdk/include/scripting/object_factory.h"
#include "mysqlshdk/include/scripting/proxy_object.h"
#include "mysqlshdk/include/scripting/type_info/custom.h"
#include "mysqlshdk/include/scripting/type_info/generic.h"
#include "mysqlshdk/include/shellcore/base_shell.h" // for options
#include "mysqlshdk/include/shellcore/shell_core.h"
#include "mysqlshdk/include/shellcore/shell_notifications.h"
#include "mysqlshdk/include/shellcore/utils_help.h"
#include "mysqlshdk/libs/utils/logger.h"
#include "mysqlshdk/libs/utils/utils_file.h"
#include "mysqlshdk/libs/utils/utils_general.h"
#include "mysqlshdk/libs/utils/utils_path.h"
#include "mysqlshdk/libs/utils/utils_sqlstring.h"
#include "mysqlshdk/libs/utils/utils_string.h"
#include "mysqlshdk/libs/utils/utils_uuid.h"
#if _MSC_VER
#define _CRT_SECURE_NO_WARNINGS
#define mystrdup _strdup
#else
#define mystrdup strdup
#endif
using namespace mysqlsh;
using namespace shcore;
using namespace mysqlsh::mysqlx;
#ifdef _WIN32
#define strcasecmp _stricmp
#endif
// Documentation of BaseSession class
REGISTER_HELP_CLASS(Session, mysqlx);
REGISTER_HELP(SESSION_BRIEF,
"Enables interaction with a MySQL Server using the X Protocol.");
REGISTER_HELP(SESSION_DETAIL,
"Document Store functionality can be used through this object, "
"in addition to SQL.");
REGISTER_HELP(SESSION_DETAIL1,
"This class allows performing database operations such as:");
REGISTER_HELP(SESSION_DETAIL2, "@li Schema management operations.");
REGISTER_HELP(SESSION_DETAIL3, "@li Access to relational tables.");
REGISTER_HELP(SESSION_DETAIL4, "@li Access to Document Store collections.");
REGISTER_HELP(SESSION_DETAIL5, "@li Enabling/disabling warning generation.");
REGISTER_HELP(SESSION_DETAIL6, "@li Retrieval of connection information.");
// Documentation of Session class
REGISTER_HELP(SESSION_GLOBAL_BRIEF,
"Represents the currently open MySQL session.");
Session::Session() : _case_sensitive_table_names(false), _savepoint_counter(0) {
init();
_session = mysqlshdk::db::mysqlx::Session::create();
}
Session::Session(std::shared_ptr<mysqlshdk::db::mysqlx::Session> session)
: _case_sensitive_table_names(false) {
init();
// TODO(alfredo) maybe can remove _connection_options ivar
_connection_options = session->get_connection_options();
_session = session;
_connection_id = _session->get_connection_id();
}
Session::Session(const Session &s)
: ShellBaseSession(s),
std::enable_shared_from_this<Session>(s),
_case_sensitive_table_names(false),
_savepoint_counter(0) {
init();
_session = mysqlshdk::db::mysqlx::Session::create();
}
Session::~Session() {
if (is_open()) close();
}
void Session::init() {
expose("close", &Session::close);
expose("setFetchWarnings", &Session::set_fetch_warnings, "enable");
expose("startTransaction", &Session::_start_transaction);
expose("commit", &Session::_commit);
expose("rollback", &Session::_rollback);
expose("createSchema", &Session::_create_schema, "name");
expose("getSchema", &Session::get_schema, "name");
expose("getSchemas", &Session::get_schemas);
expose("dropSchema", &Session::drop_schema, "name");
expose("setSavepoint", &Session::set_savepoint, "?name");
expose("releaseSavepoint", &Session::release_savepoint, "name");
expose("rollbackTo", &Session::rollback_to, "name");
add_property("uri", "getUri");
add_property("sshUri", "getSshUri");
add_property("defaultSchema", "getDefaultSchema");
add_property("currentSchema", "getCurrentSchema");
expose("isOpen", &Session::is_open);
expose("sql", &Session::sql, "statement");
expose("setCurrentSchema", &Session::_set_current_schema, "name");
expose("quoteName", &Session::quote_name, "id");
expose("runSql", &Session::run_sql, "query", "?args");
expose("_getSocketFd", &Session::_get_socket_fd);
expose("_fetchNotice", &Session::_fetch_notice);
expose("_enableNotices", &Session::_enable_notices, "noticeTypes");
_schemas.reset(new shcore::Value::Map_type);
// Prepares the cache handling
auto generator = [this](const std::string &name) {
return shcore::Value::wrap<Schema>(
std::make_shared<Schema>(shared_from_this(), name));
};
update_schema_cache = [generator, this](const std::string &name,
bool /* exists */) {
DatabaseObject::update_cache(name, generator, true, _schemas);
};
_session_uuid = shcore::Id_generator::new_document_id();
}
REGISTER_HELP_FUNCTION(isOpen, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_ISOPEN, R"*(
Returns true if session is known to be open.
@returns A boolean value indicating if the session is still open.
Returns true if the session is still open and false otherwise.
@note This function may return true if connection is lost.
)*");
/**
* $(SESSION_ISOPEN_BRIEF)
*
* $(SESSION_ISOPEN)
*/
#if DOXYGEN_JS
Bool Session::isOpen() {}
#elif DOXYGEN_PY
bool Session::is_open() {}
#endif
bool Session::is_open() const { return _session->is_open(); }
std::string Session::get_uuid() {
std::string new_uuid = shcore::Id_generator::new_document_id();
// To guarantee the same Node id us used
// (in case a random number is generated when MAC address is not available)
// We copy the _session_uuid on top of the new uuid node
std::copy(_session_uuid.begin(), _session_uuid.begin() + 12,
new_uuid.begin());
return new_uuid;
}
void Session::connect(const mysqlshdk::db::Connection_options &data) {
try {
_connection_options = data;
_session->connect(_connection_options);
_connection_id = _session->get_connection_id();
if (_connection_options.has_schema())
update_schema_cache(_connection_options.get_schema(), true);
}
CATCH_AND_TRANSLATE();
}
void Session::set_option(const char *option, int value) {
if (strcmp(option, "trace_protocol") == 0) {
if (is_open()) _session->enable_protocol_trace(value != 0);
} else {
throw shcore::Exception::argument_error(
std::string("Unknown option ").append(option));
}
}
uint64_t Session::get_connection_id() const {
return _session->get_connection_id();
}
bool Session::table_name_compare(const std::string &n1, const std::string &n2) {
if (_case_sensitive_table_names)
return n1 == n2;
else
return strcasecmp(n1.c_str(), n2.c_str()) == 0;
}
// Documentation of close function
REGISTER_HELP_FUNCTION(close, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_CLOSE, R"*(
Closes the session.
After closing the session it is still possible to make read only operations to
gather metadata info, like <<<getTable>>>(name) or <<<getSchemas>>>().
)*");
/**
* $(SESSION_CLOSE_BRIEF)
*
* $(SESSION_CLOSE)
*/
#if DOXYGEN_JS
Undefined Session::close() {}
#elif DOXYGEN_PY
None Session::close() {}
#endif
void Session::close() {
try {
// Connection must be explicitly closed, we can't rely on the
// automatic destruction because if shared across different objects
// it may remain open
log_debug(
"Closing session: %s",
uri(mysqlshdk::db::uri::formats::scheme_user_transport()).c_str());
if (_session->is_open()) {
_session->close();
}
} catch (const std::exception &e) {
log_warning("Error occurred closing session: %s", e.what());
}
_session = mysqlshdk::db::mysqlx::Session::create();
}
// Documentation of createSchema function
REGISTER_HELP_FUNCTION(createSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_CREATESCHEMA, R"*(
Creates a schema on the database and returns the corresponding object.
@param name A string value indicating the schema name.
@returns The created schema object.
@throw A MySQL error is thrown if fails creating the schema.
)*");
/**
* $(SESSION_CREATESCHEMA_BRIEF)
*
* $(SESSION_CREATESCHEMA)
*/
#if DOXYGEN_JS
Schema Session::createSchema(String name) {}
#elif DOXYGEN_PY
Schema Session::create_schema(str name) {}
#endif
std::shared_ptr<Schema> Session::_create_schema(const std::string &name) {
create_schema(name);
return std::dynamic_pointer_cast<Schema>(_schemas->at(name).as_object());
}
void Session::create_schema(const std::string &name) {
std::string statement = sqlstring("create schema ! charset='utf8mb4'", 0)
<< name;
execute_sql(statement);
// if reached this point it indicates that there were no errors
update_schema_cache(name, true);
}
void Session::set_current_schema(const std::string &name) {
execute_sql(sqlstring("use !", 0) << name);
}
REGISTER_HELP_FUNCTION(setSavepoint, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_SETSAVEPOINT, R"*(
Creates or replaces a transaction savepoint with the given name.
@param name Optional string with the name to be assigned to the transaction
save point.
@returns The name of the transaction savepoint.
When working with transactions, using savepoints allows rolling back operations
executed after the savepoint without terminating the transaction.
Use this function to set a savepoint within a transaction.
If this function is called with the same name of another savepoint set
previously, the original savepoint will be deleted and a new one will be
created.
If the name is not provided an auto-generated name as 'TXSP#' will be assigned,
where # is a consecutive number that guarantees uniqueness of the savepoint at
Session level.
)*");
/**
* $(SESSION_SETSAVEPOINT_BRIEF)
*
* $(SESSION_SETSAVEPOINT)
*/
#if DOXYGEN_JS
String Session::setSavepoint(String name) {}
#elif DOXYGEN_PY
str Session::set_savepoint(str name) {}
#endif
std::string Session::set_savepoint(const std::string &name) {
std::string new_name(name);
if (new_name.empty())
new_name = "TXSP" + std::to_string(++_savepoint_counter);
const auto query = sqlstring("savepoint !", 0) << new_name;
_session->execute(query.str_view());
return new_name;
}
REGISTER_HELP_FUNCTION(releaseSavepoint, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_RELEASESAVEPOINT, R"*(
Removes a savepoint defined on a transaction.
@param name string with the name of the savepoint to be removed.
Removes a named savepoint from the set of savepoints defined on the current
transaction. This does not affect the operations executed on the transaction
since no commit or rollback occurs.
It is an error trying to remove a savepoint that does not exist.
)*");
/**
* $(SESSION_RELEASESAVEPOINT_BRIEF)
*
* $(SESSION_RELEASESAVEPOINT)
*/
#if DOXYGEN_JS
Undefined Session::releaseSavepoint(String name) {}
#elif DOXYGEN_PY
None Session::release_savepoint(str name) {}
#endif
void Session::release_savepoint(const std::string &name) {
const auto query = sqlstring("release savepoint !", 0) << name;
_session->execute(query.str_view());
}
REGISTER_HELP_FUNCTION(rollbackTo, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_ROLLBACKTO, R"*(
Rolls back the transaction to the named savepoint without terminating the
transaction.
@param name string with the name of the savepoint for the rollback operation.
Modifications that the current transaction made to rows after the savepoint was
defined will be rolled back.
The given savepoint will not be removed, but any savepoint defined after the
given savepoint was defined will be removed.
It is an error calling this operation with an unexisting savepoint.
)*");
/**
* $(SESSION_ROLLBACKTO_BRIEF)
*
* $(SESSION_ROLLBACKTO)
*/
#if DOXYGEN_JS
Undefined Session::rollbackTo(String name) {}
#elif DOXYGEN_PY
None Session::rollback_to(str name) {}
#endif
void Session::rollback_to(const std::string &name) {
const auto query = sqlstring("rollback to !", 0) << name;
_session->execute(query.str_view());
}
// Documentation of getDefaultSchema function
REGISTER_HELP_PROPERTY(defaultSchema, Session);
REGISTER_HELP(SESSION_DEFAULTSCHEMA_BRIEF,
"Retrieves the Schema configured as default for the session.");
REGISTER_HELP_FUNCTION(getDefaultSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETDEFAULTSCHEMA, R"*(
Retrieves the Schema configured as default for the session.
@returns A Schema object or Null
)*");
#if DOXYGEN_JS || DOXYGEN_PY
/**
* $(SESSION_GETDEFAULTSCHEMA_BRIEF)
*
* $(SESSION_GETDEFAULTSCHEMA)
*/
#if DOXYGEN_JS
Schema Session::getDefaultSchema() {}
#elif DOXYGEN_PY
Schema Session::get_default_schema() {}
#endif
REGISTER_HELP_PROPERTY(uri, Session);
REGISTER_HELP(SESSION_URI_BRIEF, "Retrieves the URI for the current session.");
REGISTER_HELP_FUNCTION(getUri, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETURI, R"*(
Retrieves the URI for the current session.
@return A string representing the connection data.
)*");
/**
* $(SESSION_GETURI_BRIEF)
*
* $(SESSION_GETURI)
*/
#if DOXYGEN_JS
String Session::getUri() {}
#elif DOXYGEN_PY
str Session::get_uri() {}
#endif
#endif
// Documentation of getCurrentSchema function
REGISTER_HELP_PROPERTY(currentSchema, Session);
REGISTER_HELP(SESSION_CURRENTSCHEMA_BRIEF, "${SESSION_GETCURRENTSCHEMA_BRIEF}");
REGISTER_HELP_FUNCTION(getCurrentSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETCURRENTSCHEMA, R"*(
Retrieves the active schema on the session.
@return A Schema object if a schema is active on the session.
)*");
/**
* $(SESSION_GETCURRENTSCHEMA_BRIEF)
*
* $(SESSION_GETCURRENTSCHEMA)
*/
#if DOXYGEN_JS
Schema Session::getCurrentSchema() {}
#elif DOXYGEN_PY
Schema Session::get_current_schema() {}
#endif
std::string Session::get_current_schema() {
if (is_open()) {
auto result = execute_sql("select schema()");
if (auto row = result->fetch_one()) {
if (row && !row->is_null(0)) return row->get_string(0);
}
} else {
throw std::logic_error("Not connected");
}
return "";
}
// Documentation of getSchema function
REGISTER_HELP_FUNCTION(getSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETSCHEMA, R"*(
Retrieves a Schema object from the current session through it's name.
@param name The name of the Schema object to be retrieved.
@returns The Schema object with the given name.
@throw RuntimeError If the given name is not a valid schema.
)*");
/**
* $(SESSION_GETSCHEMA_BRIEF)
*
* $(SESSION_GETSCHEMA)
* \sa Schema
*/
#if DOXYGEN_JS
Schema Session::getSchema(String name) {}
#elif DOXYGEN_PY
Schema Session::get_schema(str name) {}
#endif
std::shared_ptr<Schema> Session::get_schema(const std::string &name) {
std::shared_ptr<Schema> ret_val;
std::string type = "Schema";
if (name.empty())
throw Exception::runtime_error("Schema name must be specified");
std::string found_name = db_object_exists(type, name, "");
if (!found_name.empty()) {
update_schema_cache(found_name, true);
ret_val =
std::dynamic_pointer_cast<Schema>(_schemas->at(found_name).as_object());
} else {
update_schema_cache(name, false);
throw Exception::runtime_error("Unknown database '" + name + "'");
}
if (current_shell_options()->get().devapi_schema_object_handles)
ret_val->update_cache();
return ret_val;
}
REGISTER_HELP_FUNCTION(getSchemas, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETSCHEMAS, R"*(
Retrieves the Schemas available on the session.
@returns A List containing the Schema objects available on the session.
)*");
/**
* $(SESSION_GETSCHEMAS_BRIEF)
*
* $(SESSION_GETSCHEMAS)
*/
#if DOXYGEN_JS
List Session::getSchemas() {}
#elif DOXYGEN_PY
list Session::get_schemas() {}
#endif
shcore::Array_t Session::get_schemas() {
auto schemas = shcore::make_array();
if (is_open()) {
auto result = execute_sql("show databases;");
while (const mysqlshdk::db::IRow *row = result->fetch_one()) {
std::string schema_name;
if (!row->is_null(0)) schema_name = row->get_string(0);
// TODO(alfredo) review whether caching of all schemas shouldn't be off
if (!schema_name.empty()) {
update_schema_cache(schema_name, true);
schemas->push_back((*_schemas)[schema_name]);
}
}
} else {
throw std::logic_error("Not connected");
}
return schemas;
}
REGISTER_HELP_FUNCTION(setFetchWarnings, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_SETFETCHWARNINGS, R"*(
Enables or disables warning generation.
@param enable Boolean value to enable or disable the warnings.
Warnings are generated sometimes when database operations are executed, such as
SQL statements.
On a Node session the warning generation is disabled by default. This function
can be used to enable or disable the warning generation based on the received
parameter.
When warning generation is enabled, the warnings will be available through the
result object returned on the executed operation.
)*");
/**
* $(SESSION_SETFETCHWARNINGS_BRIEF)
*
* $(SESSION_SETFETCHWARNINGS)
*/
#if DOXYGEN_JS
Result Session::setFetchWarnings(Boolean enable) {}
#elif DOXYGEN_PY
Result Session::set_fetch_warnings(bool enable) {}
#endif
std::shared_ptr<Result> Session::set_fetch_warnings(bool enable) {
std::string command = enable ? "enable_notices" : "disable_notices";
auto notices = shcore::make_array();
notices->emplace_back("warnings");
auto command_args = shcore::make_dict();
command_args->emplace("notice", std::move(notices));
return std::make_shared<Result>(execute_mysqlx_stmt(command, command_args));
}
REGISTER_HELP_FUNCTION(dropSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_DROPSCHEMA, R"*(
Drops the schema with the specified name.
@param name The name of the schema to be dropped.
@returns Nothing.
)*");
/**
* $(SESSION_DROPSCHEMA_BRIEF)
*
* $(SESSION_DROPSCHEMA)
*/
#if DOXYGEN_JS
Undefined Session::dropSchema(String name) {}
#elif DOXYGEN_PY
None Session::drop_schema(str name) {}
#endif
void Session::drop_schema(const std::string &name) {
execute_sql(sqlstring("drop schema if exists !", 0) << name);
if (_schemas->find(name) != _schemas->end()) _schemas->erase(name);
}
/*
* This function verifies if the given object exist in the database, works for
* schemas, tables, views and collections.
* The check for tables, views and collections is done is done based on the
* type. If type is not specified and an object with the name is found, the type
* will be returned.
*
* Returns the name of the object as exists in the database.
*/
std::string Session::db_object_exists(std::string &type,
const std::string &name,
const std::string &owner) {
Interruptible intr(this);
// match must be exact, since both branches below use LIKE and both escape
// their arguments it's enough to just escape the wildcards
std::string escaped_name = escape_wildcards(name);
if (type == "Schema") {
auto result =
execute_sql(sqlstring("show databases like ?", 0) << escaped_name);
if (auto row = result->fetch_one()) {
return row->get_string(0);
}
} else {
shcore::Dictionary_t args = shcore::make_dict();
args->set("schema", shcore::Value(owner));
args->set("pattern", shcore::Value(escaped_name));
auto result = execute_mysqlx_stmt("list_objects", args);
if (auto row = result->fetch_one()) {
std::string object_name = row->get_string(0);
std::string object_type = row->get_string(1);
if (type.empty()) {
type = object_type;
return object_name;
} else {
type = shcore::str_upper(type);
if (type == object_type) return object_name;
}
}
}
return "";
}
shcore::Value::Map_type_ref Session::get_status() {
shcore::Value::Map_type_ref status(new shcore::Value::Map_type);
(*status)["SESSION_TYPE"] = shcore::Value("X");
(*status)["NODE_TYPE"] = shcore::Value(get_node_type());
(*status)["DEFAULT_SCHEMA"] = shcore::Value(
_connection_options.has_schema() ? _connection_options.get_schema() : "");
try {
auto result = execute_sql("select DATABASE(), USER() limit 1");
const mysqlshdk::db::IRow *row = result->fetch_one();
(*status)["CURRENT_SCHEMA"] =
shcore::Value(row->is_null(0) ? "" : row->get_string(0));
(*status)["CURRENT_USER"] =
shcore::Value(row->is_null(1) ? "" : row->get_string(1));
(*status)["CONNECTION_ID"] = shcore::Value(_session->get_connection_id());
const char *cipher = _session->get_ssl_cipher();
if (cipher) {
result = execute_sql("show session status like 'mysqlx_ssl_version';");
row = result->fetch_one();
(*status)["SSL_CIPHER"] =
shcore::Value(std::string(cipher) + " " + row->get_string(1));
}
//(*status)["SKIP_UPDATES"] = shcore::Value(???);
// (*status)["SERVER_INFO"] = shcore::Value(_conn->get_server_info());
(*status)["PROTOCOL_VERSION"] = shcore::Value("X protocol");
unsigned long ver = mysql_get_client_version();
std::stringstream sv;
sv << ver / 10000 << "." << (ver % 10000) / 100 << "." << ver % 100;
(*status)["CLIENT_LIBRARY"] = shcore::Value(sv.str());
// (*status)["INSERT_ID"] = shcore::Value(???);
result = execute_sql(
"select @@character_set_client, @@character_set_connection, "
"@@character_set_server, @@character_set_database, "
"concat(@@version, ' ', @@version_comment) as version, "
"@@mysqlx_socket, @@mysqlx_port, @@datadir, @@character_set_results "
"limit 1");
row = result->fetch_one();
if (row) {
(*status)["CLIENT_CHARSET"] =
shcore::Value(row->is_null(0) ? "" : row->get_string(0));
(*status)["CONNECTION_CHARSET"] =
shcore::Value(row->is_null(1) ? "" : row->get_string(1));
(*status)["SERVER_CHARSET"] =
shcore::Value(row->is_null(2) ? "" : row->get_string(2));
(*status)["SCHEMA_CHARSET"] =
shcore::Value(row->is_null(3) ? "" : row->get_string(3));
(*status)["SERVER_VERSION"] =
shcore::Value(row->is_null(4) ? "" : row->get_string(4));
(*status)["RESULTS_CHARSET"] =
shcore::Value(row->is_null(8) ? "" : row->get_string(8));
mysqlshdk::db::Transport_type transport_type =
mysqlshdk::db::Transport_type::Socket;
if (_connection_options.has_transport_type()) {
transport_type = _connection_options.get_transport_type();
}
(*status)["CONNECTION"] = shcore::Value(_session->get_connection_info());
if (transport_type == mysqlshdk::db::Transport_type::Tcp) {
(*status)["TCP_PORT"] =
shcore::Value(row->is_null(6) ? "" : row->get_as_string(6));
} else if (transport_type == mysqlshdk::db::Transport_type::Socket) {
const std::string datadir = (row->is_null(5) ? "" : row->get_string(7));
const std::string socket = (row->is_null(5) ? "" : row->get_string(5));
const std::string socket_abs_path = shcore::path::normalize(
shcore::path::join_path(std::vector<std::string>{datadir, socket}));
(*status)["UNIX_SOCKET"] = shcore::Value(socket_abs_path);
}
}
result = execute_sql(
"show status where variable_name in "
"('Mysqlx_bytes_sent_compressed_payload', "
"'Mysqlx_compression_algorithm');");
row = result->fetch_one();
if (row && !row->is_null(1) &&
shcore::lexical_cast<int>(row->get_string(1), 0) > 0) {
std::string compr_status = "Enabled";
row = result->fetch_one();
if (row && !row->is_null(1) && !row->get_string(1).empty())
compr_status += " (" + row->get_string(1) + ")";
(*status)["COMPRESSION"] = shcore::Value(compr_status);
} else {
(*status)["COMPRESSION"] = shcore::Value(std::string("Disabled"));
}
result = execute_sql("SHOW GLOBAL STATUS LIKE 'Uptime';");
row = result->fetch_one();
std::stringstream su;
su << "Uptime: " << row->get_string(1) << " \n";
(*status)["SERVER_STATS"] = shcore::Value(su.str());
// (*status)["PROTOCOL_COMPRESSED"] = row->get_value(3);
// SAFE UPDATES
} catch (const shcore::Exception &e) {
(*status)["STATUS_ERROR"] = shcore::Value(e.format());
}
return status;
}
// Documentation of startTransaction function
REGISTER_HELP_FUNCTION(startTransaction, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_STARTTRANSACTION, R"*(
Starts a transaction context on the server.
@returns A SqlResult object.
Calling this function will turn off the autocommit mode on the server.
All the operations executed after calling this function will take place only
when commit() is called.
All the operations executed after calling this function, will be discarded if
rollback() is called.
When commit() or rollback() are called, the server autocommit mode will return
back to it's state before calling <<<startTransaction>>>().
)*");
/**
* $(SESSION_STARTTRANSACTION_BRIEF)
*
* $(SESSION_STARTTRANSACTION)
*/
#if DOXYGEN_JS
Result Session::startTransaction() {}
#elif DOXYGEN_PY
Result Session::start_transaction() {}
#endif
void Session::start_transaction() {
if (_tx_deep == 0) execute_sql("start transaction");
_tx_deep++;
}
std::shared_ptr<SqlResult> Session::_start_transaction() {
auto result = std::dynamic_pointer_cast<mysqlshdk::db::mysqlx::Result>(
execute_sql("start transaction"));
return std::make_shared<SqlResult>(result);
}
// Documentation of commit function
REGISTER_HELP_FUNCTION(commit, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_COMMIT, R"*(
Commits all the operations executed after a call to <<<startTransaction>>>().
@returns A SqlResult object.
All the operations executed after calling <<<startTransaction>>>() will take
place when this function is called.
The server autocommit mode will return back to it's state before calling
<<<startTransaction>>>().
)*");
/**
* $(SESSION_COMMIT_BRIEF)
*
* $(SESSION_COMMIT)
*/
#if DOXYGEN_JS
Result Session::commit() {}
#elif DOXYGEN_PY
Result Session::commit() {}
#endif
void Session::commit() {
_tx_deep--;
assert(_tx_deep >= 0);
if (_tx_deep == 0) execute_sql("commit");
}
std::shared_ptr<SqlResult> Session::_commit() {
auto result = std::dynamic_pointer_cast<mysqlshdk::db::mysqlx::Result>(
execute_sql("commit"));
return std::make_shared<SqlResult>(result);
}
// Documentation of rollback function
REGISTER_HELP_FUNCTION(rollback, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_ROLLBACK, R"*(
Discards all the operations executed after a call to <<<startTransaction>>>().
@returns A SqlResult object.
All the operations executed after calling <<<startTransaction>>>() will be
discarded when this function is called.
The server autocommit mode will return back to it's state before calling
<<<startTransaction>>>().
)*");
/**
* $(SESSION_ROLLBACK_BRIEF)
*
* $(SESSION_ROLLBACK)
*/
#if DOXYGEN_JS
Result Session::rollback() {}
#elif DOXYGEN_PY
Result Session::rollback() {}
#endif
void Session::rollback() {
_tx_deep--;
assert(_tx_deep >= 0);
if (_tx_deep == 0) execute_sql("rollback");
}
std::shared_ptr<SqlResult> Session::_rollback() {
auto result = std::dynamic_pointer_cast<mysqlshdk::db::mysqlx::Result>(
execute_sql("rollback"));
return std::make_shared<SqlResult>(result);
}
std::string Session::query_one_string(const std::string &query, int field) {
auto result = execute_sql(query);
if (auto row = result->fetch_one()) {
if (!row->is_null(field)) {
return row->get_string(field);
}
}
return "";
}
void Session::kill_query() {
uint64_t cid = get_connection_id();
auto session = mysqlshdk::db::mysqlx::Session::create();
try {
session->connect(_connection_options);
const auto query = shcore::sqlstring("kill query ?", 0) << cid;
session->query(query.str_view());
} catch (const std::exception &e) {
log_warning("Error cancelling SQL query: %s", e.what());
}
}
static ::xcl::Argument_value convert(const shcore::Value &value);
static ::xcl::Argument_object convert_map(const shcore::Dictionary_t &args) {
::xcl::Argument_object object;
for (const auto &iter : *args) {
object[iter.first] = convert(iter.second);
}
return object;
}
static ::xcl::Argument_array convert_array(const shcore::Array_t &args) {
::xcl::Argument_array object;
for (const auto &iter : *args) {
object.push_back(convert(iter));
}
return object;
}
static ::xcl::Argument_value convert(const shcore::Value &value) {
switch (value.type) {
case shcore::Bool:
return xcl::Argument_value(value.as_bool());
case shcore::UInteger:
return xcl::Argument_value(value.as_uint());
case shcore::Integer:
return xcl::Argument_value(value.as_int());
case shcore::String:
return xcl::Argument_value(value.get_string());
case shcore::Float:
return xcl::Argument_value(value.as_double());
case shcore::Map:
return xcl::Argument_value(convert_map(value.as_map()));
case shcore::Array:
return xcl::Argument_value(convert_array(value.as_array()));
case shcore::Null:
return xcl::Argument_value();
case shcore::Binary:
return xcl::Argument_value(value.get_string(),
xcl::Argument_value::String_type::k_octets);
break;
case shcore::Object:
case shcore::MapRef:
case shcore::Function:
case shcore::Undefined:
break;
}
throw std::invalid_argument("Invalid argument value: " + value.descr());
}
static ::xcl::Argument_array convert_args(const shcore::Dictionary_t &args) {
::xcl::Argument_array cargs;
cargs.push_back(xcl::Argument_value(convert_map(args)));
return cargs;
}
static ::xcl::Argument_array convert_args(const shcore::Array_t &args) {
::xcl::Argument_array cargs;
if (args) {
for (const shcore::Value &arg : *args) {
cargs.push_back(convert(arg));
}
}
return cargs;
}
shcore::Value Session::_execute_mysqlx_stmt(const std::string &command,
const shcore::Dictionary_t &args) {
return _execute_stmt("mysqlx", command, convert_args(args), true);
}
std::shared_ptr<mysqlshdk::db::mysqlx::Result> Session::execute_mysqlx_stmt(
const std::string &command, const shcore::Dictionary_t &args) {
return execute_stmt("mysqlx", command, convert_args(args));
}
shcore::Value Session::_execute_stmt(const std::string &ns,
const std::string &command,
const ::xcl::Argument_array &args,
bool expect_data) {
if (expect_data) {
SqlResult *result = new SqlResult(execute_stmt(ns, command, args));
return shcore::Value::wrap(result);
} else {
Result *result = new Result(execute_stmt(ns, command, args));
return shcore::Value::wrap(result);
}
return {};
}
std::shared_ptr<mysqlshdk::db::mysqlx::Result> Session::execute_stmt(
const std::string &ns, const std::string &command,
const ::xcl::Argument_array &args) {
Interruptible intr(this);
auto result = std::static_pointer_cast<mysqlshdk::db::mysqlx::Result>(
_session->execute_stmt(ns, command, args));
result->pre_fetch_rows();
return result;
}
std::shared_ptr<mysqlshdk::db::IResult> Session::execute_sql(
const std::string &statement, const shcore::Array_t &args) {
try {
return execute_stmt("sql", statement, convert_args(args));
} catch (const mysqlshdk::db::Error &error) {
throw shcore::Exception::mysql_error_with_code_and_state(
error.what(), error.code(), error.sqlstate());
}
}
std::shared_ptr<shcore::Object_bridge> Session::create(
const mysqlshdk::db::Connection_options &co_) {
auto co = co_;
// DevAPI getSession uses ssl-mode = REQUIRED by default if no
// ssl-ca or ssl-capath are specified
if (!co.get_ssl_options().has_mode() &&
!co.has_value(mysqlshdk::db::kSslCa) &&
!co.has_value(mysqlshdk::db::kSslCaPath)) {
co.get_ssl_options().set_default(mysqlshdk::db::kSslMode,
mysqlshdk::db::kSslModeRequired);
}
// before creating a normal session we need to establish ssh if needed:
co.set_default_data();
auto &ssh = co.get_ssh_options_handle(mysqlshdk::db::k_default_mysql_x_port);
if (ssh.has_data()) {
mysqlshdk::ssh::current_ssh_manager()->create_tunnel(&ssh);
}
const auto session = std::make_shared<Session>();
session->connect(co);
shcore::ShellNotifications::get()->notify("SN_SESSION_CONNECTED", session);
return session;
}
// Documentation of sql function
REGISTER_HELP_FUNCTION(sql, Session);
REGISTER_HELP(SESSION_SQL_CHAINED, "SqlExecute.sql.[bind].[execute]");
REGISTER_HELP_FUNCTION_TEXT(SESSION_SQL, R"*(
Creates a SqlExecute object to allow running the received SQL statement on the
target MySQL Server.
@param sql A string containing the SQL statement to be executed.
@return A SqlExecute object.
This method creates an SqlExecute object which is a SQL execution handler.
The SqlExecute class has functions that allow defining the way the statement
will be executed and allows doing parameter binding.
The received SQL is set on the execution handler.
)*");
/**
* $(SESSION_SQL_BRIEF)
*
* $(SESSION_SQL)
*
* JavaScript Example
* \code{.js}
* var sql = session.sql("select * from mydb.students where age > ?");
* var result = sql.bind(18).execute();
* \endcode
* \sa SqlExecute
*/
#if DOXYGEN_JS
SqlExecute Session::sql(String sql) {}
#elif DOXYGEN_PY
SqlExecute Session::sql(str sql) {}
#endif
std::shared_ptr<SqlExecute> Session::sql(const std::string &statement) {
auto sql_execute = std::make_shared<SqlExecute>(
std::static_pointer_cast<Session>(shared_from_this()));
return sql_execute->sql(statement);
}
REGISTER_HELP_FUNCTION(runSql, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_RUNSQL, R"*(
Executes a query and returns the corresponding SqlResult object.
@param query the SQL query to execute against the database.
@param args Optional list of literals to use when replacing ? placeholders in
the query string.
@returns An SqlResult object.
@throw LogicError if there's no open session.
@throw ArgumentError if the parameters are invalid.
)*");
/**
* $(SESSION_RUNSQL_BRIEF)
*
* $(SESSION_RUNSQL)
*/
#if DOXYGEN_JS
SqlResult Session::runSql(String query, Array args) {}
#elif DOXYGEN_PY
SqlResult Session::run_sql(str query, list args) {}
#endif
std::shared_ptr<SqlResult> Session::run_sql(const std::string sql,
const shcore::Array_t &args) {
// Will return the result of the SQL execution
// In case of error will be Undefined
if (!_session || !_session->is_open())
throw Exception::logic_error("Not connected.");
Interruptible intr(this);
auto sql_execute = std::make_shared<SqlExecute>(
std::static_pointer_cast<Session>(shared_from_this()));
sql_execute->set_sql(sql);
if (args) {
for (const auto &value : *args) {
sql_execute->add_bind(value);
}
}
return sql_execute->execute();
}
REGISTER_HELP_PROPERTY(uri, Session);
REGISTER_HELP(SESSION_URI_BRIEF, "${SESSION_GETURI_BRIEF}");
REGISTER_HELP_FUNCTION(getUri, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETURI, R"*(
Retrieves the URI for the current session.
@return A string representing the connection data.
)*");
/**
* $(SESSION_GETURI_BRIEF)
*
* $(SESSION_GETURI)
*/
#if DOXYGEN_JS
String Session::getSshUri() {}
#elif DOXYGEN_PY
str Session::get_ssh_uri() {}
#endif
REGISTER_HELP_PROPERTY(sshUri, Session);
REGISTER_HELP(SESSION_SSHURI_BRIEF, "${SESSION_GETSSHURI_BRIEF}");
REGISTER_HELP_FUNCTION(getSshUri, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_GETSSHURI, R"*(
Retrieves the SSH URI for the current session.
@return A string representing the SSH connection data.
)*");
/**
* $(SESSION_GETSSHURI_BRIEF)
*
* $(SESSION_GETSSHURI)
*/
#if DOXYGEN_JS
String Session::getSshUri() {}
#elif DOXYGEN_PY
str Session::get_ssh_uri() {}
#endif
Value Session::get_member(const std::string &prop) const {
Value ret_val;
Session *session = const_cast<Session *>(this);
if (prop == "__connection_info") {
// FIXME: temporary code until ISession refactoring
std::string info = _session->get_connection_info();
return Value(info);
}
if (prop == "uri") {
ret_val = shcore::Value(uri());
} else if (prop == "sshUri") {
ret_val = shcore::Value(ssh_uri());
} else if (prop == "defaultSchema") {
if (_connection_options.has_schema()) {
ret_val =
shcore::Value(session->get_schema(_connection_options.get_schema()));
} else {
ret_val = Value::Null();
}
} else if (prop == "currentSchema") {
std::string name = session->get_current_schema();
if (!name.empty()) {
ret_val = shcore::Value(session->get_schema(name));
} else {
ret_val = Value::Null();
}
} else {
ret_val = ShellBaseSession::get_member(prop);
}
return ret_val;
}
// Documentation of quoteName function
REGISTER_HELP_FUNCTION(quoteName, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_QUOTENAME, R"*(
Escapes the passed identifier.
@param id The identifier to be quoted.
@return A String containing the escaped identifier.
)*");
/**
* $(SESSION_QUOTENAME_BRIEF)
*
* $(SESSION_QUOTENAME)
*/
#if DOXYGEN_JS
String Session::quoteName(String id) {}
#elif DOXYGEN_PY
str Session::quote_name(str id) {}
#endif
std::string Session::quote_name(const std::string &id) {
return get_quoted_name(id);
}
// Documentation of setCurrentSchema function
REGISTER_HELP_FUNCTION(setCurrentSchema, Session);
REGISTER_HELP_FUNCTION_TEXT(SESSION_SETCURRENTSCHEMA, R"*(
Sets the current schema for this session, and returns the schema object for it.
@param name the name of the new schema to switch to.
@return the Schema object for the new schema.
At the database level, this is equivalent at issuing the following SQL query:
use <new-default-schema>;
)*");
/**
* $(SESSION_SETCURRENTSCHEMA_BRIEF)
*
* $(SESSION_SETCURRENTSCHEMA)
*/
#if DOXYGEN_JS
Schema Session::setCurrentSchema(String name) {}
#elif DOXYGEN_PY
Schema Session::set_current_schema(str name) {}
#endif
std::shared_ptr<Schema> Session::_set_current_schema(const std::string &name) {
if (is_open()) {
set_current_schema(name);
} else {
throw std::logic_error("Not connected");
}
return get_schema(name);
}
socket_t Session::_get_socket_fd() const {
if (!_session || !_session->is_open())
throw std::invalid_argument("Session is not open");
return _session->get_socket_fd();
}
// This is a temporary API that may change
void Session::_enable_notices(const std::vector<std::string> ¬ices) {
std::vector<mysqlshdk::db::mysqlx::GlobalNotice::Type> types;
for (const auto &n : notices) {
if (n == "GRViewChanged") {
types.push_back(mysqlshdk::db::mysqlx::GlobalNotice::GRViewChanged);
} else {
throw std::invalid_argument("Unknown notice type " + n);
}
}
if (!types.empty()) {
_session->enable_notices(types);
if (!m_notices_enabled) {
m_notices_enabled = true;
_session->add_notice_listener(
[this](const mysqlshdk::db::mysqlx::GlobalNotice ¬ice) {
if (notice.type ==
mysqlshdk::db::mysqlx::GlobalNotice::GRViewChanged) {
m_notices.push_back(shcore::make_dict(
"type", shcore::Value("GRViewChanged"), "view_id",
shcore::Value(notice.info.gr_view_change.view_id)));
}
return true;
});
}
}
}
shcore::Dictionary_t Session::_fetch_notice() {
if (!m_notices.empty()) {
auto tmp = m_notices.front();
m_notices.pop_front();
return tmp;
}
return nullptr;
}