modules/adminapi/common/instance_pool.cc (931 lines of code) (raw):
/*
* Copyright (c) 2019, 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms,
* as designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "modules/adminapi/common/instance_pool.h"
#include <errmsg.h>
#include <mysql.h>
#include <stack>
#include "modules/adminapi/common/dba_errors.h"
#include "modules/adminapi/common/errors.h"
#include "modules/adminapi/common/global_topology.h"
#include "modules/adminapi/common/metadata_storage.h"
#include "modules/adminapi/common/sql.h"
#include "modules/mod_utils.h"
#include "mysqlshdk/include/scripting/types.h" // exceptions
#include "mysqlshdk/include/shellcore/console.h"
#include "mysqlshdk/include/shellcore/shell_options.h"
#include "mysqlshdk/libs/mysql/instance.h"
#include "mysqlshdk/libs/utils/debug.h"
namespace mysqlsh {
namespace dba {
namespace {
// Default SQL_MODE as of MySQL 8.0.19. Instance::prepare_session() applies it
// to every AdminAPI session so that a non-standard user setting cannot affect
// the queries issued by the AdminAPI.
constexpr const char *k_default_sql_mode =
"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,"
"NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION";
// Constants with the names used to lock instances: k_lock_name_instance is the
// lock namespace (locks are released per namespace) and k_lock is the name of
// the single lock used inside it.
constexpr char k_lock[] = "AdminAPI_lock";
constexpr char k_lock_name_instance[] = "AdminAPI_instance";
// Default wait_timeout; used by Instance::prepare_session() as the minimum
// value enforced on AdminAPI sessions.
constexpr uint64_t k_default_wait_timeout = 28800;
// Returns the shell's dba_connect_timeout option multiplied by 1000; the
// result is what connect_raw() stores as the connect-timeout connection option.
// NOTE(review): presumably a seconds -> milliseconds conversion; confirm
// against the option's documentation.
int64_t default_adminapi_connect_timeout() {
  const auto &shell_opts = mysqlsh::current_shell_options()->get();
  return shell_opts.dba_connect_timeout * 1000;
}
// Opens a classic-protocol session using the given connection options.
//
// X protocol targets are rejected up-front. Connection failures that indicate
// the remote end does not speak the classic protocol are translated into the
// "unsupported protocol" error; every other failure is propagated unchanged.
std::shared_ptr<mysqlshdk::db::ISession> connect_session(
    const mysqlshdk::db::Connection_options &opts, bool interactive) {
  if (opts.get_session_type() == SessionType::X)
    throw make_unsupported_protocol_error();

  try {
    return establish_mysql_session(opts, interactive);
  } catch (const shcore::Exception &e) {
    const bool version_error = (e.code() == CR_VERSION_ERROR);

    // TODO(alfredo) - when connection timeout is enabled, the error
    // returned for a timeout seems to be this too. This error handling
    // should probably be removed from here and the connect timeout setting
    // should be moved from connect_raw to this function
    const bool handshake_lost =
        (e.code() == CR_SERVER_LOST) &&
        strcmp("Lost connection to MySQL server at 'waiting for initial "
               "communication packet', system error: 110",
               e.what()) == 0;

    if (!version_error && !handshake_lost) throw;

    throw make_unsupported_protocol_error();
  }
}
} // namespace
// Connects to the target and wraps the session in an Instance, without
// preparing the session for AdminAPI use (see Instance::connect() for that).
// If the caller did not specify a connect timeout, the AdminAPI default is
// applied to a private copy of the options.
std::shared_ptr<Instance> Instance::connect_raw(
    const mysqlshdk::db::Connection_options &opts, bool interactive) {
  mysqlshdk::db::Connection_options effective_opts(opts);

  const bool has_timeout =
      effective_opts.has_value(mysqlshdk::db::kConnectTimeout);
  if (!has_timeout) {
    effective_opts.set(mysqlshdk::db::kConnectTimeout,
                       std::to_string(default_adminapi_connect_timeout()));
  }

  return std::make_shared<Instance>(
      connect_session(effective_opts, interactive));
}
// Connects to the target (see connect_raw()) and then configures the new
// session for AdminAPI use through prepare_session().
std::shared_ptr<Instance> Instance::connect(
    const mysqlshdk::db::Connection_options &opts, bool interactive) {
  auto connected = connect_raw(opts, interactive);
  connected->prepare_session();
  return connected;
}
// Configures the session of this instance with the settings the AdminAPI
// assumes: autocommit, sql_mode, group_replication_consistency (8.0.14+),
// collation_connection, group_concat_max_len and a minimum wait_timeout.
// Finally caches the canonical address and the UUID of the instance.
void Instance::prepare_session() {
// prepare the session to be used by the AdminAPI
// change autocommit to the default value, which we assume throughout the
// code, but can be changed globally by the user
// Bug#30202883 and Bug#30324461 are caused by the lack of this
set_sysvar("autocommit", static_cast<int64_t>(1),
mysqlshdk::mysql::Var_qualifier::SESSION);
// change sql_mode to the default value, in case the user has some
// non-standard and incompatible setting
set_sysvar("sql_mode", std::string(k_default_sql_mode),
mysqlshdk::mysql::Var_qualifier::SESSION);
// The server version decides which of the settings below are applied.
auto version = get_version();
// Handle the consistency levels != "EVENTUAL" and
// "BEFORE_ON_PRIMARY_FAILOVER" on the target instance session:
//
// Any query executed on a member that is still RECOVERING and has a
// consistency level of BEFORE, AFTER or BEFORE_AND_AFTER will result in an
// error. As documented: "You can only use the consistency levels BEFORE,
// AFTER and BEFORE_AND_AFTER on ONLINE members, attempting to use them on
// members in other states causes a session error." For that reason, we must
// set the target instance consistency level to EVENTUAL on those cases to
// avoid the error for any query executed using that session.
//
// This handling must be done to all AdminAPI sessions to ensure that
// concurrent command calls do not result in errors due to higher consistency
// levels (BUG#30394258, BUG#30401048)
if (version >= mysqlshdk::utils::Version("8.0.14")) {
set_sysvar("group_replication_consistency", std::string("EVENTUAL"),
mysqlshdk::mysql::Var_qualifier::SESSION);
}
// There may be cases where the default collation ("utf8mb4_0900_ai_ci") used
// in libmysqlclient 8.0 isn't negotiated between the server and the client
// (i.e.: the server was configured with the skip-character-set-client-handshake
// option). This in turn would create problems like "Illegal mix of
// collations". To prevent this, we explicitly set the session collation.
// Servers older than 8.0.0 get "utf8mb4_general_ci" instead.
if (version >= mysqlshdk::utils::Version(8, 0, 0))
set_sysvar("collation_connection", std::string("utf8mb4_0900_ai_ci"),
mysqlshdk::mysql::Var_qualifier::SESSION);
else
set_sysvar("collation_connection", std::string("utf8mb4_general_ci"),
mysqlshdk::mysql::Var_qualifier::SESSION);
// Make sure that the limit for 'group_concat_max_len' is increased from the
// default 1024 to 1GB. There's no "correct" value here, hence no constant for
// the variable.
set_sysvar("group_concat_max_len", static_cast<int64_t>(1024 * 1024 * 1024),
mysqlshdk::mysql::Var_qualifier::SESSION);
// Ensure the default value for 'wait_timeout' is kept for the AdminAPI
// accounts, avoiding any possible timeouts in command execution due to
// user-configured low values.
// NOTE: set directly to 28800, instead of using DEFAULT, because the user
// might have configured directly on the configuration file the value for the
// timeout and that becomes the compiled DEFAULT value.
// Also, only set if the instance has a value set that is lower than 28800.
// This is important to ensure higher values set by the user are honored.
if (get_sysvar_int("wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION) <
k_default_wait_timeout) {
set_sysvar("wait_timeout", static_cast<int64_t>(k_default_wait_timeout),
mysqlshdk::mysql::Var_qualifier::SESSION);
}
// Cache the hostname, port, and UUID to avoid errors accessing this data if
// the instance fails during an operation.
get_canonical_address();
get_uuid();
}
// Probes the session with a trivial query. If the probe fails with a MySQL
// client-side error, the session is re-established with the same connection
// options and prepared again; any other error is logged and rethrown.
//
// @param what label used as a prefix in the log messages.
void Instance::reconnect_if_needed(const char *what) {
  try {
    query("SELECT 1");
  } catch (const shcore::Error &err) {
    const bool client_side = mysqlshdk::db::is_mysql_client_error(err.code());

    if (!client_side) {
      log_info("%s connection to %s lost: %s", what, descr().c_str(),
               err.format().c_str());
      throw;
    }

    log_info("%s connection to %s lost: %s. Reconnecting...", what,
             descr().c_str(), err.format().c_str());
    get_session()->connect(get_connection_options());
    prepare_session();
  }
}
// Wraps an existing session in an Instance that does not belong to a pool.
// m_retain_count is set to -1, a value for which retain() and release() do
// nothing.
Instance::Instance(const std::shared_ptr<mysqlshdk::db::ISession> &session)
: mysqlshdk::mysql::Instance(session), m_retain_count(-1) {}
// Wraps a session in an Instance associated with the given pool (owner), which
// release() and steal() report back to. m_retain_count is not set here, so it
// keeps the initial value from the class declaration (not visible in this
// file).
Instance::Instance(Instance_pool *owner,
const std::shared_ptr<mysqlshdk::db::ISession> &session)
: mysqlshdk::mysql::Instance(session), m_pool(owner) {}
// Adds one reference to the instance. Instances whose counter is -1 (see the
// session-only constructor) are not reference counted and are left untouched.
void Instance::retain() noexcept {
  DBUG_TRACE;

  if (m_retain_count <= -1) return;

  m_retain_count++;
}
// Drops one reference to the instance. Does nothing when the counter is -1.
// When the last reference is dropped, a pooled instance clears its cached
// values and is handed back to its pool, while a pool-less instance closes
// its session.
// NOTE(review): the counter is not decremented on the final release (it stays
// at its current value of 1 or 0) - confirm this is intended.
void Instance::release() {
  DBUG_TRACE;

  if (m_retain_count <= -1) return;

  if (m_retain_count > 1) {
    // other holders remain, just drop our reference
    m_retain_count--;
    return;
  }

  if (m_pool) {
    refresh();  // clear cached values
    m_pool->return_instance(this);
  } else {
    get_session()->close();
  }
}
// Detaches the instance from its pool (if any), so that the pool no longer
// tracks it.
//
// The instance is expected to have been acquired directly by the caller and
// nobody else holds references to it (no Scoped_instances either).
void Instance::steal() {
  DBUG_TRACE;

  if (m_pool != nullptr) {
    m_pool->forget_instance(this);
    m_pool = nullptr;
  }
}
std::shared_ptr<mysqlshdk::db::IResult> Instance::query(const std::string &sql,
bool buffered) const {
return mysqlshdk::mysql::Instance::query(sql, buffered);
}
std::shared_ptr<mysqlshdk::db::IResult> Instance::query_udf(
const std::string &sql, bool buffered) const {
return mysqlshdk::mysql::Instance::query_udf(sql, buffered);
}
void Instance::execute(const std::string &sql) const {
mysqlshdk::mysql::Instance::execute(sql);
}
// Installs the lock service on the instance unless it is already there.
//
// @param can_disable_sro whether super_read_only may be temporarily disabled
//        in order to perform the installation.
//
// @return true if the lock service is installed when the call returns, false
//         if it could not be installed (super_read_only is enabled and may
//         not be disabled, or the installation failed; failures are logged,
//         not thrown).
bool Instance::ensure_lock_service_is_installed(bool can_disable_sro) {
  if (is_lock_service_installed()) return true;

  try {
    // Check if instance is read-only otherwise it will fail to install
    // the lock service (e.g., it might happen for clusters created with an
    // older MySQL Shell version).
    const bool was_read_only = get_sysvar_bool("super_read_only", false);

    if (was_read_only && !can_disable_sro) {
      log_info(
          "Unable to install the lock service because the instance '%s' has "
          "super_read_only enabled.",
          descr().c_str());
      return false;
    }

    if (was_read_only) {
      set_sysvar("super_read_only", false,
                 mysqlshdk::mysql::Var_qualifier::GLOBAL);
    }

    // put super_read_only back the way it was once we leave this scope
    shcore::on_leave_scope restore_read_only([this, was_read_only]() {
      if (!was_read_only) return;
      set_sysvar("super_read_only", true,
                 mysqlshdk::mysql::Var_qualifier::GLOBAL);
    });

    log_debug("Installing the lock service on '%s'", descr().c_str());
    mysqlshdk::mysql::install_lock_service(this);

    return true;
  } catch (const std::exception &err) {
    log_warning("Failed to install the lock service on '%s': %s",
                descr().c_str(), err.what());
    return false;
  }
}
// Tells whether the lock service is available on this instance, as reported
// by mysqlshdk::mysql::has_lock_service().
bool Instance::is_lock_service_installed() const {
  const bool installed = mysqlshdk::mysql::has_lock_service(*this);
  return installed;
}
/**
* Try to acquire a specific lock on the instance.
*
* If the lock service is not installed, an attempt is made to install it, but
* only when the instance is standalone. If the service is still unavailable,
* a warning is logged and an empty lock is returned, i.e. the operation
* continues without concurrent execution protection.
*
* @param mode type of lock to acquire: READ (shared) or WRITE (exclusive).
* @param timeout maximum time in seconds to wait for the lock to be
*                available if it cannot be obtained immediately. By default 0,
*                meaning that it will not wait if the lock cannot be acquired
*                immediately, issuing an error.
*
* @throw shcore::Exception (SHERR_DBA_LOCK_GET_FAILED) if the lock cannot be
*        acquired within the timeout; any other error raised while acquiring
*        the lock is reported on the console and rethrown as is. The method
*        tries to acquire the lock only when the lock service is available.
*
* @return the requested lock (which is empty if the lock service is not
*         available); the lock is released through the returned object's
*         release callback.
*/
mysqlshdk::mysql::Lock_scoped Instance::get_lock(
mysqlshdk::mysql::Lock_mode mode, std::chrono::seconds timeout) {
if (auto has_lock_service = is_lock_service_installed(); !has_lock_service) {
try {
// Only standalone instances get the lock service installed on demand.
switch (get_gr_instance_type(*this)) {
case TargetType::Standalone:
has_lock_service = ensure_lock_service_is_installed(true);
break;
default:
break;
}
} catch (const shcore::Error &e) {
log_debug("Unable to check and install the lock service: %s", e.what());
}
if (!has_lock_service) {
log_warning(
"The required MySQL Locking Service isn't installed on instance "
"'%s'. The operation will continue without concurrent execution "
"protection.",
descr().c_str());
return nullptr;
}
}
// Debug hook that forces a 1 second timeout.
DBUG_EXECUTE_IF("dba_locking_timeout_one",
{ timeout = std::chrono::seconds{1}; });
// Try to acquire the specified lock.
// NOTE: Only one lock per namespace is used because lock release is
// performed by namespace.
try {
log_debug("Acquiring %s lock ('%s', '%s') on '%s'.",
mysqlshdk::mysql::to_string(mode).c_str(), k_lock_name_instance,
k_lock, descr().c_str());
mysqlshdk::mysql::get_lock(*this, k_lock_name_instance, k_lock, mode,
timeout.count());
} catch (const shcore::Error &err) {
// Abort the operation in case the required lock cannot be acquired.
log_info("Failed to get %s lock ('%s', '%s') on '%s': %s",
mysqlshdk::mysql::to_string(mode).c_str(), k_lock_name_instance,
k_lock, descr().c_str(), err.what());
if (err.code() == ER_LOCKING_SERVICE_TIMEOUT) {
// A timeout means somebody else holds the lock: report it with a
// dedicated AdminAPI error.
current_console()->print_error(shcore::str_format(
"The operation cannot be executed because it failed to acquire the "
"lock on instance '%s'. Another operation requiring access to the "
"instance is still in progress, please wait for it to finish and try "
"again.",
descr().c_str()));
throw shcore::Exception(
shcore::str_format("Failed to acquire lock on instance '%s'",
descr().c_str()),
SHERR_DBA_LOCK_GET_FAILED);
} else {
current_console()->print_error(shcore::str_format(
"The operation cannot be executed because it failed to acquire the "
"lock on instance '%s': %s",
descr().c_str(), err.what()));
throw;
}
}
// The callback captures this instance by pointer.
// NOTE(review): the instance presumably has to outlive the returned lock -
// confirm against Lock_scoped's usage.
auto release_cb = [this]() {
// Release all instance locks in the k_lock_name_instance namespace.
// NOTE: Only perform the operation if the lock service is
// available
// otherwise do nothing (ignore if concurrent execution is not
// supported, e.g., lock service plugin not available).
try {
log_debug("Releasing locks for '%s' on %s.", k_lock_name_instance,
descr().c_str());
mysqlshdk::mysql::release_lock(*this, k_lock_name_instance);
} catch (const shcore::Error &error) {
// Ignore any error trying to release locks (e.g., might have not
// been previously acquired due to lack of permissions).
log_error("Unable to release '%s' locks for '%s': %s",
k_lock_name_instance, descr().c_str(), error.what());
}
};
return mysqlshdk::mysql::Lock_scoped{std::move(release_cb)};
}
// Convenience wrapper over get_lock() that requests a SHARED lock.
mysqlshdk::mysql::Lock_scoped Instance::get_lock_shared(
    std::chrono::seconds timeout) {
  const auto mode = mysqlshdk::mysql::Lock_mode::SHARED;
  return get_lock(mode, timeout);
}
// Convenience wrapper over get_lock() that requests an EXCLUSIVE lock.
mysqlshdk::mysql::Lock_scoped Instance::get_lock_exclusive(
    std::chrono::seconds timeout) {
  const auto mode = mysqlshdk::mysql::Lock_mode::EXCLUSIVE;
  return get_lock(mode, timeout);
}
// In-memory snapshot of the instances and clusters read from the metadata,
// plus lookup helpers over that snapshot.
struct Instance_pool::Metadata_cache {
  std::vector<Instance_metadata> instances;
  std::vector<Cluster_metadata> clusters;

  // Returns the cluster with the given id, or nullptr if it is not cached.
  const Cluster_metadata *try_get_cluster(const Cluster_id &id) const {
    const auto match =
        std::find_if(clusters.begin(), clusters.end(),
                     [&id](const auto &c) { return c.cluster_id == id; });
    if (match == clusters.end()) return nullptr;
    return &*match;
  }

  // Returns the non-invalidated instance flagged as primary of the given
  // cluster; throws SHERR_DBA_ASYNC_PRIMARY_UNDEFINED if there is none.
  const Instance_metadata *get_cluster_primary(const Cluster_id &id) const {
    const auto match =
        std::find_if(instances.begin(), instances.end(), [&id](const auto &i) {
          return i.cluster_id == id && i.primary_master && !i.invalidated;
        });
    if (match != instances.end()) return &*match;

    throw shcore::Exception("Async cluster has no PRIMARY",
                            SHERR_DBA_ASYNC_PRIMARY_UNDEFINED);
  }

  // Returns the instance with the given server UUID; callers are expected to
  // only ask for known instances, so a miss is reported as a logic error.
  const Instance_metadata *get_instance_with_uuid(
      const std::string &uuid) const {
    const auto match =
        std::find_if(instances.begin(), instances.end(),
                     [&uuid](const auto &i) { return i.uuid == uuid; });
    if (match != instances.end()) return &*match;

    // not supposed to happen
    throw std::logic_error("internal error");
  }

  // Returns the name of the cluster with the given group name; throws
  // SHERR_DBA_METADATA_MISSING if no such cluster is cached.
  const std::string &get_cluster_name(const std::string &group_name) const {
    const auto match = std::find_if(
        clusters.begin(), clusters.end(),
        [&group_name](const auto &c) { return c.group_name == group_name; });
    if (match != clusters.end()) return match->cluster_name;

    throw shcore::Exception(
        "Could not find metadata for cluster with group_name '" + group_name +
            "'",
        SHERR_DBA_METADATA_MISSING);
  }
};
// Creates an empty pool with an empty metadata cache.
//
// @param allow_password_prompt stored in m_allow_password_prompt and passed
//        on as the "interactive" flag when the pool opens new connections.
Instance_pool::Instance_pool(bool allow_password_prompt)
: m_allow_password_prompt(allow_password_prompt) {
DBUG_TRACE;
// Raw owning pointer: released with delete in the destructor.
m_mdcache = new Metadata_cache();
}
// Destroys the metadata cache and closes the session of every instance still
// tracked by the pool. In debug builds, instances that were never returned to
// the pool are reported on stderr.
Instance_pool::~Instance_pool() {
DBUG_TRACE;
delete m_mdcache;
for (const auto &inst : m_pool) {
#ifndef NDEBUG
// Still leased at destruction time: somebody forgot to release it.
if (inst.leased) {
std::cerr << inst.instance->descr() << " ("
<< inst.instance->get_session()->get_connection_id()
<< ") not returned to pool. " << inst.instance->m_retain_count
<< " refs\n";
// Optional debug hook to trap at the point where the leak is detected.
DBUG_EXECUTE_IF("ipool_trap_session_leak", DEBUG_TRAP;);
}
#endif
inst.instance->close_session();
}
m_pool.clear();
}
// Stores the authentication options used for connections whose caller does
// not provide its own (see set_auth_opts() and connect_unchecked_endpoint()).
void Instance_pool::set_default_auth_options(Auth_options opts) {
m_default_auth_opts = std::move(opts);
}
// Copies authentication data into the given connection options: the given
// auth options are used when they name a user, otherwise the pool defaults.
//
// @throw std::invalid_argument if neither of them names a user.
void Instance_pool::set_auth_opts(const Auth_options &auth,
                                  mysqlshdk::db::Connection_options *opts) {
  if (!auth.user.empty()) {
    auth.set(opts);
    return;
  }

  if (m_default_auth_opts.user.empty())
    throw std::invalid_argument("Missing authentication info");

  m_default_auth_opts.set(opts);
}
// Replaces the metadata storage used by the pool. When a non-null storage is
// given, the metadata cache is reloaded from it; a null storage leaves the
// cache as it was.
void Instance_pool::set_metadata(std::shared_ptr<MetadataStorage> md) {
  DBUG_TRACE;

  m_metadata = std::move(md);
  if (!m_metadata) return;

  refresh_metadata_cache();
}
/**
 * Find a suitable metadata server that contains the server the session is
 * connected to.
 *
 * The following is considered in order:
 * - the target server
 * - any member of the group of the target server
 * - any member of a replica group
 * - the original target server if nothing better is found
 *
 * A server is suitable if it's ONLINE and part of a majority.
 *
 * The process is aborted and an exception thrown if the original instance or
 * group is found to not belong to the cluster anymore.
 *
 * The pool's own metadata object is temporarily replaced during the search
 * and restored before returning or propagating an exception.
 *
 * @throw shcore::Exception (SHERR_DBA_METADATA_MISSING) if the target has no
 *        metadata schema.
 */
std::shared_ptr<MetadataStorage> Instance_pool::find_metadata_server(
    const std::shared_ptr<mysqlshdk::db::ISession> &session) {
  DBUG_TRACE;

  auto console = current_console();
  auto target = std::make_shared<Instance>(session);

  log_info("Looking for a suitable metadata server for instance %s",
           target->descr().c_str());

  auto target_md = std::make_shared<MetadataStorage>(target);
  if (!target_md->check_version()) {
    console->print_warning("InnoDB cluster metadata does not exist at " +
                           session->get_connection_options().uri_endpoint());
    throw shcore::Exception("Metadata schema missing",
                            SHERR_DBA_METADATA_MISSING);
  }

  // the lookup below needs the cache loaded from the target's metadata
  auto previous_md = m_metadata;
  set_metadata(target_md);

  try {
    auto member = connect_cluster_member_of(target);
    member->steal();

    set_metadata(previous_md);

    if (member == target) return target_md;

    // a different member was picked, use its copy of the metadata
    return std::make_shared<MetadataStorage>(member);
  } catch (...) {
    set_metadata(previous_md);
    throw;
  }
}
// Reloads the cached list of instances and clusters from the metadata
// storage and dumps the result to the debug log.
//
// @throw std::logic_error if no metadata storage was set.
void Instance_pool::refresh_metadata_cache() {
  DBUG_TRACE;

  if (!m_metadata) throw std::logic_error("metadata object not set");

  log_debug("Refreshing metadata cache from '%s'",
            m_metadata->get_md_server()->descr().c_str());

  m_mdcache->instances = m_metadata->get_all_instances();
  m_mdcache->clusters = m_metadata->get_all_clusters(true);

  for (const auto &inst : m_mdcache->instances) {
    // the cluster name is appended when the owning cluster is known
    const auto *owner = m_mdcache->try_get_cluster(inst.cluster_id);
    if (owner) {
      log_debug("I) %s %s (%s)", inst.label.c_str(), inst.endpoint.c_str(),
                owner->cluster_name.c_str());
    } else {
      log_debug("I) %s %s", inst.label.c_str(), inst.endpoint.c_str());
    }
  }

  for (const auto &cluster : m_mdcache->clusters) {
    log_debug("C) %s '%s'", cluster.group_name.c_str(),
              cluster.cluster_name.c_str());
  }

  log_debug("DONE!");
}
// Registers an externally created instance in the pool as a leased entry and
// returns it.
std::shared_ptr<Instance> Instance_pool::adopt(
const std::shared_ptr<Instance> &instance) {
DBUG_TRACE;
return add_leased_instance(instance);
}
// Connect to the specified instance without doing any checks.
//
// An idle pooled instance with identical connection options is leased and
// returned if there is one; otherwise a new connection is opened. The new
// connection is not added to the pool.
std::shared_ptr<Instance> Instance_pool::connect_unchecked(
    const mysqlshdk::db::Connection_options &opts) {
  DBUG_TRACE;

  const auto idle = std::find_if(
      m_pool.begin(), m_pool.end(), [&opts](const auto &entry) {
        return !entry.leased &&
               entry.instance->get_connection_options() == opts;
      });

  if (idle == m_pool.end())
    return Instance::connect(opts, m_allow_password_prompt);

  idle->leased = true;
  return idle->instance;
}
std::shared_ptr<Instance> Instance_pool::connect_unchecked_endpoint(
const std::string &endpoint, bool allow_url) {
DBUG_TRACE;
mysqlshdk::db::Connection_options opts(endpoint);
if (allow_url) {
if (!opts.has_user()) {
m_default_auth_opts.set(&opts);
}
} else {
// check that there's only a host:port
mysqlshdk::db::Connection_options tmp(opts);
tmp.clear_host();
tmp.clear_port();
tmp.clear_socket();
if (tmp.has_data())
throw std::invalid_argument(
"Target instance must be specified as host:port");
m_default_auth_opts.set(&opts);
}
try {
return connect_unchecked(opts);
}
CATCH_AND_THROW_CONNECTION_ERROR(endpoint)
}
std::shared_ptr<Instance> Instance_pool::connect_unchecked_uuid(
const std::string &uuid) {
DBUG_TRACE;
Auth_options auth = m_default_auth_opts;
for (auto &inst : m_pool) {
Auth_options iauth;
iauth.get(inst.instance->get_connection_options());
if (!inst.leased && inst.instance->get_uuid() == uuid && iauth == auth) {
inst.leased = true;
return inst.instance;
}
}
for (const auto &inst : m_mdcache->instances) {
if (inst.uuid == uuid) {
if (inst.endpoint.empty())
throw shcore::Exception(
"missing endpoint information for instance " + uuid,
SHERR_DBA_METADATA_INFO_MISSING);
mysqlshdk::db::Connection_options opts(inst.endpoint);
set_auth_opts(auth, &opts);
try {
return connect_unchecked(opts);
}
CATCH_AND_THROW_CONNECTION_ERROR(inst.endpoint)
}
}
throw shcore::Exception("Unable to find metadata for instance " + uuid,
SHERR_DBA_MEMBER_METADATA_MISSING);
}
// Connects to the primary member of the given topology node without doing
// any checks. Nodes that belong to a Group Replication cluster are not
// supported by this overload (asserts in debug builds, returns an empty
// pointer otherwise).
//
// @throw shcore::Exception (logic error) if the cluster of the node is not
//        in the metadata cache.
std::shared_ptr<Instance> Instance_pool::connect_unchecked(
    const topology::Node *node) {
  DBUG_TRACE;

  const Cluster_metadata *cluster_md =
      m_mdcache->try_get_cluster(node->cluster_id);
  assert(cluster_md);
  if (cluster_md == nullptr) {
    throw shcore::Exception::logic_error("Invalid node " + node->label);
  }

  if (cluster_md->type != Cluster_type::GROUP_REPLICATION) {
    return connect_unchecked_uuid(node->get_primary_member()->uuid);
  }

  assert(0);
  return {};
}
// Connects, without doing any checks, to the instance that the metadata
// cache records as primary of the given cluster.
//
// @throw shcore::Exception (SHERR_DBA_ASYNC_PRIMARY_UNDEFINED) if the cache
//        has no primary for the cluster.
std::shared_ptr<Instance> Instance_pool::connect_async_cluster_primary(
    Cluster_id cluster_id) {
  const auto *primary = m_mdcache->get_cluster_primary(cluster_id);
  const auto &primary_endpoint = primary->endpoint;
  return connect_unchecked_endpoint(primary_endpoint);
}
// Verifies that the instance is a usable Group Replication member, i.e. it is
// ONLINE (or RECOVERING, when allow_recovering is set) and belongs to a
// partition with quorum.
//
// The out_* arguments are forwarded to
// mysqlshdk::gr::get_group_information(), which fills them in.
//
// @throw shcore::Exception with one of:
//        - SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING
//        - SHERR_DBA_GROUP_MEMBER_NOT_IN_QUORUM
//        - SHERR_DBA_GROUP_MEMBER_NOT_ONLINE
void Instance_pool::check_group_member(
    const mysqlshdk::mysql::IInstance &instance, bool allow_recovering,
    std::string *out_member_id, std::string *out_group_name,
    bool *out_single_primary_mode, bool *out_is_primary) {
  DBUG_TRACE;

  using mysqlshdk::gr::Member_state;

  Member_state state;
  bool in_quorum;

  const bool gr_running = mysqlshdk::gr::get_group_information(
      instance, &state, out_member_id, out_group_name, nullptr,
      out_single_primary_mode, &in_quorum, out_is_primary);
  if (!gr_running) {
    throw shcore::Exception(
        "Group Replication is not running at instance " + instance.descr(),
        SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING);
  }

  // OFFLINE members fall through to the state check below
  if (state != Member_state::OFFLINE && !in_quorum) {
    throw shcore::Exception("Member " +
                                label_for_server_uuid(instance.get_uuid()) +
                                " is not part of a majority group",
                            SHERR_DBA_GROUP_MEMBER_NOT_IN_QUORUM);
  }

  switch (state) {
    case Member_state::ONLINE:
      return;
    case Member_state::RECOVERING:
      if (allow_recovering) return;
      break;
    default:
      break;
  }

  throw shcore::Exception(
      "member " + label_for_server_uuid(instance.get_uuid()) + " is in state " +
          mysqlshdk::gr::to_string(state),
      SHERR_DBA_GROUP_MEMBER_NOT_ONLINE);
}
// Connects to the primary of the cluster the given topology node belongs to.
// For Group Replication clusters the current group primary is located;
// otherwise the node is treated as a topology::Server and its primary member
// is connected to by UUID.
//
// @throw shcore::Exception (logic error) if the cluster of the node is not
//        in the metadata cache.
std::shared_ptr<Instance> Instance_pool::connect_primary(
    const topology::Node *node) {
  DBUG_TRACE;

  const Cluster_metadata *cluster_md =
      m_mdcache->try_get_cluster(node->cluster_id);
  assert(cluster_md);
  if (cluster_md == nullptr) {
    throw shcore::Exception::logic_error("Invalid cluster " + node->label);
  }

  if (cluster_md->type != Cluster_type::GROUP_REPLICATION) {
    const auto *server = static_cast<const topology::Server *>(node);
    return connect_unchecked_uuid(server->get_primary_member()->uuid);
  }

  return connect_group_primary(cluster_md->group_name);
}
// Connects to the current PRIMARY of the Group Replication group with the
// given name.
//
// Members that were seen as PRIMARY before (m_recent_primaries) are tried
// first; all remaining members known to the metadata cache are then tried one
// by one as an entry point to discover the PRIMARY.
//
// @throw shcore::Exception with one of:
//        - SHERR_DBA_METADATA_MISSING if there are no candidates left to try
//        - SHERR_DBA_GROUP_HAS_NO_QUORUM if a probed member reports no quorum
//        - SHERR_DBA_GROUP_HAS_NO_PRIMARY if no PRIMARY could be reached
//        Other errors from probing the candidates are propagated, except
//        SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING, which is skipped.
std::shared_ptr<Instance> Instance_pool::connect_group_primary(
const std::string &group_name) {
DBUG_TRACE;
// UUIDs of the members still to be probed in the second pass.
std::vector<std::string> candidates;
try {
// first try to find someone that was the primary last time we saw it
for (const auto &i : m_mdcache->instances) {
if (i.group_name == group_name) {
if (m_recent_primaries.find(i.uuid) != m_recent_primaries.end()) {
// instance was a PRIMARY last time we saw it, try using it as a
// starting point to find the current PRIMARY
try {
auto instance = try_connect_primary_through_member(i.uuid);
if (instance) {
return instance;
}
// NOTE(review): a recent primary that yields no instance without
// throwing is not added to the candidates - confirm this is intended.
} catch (const shcore::Exception &e) {
log_warning("Could not connect to %s: %s", i.endpoint.c_str(),
e.format().c_str());
candidates.push_back(i.uuid);
}
} else {
candidates.push_back(i.uuid);
}
}
}
if (candidates.empty()) {
const auto &cname = m_mdcache->get_cluster_name(group_name);
log_warning(
"Could not find any members in metadata for cluster '%s', group_name "
"%s",
cname.c_str(), group_name.c_str());
throw shcore::Exception(
"No managed members found for cluster '" + cname + "'",
SHERR_DBA_METADATA_MISSING);
}
// we don't know any possible primaries, so just try everyone one by one
for (const auto &uuid : candidates) {
try {
auto instance = try_connect_primary_through_member(uuid);
if (instance) {
return instance;
}
} catch (const shcore::Exception &err) {
// Members without Group Replication running are skipped; anything else
// aborts the search.
if (err.code() != SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING) throw;
}
}
} catch (const shcore::Exception &e) {
// Rewrite the no-quorum error so that it names the cluster.
if (e.code() == SHERR_DBA_GROUP_HAS_NO_QUORUM) {
throw shcore::Exception("Cluster '" +
m_mdcache->get_cluster_name(group_name) +
"' has no quorum",
SHERR_DBA_GROUP_HAS_NO_QUORUM);
}
throw;
}
// there are no primaries anywhere
throw shcore::Exception("Could not connect to a PRIMARY member of cluster '" +
m_mdcache->get_cluster_name(group_name) + "'",
SHERR_DBA_GROUP_HAS_NO_PRIMARY);
}
// Connects to a SECONDARY member of the Group Replication group with the
// given name. Members are tried in metadata order and the first ONLINE,
// in-quorum, non-primary member is returned.
//
// @throw shcore::Exception with one of:
//        - a runtime error if the group runs in multi-primary mode
//        - SHERR_DBA_GROUP_UNREACHABLE if no member could be reached
//        - SHERR_DBA_GROUP_UNAVAILABLE if no SECONDARY member was found
std::shared_ptr<Instance> Instance_pool::connect_group_secondary(
    const std::string &group_name) {
  DBUG_TRACE;

  int reachable_count = 0;

  for (const auto &md : m_mdcache->instances) {
    if (!(md.group_name == group_name)) continue;

    std::shared_ptr<Instance> candidate;
    try {
      candidate = connect_unchecked_uuid(md.uuid);
      ++reachable_count;
    } catch (const shcore::Exception &e) {
      log_warning("%s: %s", md.endpoint.c_str(), e.format().c_str());
      // errors above the client error range still count as reachable
      if (e.code() > CR_ERROR_LAST) ++reachable_count;
      continue;
    }

    bool single_primary;
    bool is_primary;
    try {
      check_group_member(*candidate, false, nullptr, nullptr, &single_primary,
                         &is_primary);
    } catch (const shcore::Exception &e) {
      candidate->release();
      log_warning("%s: %s", md.endpoint.c_str(), e.what());
      continue;
    }

    if (!is_primary) return candidate;

    candidate->release();

    if (single_primary) continue;

    throw shcore::Exception::runtime_error(
        "Target cluster '" + m_mdcache->get_cluster_name(group_name) +
        "' is configured for multi-primary mode and has no "
        "SECONDARY members");
  }

  if (reachable_count == 0) {
    throw shcore::Exception(
        "Could not connect to any SECONDARY member of the target InnoDB "
        "cluster '" +
            m_mdcache->get_cluster_name(group_name) + "'",
        SHERR_DBA_GROUP_UNREACHABLE);
  }

  throw shcore::Exception(
      "Could not find any SECONDARY member in the target InnoDB cluster '" +
          m_mdcache->get_cluster_name(group_name) + "'",
      SHERR_DBA_GROUP_UNAVAILABLE);
}
// Connects to any usable member (ONLINE or RECOVERING, within quorum) of the
// Group Replication group with the given name. Members are tried in metadata
// order and the first usable one is returned.
//
// @throw shcore::Exception with one of:
//        - SHERR_DBA_GROUP_UNREACHABLE if no member could be reached
//        - SHERR_DBA_GROUP_HAS_NO_QUORUM if a reachable member was out of
//          quorum
//        - SHERR_DBA_GROUP_UNAVAILABLE otherwise
std::shared_ptr<Instance> Instance_pool::connect_group_member(
    const std::string &group_name) {
  DBUG_TRACE;

  int reachable_count = 0;
  bool no_quorum_seen = false;

  for (const auto &md : m_mdcache->instances) {
    if (!(md.group_name == group_name)) continue;

    std::shared_ptr<Instance> candidate;
    try {
      candidate = connect_unchecked_uuid(md.uuid);
      ++reachable_count;
    } catch (const shcore::Exception &e) {
      log_warning("%s: %s", md.endpoint.c_str(), e.format().c_str());
      // errors above the client error range still count as reachable
      if (e.code() > CR_ERROR_LAST) ++reachable_count;
      continue;
    }

    try {
      check_group_member(*candidate, true);
    } catch (const shcore::Exception &e) {
      candidate->release();
      log_warning("%s: %s", md.endpoint.c_str(), e.what());
      if (e.code() == SHERR_DBA_GROUP_MEMBER_NOT_IN_QUORUM)
        no_quorum_seen = true;
      continue;
    }

    return candidate;
  }

  if (reachable_count == 0) {
    throw shcore::Exception("Could not connect to any member of cluster '" +
                                m_mdcache->get_cluster_name(group_name) + "'",
                            SHERR_DBA_GROUP_UNREACHABLE);
  }

  if (no_quorum_seen) {
    throw shcore::Exception("Cluster '" +
                                m_mdcache->get_cluster_name(group_name) +
                                "' has no quorum",
                            SHERR_DBA_GROUP_HAS_NO_QUORUM);
  }

  throw shcore::Exception("Could not find any available member in cluster '" +
                              m_mdcache->get_cluster_name(group_name) + "'",
                          SHERR_DBA_GROUP_UNAVAILABLE);
}
/**
 * Connect to the best available member of the given cluster.
 *
 * Every member of the cluster's group known to the metadata cache is probed
 * until a PRIMARY (or any usable member, in multi-primary mode) is found. If
 * there is none, the first member found in each of the following categories
 * is used as a fallback, in this order of preference:
 * - a usable SECONDARY           (ONLINE_NO_PRIMARY)
 * - a member out of quorum       (NO_QUORUM)
 * - a member that is not ONLINE  (OFFLINE if all members are in that state,
 *                                 SOME_UNREACHABLE otherwise)
 *
 * Every connection that is not returned is released.
 *
 * @param cluster_id id of the target cluster.
 * @param out_cluster_availability receives the availability deduced for the
 *        cluster (UNREACHABLE when no member could be connected to). Must
 *        not be null.
 *
 * @throw shcore::Exception (SHERR_DBA_METADATA_MISSING) if the cluster is not
 *        in the metadata.
 *
 * @return the selected instance, or an empty pointer if none was reachable.
 */
std::shared_ptr<Instance>
Instance_pool::try_connect_cluster_primary_with_fallback(
    const Cluster_id &cluster_id,
    Cluster_availability *out_cluster_availability) {
  DBUG_TRACE;

  // The metadata is refreshed when the pool is instantiated (with every
  // command call), refresh again only if the instances/clusters cache is empty
  if (m_mdcache->instances.empty() || m_mdcache->clusters.empty()) {
    refresh_metadata_cache();
  }

  auto cluster_md = m_mdcache->try_get_cluster(cluster_id);
  if (!cluster_md) {
    throw shcore::Exception(
        shcore::str_format("Could not find metadata for Cluster '%s'",
                           cluster_id.c_str()),
        SHERR_DBA_METADATA_MISSING);
  }

  std::shared_ptr<Instance> best;
  std::shared_ptr<Instance> secondary;
  std::shared_ptr<Instance> not_in_quorum;
  std::shared_ptr<Instance> not_online;
  int num_offline = 0;
  int num_total = 0;

  for (const auto &i : m_mdcache->instances) {
    if (i.group_name != cluster_md->group_name) continue;

    num_total++;

    std::shared_ptr<Instance> instance;
    try {
      instance = connect_unchecked_uuid(i.uuid);
    } catch (const shcore::Exception &e) {
      log_warning("%s: %s", i.endpoint.c_str(), e.format().c_str());

      // client errors are expected for non-running servers etc, but server
      // side errors are not
      if (e.code() > CR_ERROR_LAST) {
        current_console()->print_warning("While connecting to " + i.endpoint +
                                         ": " + e.format());
      }
      continue;
    }

    try {
      bool is_single_primary = false;
      bool is_primary = false;

      check_group_member(*instance, true, nullptr, nullptr, &is_single_primary,
                         &is_primary);

      if (!is_single_primary || is_primary) {
        *out_cluster_availability = Cluster_availability::ONLINE;
        best = instance;
        break;
      } else {
        // only the first secondary is kept as a fallback
        if (!secondary)
          secondary = instance;
        else
          instance->release();
      }
    } catch (const shcore::Exception &e) {
      switch (e.code()) {
        case SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING:
        case SHERR_DBA_GROUP_MEMBER_NOT_ONLINE:
          num_offline++;
          if (!not_online)
            not_online = instance;
          else
            instance->release();
          break;

        case SHERR_DBA_GROUP_MEMBER_NOT_IN_QUORUM:
          if (!not_in_quorum)
            not_in_quorum = instance;
          else
            instance->release();
          break;

        default:
          assert(0);
          current_console()->print_error(
              "Unexpected error checking membership status of " + i.endpoint +
              ":" + e.format());
          // The instance is not kept in any fallback slot, so it must be
          // released here like in every other branch; otherwise it would
          // stay leased.
          instance->release();
          break;
      }
    }
  }

  if (!best && secondary) {
    best = secondary;
    *out_cluster_availability = Cluster_availability::ONLINE_NO_PRIMARY;
  }

  if (!best && not_in_quorum) {
    best = not_in_quorum;
    *out_cluster_availability = Cluster_availability::NO_QUORUM;
  }

  if (!best && not_online) {
    best = not_online;
    if (num_offline == num_total)
      *out_cluster_availability = Cluster_availability::OFFLINE;
    else
      *out_cluster_availability = Cluster_availability::SOME_UNREACHABLE;
  }

  // release the fallback candidates that ended up not being used
  if (best != secondary && secondary) secondary->release();
  if (best != not_in_quorum && not_in_quorum) not_in_quorum->release();
  if (best != not_online && not_online) not_online->release();

  if (!best) {
    *out_cluster_availability = Cluster_availability::UNREACHABLE;
  }

  return best;
}
/**
 * Use the given member as an entry point to find and connect to the PRIMARY
 * of its group.
 *
 * The member list is read from the given member. If the member itself is the
 * PRIMARY, its connection is returned; if one of its peers is the PRIMARY,
 * the member's connection is released and a connection to that peer is
 * returned. m_recent_primaries is updated along the way.
 *
 * @param member_uuid server UUID of the member to start from.
 *
 * @throw shcore::Exception with one of:
 *        - SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING if Group Replication is not
 *          active at the member
 *        - SHERR_DBA_GROUP_HAS_NO_QUORUM if the member reports no quorum
 *        Errors with codes outside the client error range are propagated.
 *
 * @return a connection to the PRIMARY, or an empty pointer if none was found
 *         through this member or the connection failed with a client error.
 */
std::shared_ptr<Instance> Instance_pool::try_connect_primary_through_member(
    const std::string &member_uuid) {
  DBUG_TRACE;

  bool has_quorum = false;
  bool single_primary = false;
  std::vector<mysqlshdk::gr::Member> members;

  try {
    std::shared_ptr<Instance> instance(connect_unchecked_uuid(member_uuid));

    try {
      // info from the candidate member
      members =
          mysqlshdk::gr::get_members(*instance, &single_primary, &has_quorum);
    } catch (const std::exception &e) {
      instance->release();

      if (shcore::str_beginswith(
              e.what(), "Group replication does not seem to be active"))
        throw shcore::Exception(e.what(),
                                SHERR_DBA_GROUP_REPLICATION_NOT_RUNNING);
      throw;
    }

    if (!has_quorum) {
      // The connection is not handed to the caller, so release it before
      // bailing out, like every other exit path of this function does.
      instance->release();

      throw shcore::Exception(
          "Target cluster does not have enough ONLINE members to form a "
          "quorum.",
          SHERR_DBA_GROUP_HAS_NO_QUORUM);
    }

    for (const auto &m : members) {
      if (m.uuid == member_uuid) {
        // if this member is a primary, return it
        if (m.role == mysqlshdk::gr::Member_role::PRIMARY) {
          m_recent_primaries.insert(member_uuid);
          return instance;
        }
        // otherwise, we remove from list of recent primaries
        m_recent_primaries.erase(member_uuid);
      } else {
        // new primary is in the list of peers, return a session to it
        if (m.role == mysqlshdk::gr::Member_role::PRIMARY) {
          m_recent_primaries.insert(m.uuid);
          instance->release();
          // we don't need to check if the target is a member because we
          // just checked it from this fresh member list... there's a race
          // condition possible, but checking again here won't help with
          // that
          return connect_unchecked_uuid(m.uuid);
        }
      }
    }

    // member_uuid is probably either out of the group or in a partition
    // with no quorum
    instance->release();
    return {};
  } catch (const shcore::Error &err) {
    // ignore any DB connect errors that could be due to lack of
    // availability
    if (err.code() >= CR_MIN_ERROR && err.code() <= CR_MAX_ERROR) {
      return {};
    }
    throw;
  }
}
std::shared_ptr<Instance> Instance_pool::connect_cluster_member_of(
const std::shared_ptr<Instance> &instance) {
DBUG_TRACE;
const Instance_metadata *i =
m_mdcache->get_instance_with_uuid(instance->get_uuid());
const Cluster_metadata *c = m_mdcache->try_get_cluster(i->cluster_id);
if (!c) throw std::logic_error("internal error");
if (c->type == Cluster_type::GROUP_REPLICATION) {
std::string group_name;
try {
check_group_member(*instance, false, nullptr, &group_name);
return instance;
} catch (const shcore::Exception &e) {
log_warning("%s: %s", instance->descr().c_str(), e.what());
}
return connect_group_member(group_name);
} else {
return instance;
}
}
std::shared_ptr<Instance> Instance_pool::add_leased_instance(
std::shared_ptr<Instance> instance) {
DBUG_TRACE;
Pool_entry entry;
entry.instance = instance;
entry.leased = true;
m_pool.emplace_back(entry);
return instance;
}
/**
 * Marks the pool entry that holds `instance` as no longer leased.
 *
 * Nothing happens if the instance is not found in the pool.
 *
 * @param instance instance being handed back.
 * @throws std::logic_error if the matching entry is not currently leased.
 */
void Instance_pool::return_instance(Instance *instance) {
  DBUG_TRACE;

  for (auto &entry : m_pool) {
    if (entry.instance.get() != instance) continue;

    if (!entry.leased) throw std::logic_error("Returning unleased instance");

    entry.leased = false;
    return;
  }
}
/**
 * Removes the entry that holds `instance` from the pool.
 *
 * @param instance instance to stop tracking.
 * @return the shared pointer that the pool was holding for that instance.
 * @throws std::logic_error if the instance is not in the pool.
 */
std::shared_ptr<Instance> Instance_pool::forget_instance(Instance *instance) {
  DBUG_TRACE;

  auto pos = m_pool.begin();
  while (pos != m_pool.end() && pos->instance.get() != instance) ++pos;

  if (pos == m_pool.end())
    throw std::logic_error("Trying to steal non-managed instance");

  // Take a reference before the entry (and its own reference) goes away.
  std::shared_ptr<Instance> forgotten = pos->instance;
  m_pool.erase(pos);
  return forgotten;
}
/**
 * Looks up the label of the instance with the given server UUID in the
 * metadata cache.
 *
 * @param uuid server UUID to look for.
 * @return the label of the first cached instance with that UUID, or the UUID
 *         itself if there is none.
 */
std::string Instance_pool::label_for_server_uuid(const std::string &uuid) {
  DBUG_TRACE;

  for (const auto &instance_md : m_mdcache->instances) {
    const bool is_match = (instance_md.uuid == uuid);
    if (is_match) return instance_md.label;
  }

  // Unknown instance: the UUID is the best label available.
  return uuid;
}
namespace {
/**
 * Stack of shared objects, where the most recently pushed one is the current
 * one.
 */
template <typename T>
class Scoped_storage {
 public:
  /**
   * Returns the object at the top of the stack.
   *
   * @throws std::logic_error if nothing was pushed.
   */
  std::shared_ptr<T> get() const {
    if (!m_scopes.empty()) return m_scopes.top();

    throw std::logic_error("Instance_pool not initialized!");
  }

  /**
   * Makes the given (non-null) object the current one.
   */
  void push(const std::shared_ptr<T> &object) {
    assert(object);
    m_scopes.push(object);
  }

  /**
   * Drops the current object, which is expected to be the one given.
   *
   * The argument is only used by the assertion.
   */
  void pop([[maybe_unused]] const std::shared_ptr<T> &object) {
    assert(!m_scopes.empty() && m_scopes.top() == object);
    m_scopes.pop();
  }

 private:
  std::stack<std::shared_ptr<T>> m_scopes;
};
// File-local stack of the active instance pools: Scoped_instance_pool pushes
// its pool on construction and pops it on destruction, and current_ipool()
// returns the one at the top.
Scoped_storage<Instance_pool> g_ipool_storage;
} // namespace
/**
 * Takes the given pool and registers it as the current one for as long as
 * this object is alive.
 */
Scoped_instance_pool::Scoped_instance_pool(std::shared_ptr<Instance_pool> ipool)
    : m_pool(std::move(ipool)) {
  g_ipool_storage.push(m_pool);
}
// Unregisters the pool held by this object from g_ipool_storage; the storage
// asserts that it is the one currently at the top, i.e. that scopes are
// destroyed in the reverse order of their creation.
Scoped_instance_pool::~Scoped_instance_pool() noexcept {
g_ipool_storage.pop(m_pool);
}
/**
 * Returns the instance pool registered by the innermost live
 * Scoped_instance_pool.
 *
 * @throws std::logic_error if no pool is registered.
 */
std::shared_ptr<Instance_pool> current_ipool() {
  DBUG_TRACE;

  std::shared_ptr<Instance_pool> active_pool = g_ipool_storage.get();
  return active_pool;
}
namespace {

/**
 * Common implementation of get_instance_lock_shared() and
 * get_instance_lock_exclusive().
 *
 * Calls acquire_lock on each instance of the list, in order, except for the
 * one whose UUID matches skip_uuid (if skip_uuid is not empty). Each lock
 * that evaluates to true is stored in the returned list together with its
 * instance.
 *
 * If acquiring a lock throws, the locks collected so far are released
 * through Lock_scoped_list::invoke() and the exception is re-thrown.
 *
 * @param instances instances to be locked.
 * @param skip_uuid UUID of an instance that must not be locked (may be empty).
 * @param acquire_lock callable that takes an Instance& and returns its lock.
 */
template <typename Acquire_lock>
[[nodiscard]] mysqlshdk::mysql::Lock_scoped_list acquire_instance_locks(
    const std::list<std::shared_ptr<Instance>> &instances,
    std::string_view skip_uuid, Acquire_lock &&acquire_lock) {
  mysqlshdk::mysql::Lock_scoped_list locks;

  for (const auto &instance : instances) {
    // Skip the instance with the given UUID (if not empty).
    if (!skip_uuid.empty() && instance->get_uuid() == skip_uuid) continue;

    try {
      // Add the corresponding release operation to the revert list.
      if (auto i_lock = acquire_lock(*instance); i_lock)
        locks.push_back(std::move(i_lock), instance);
    } catch (...) {
      // Release any previously acquired lock.
      locks.invoke();
      throw;
    }
  }

  return locks;
}

}  // namespace

/**
 * Acquires a shared lock on each of the given instances.
 *
 * @param instances instances to be locked.
 * @param timeout passed to Instance::get_lock_shared() for each instance.
 * @param skip_uuid UUID of an instance that must not be locked (may be empty).
 * @return the list of acquired locks.
 *
 * If a lock cannot be acquired, the ones already held are released and the
 * exception is propagated.
 */
[[nodiscard]] mysqlshdk::mysql::Lock_scoped_list get_instance_lock_shared(
    const std::list<std::shared_ptr<Instance>> &instances,
    std::chrono::seconds timeout, std::string_view skip_uuid) {
  return acquire_instance_locks(instances, skip_uuid,
                                [timeout](Instance &target) {
                                  return target.get_lock_shared(timeout);
                                });
}

/**
 * Acquires an exclusive lock on each of the given instances.
 *
 * @param instances instances to be locked.
 * @param timeout passed to Instance::get_lock_exclusive() for each instance.
 * @param skip_uuid UUID of an instance that must not be locked (may be empty).
 * @return the list of acquired locks.
 *
 * If a lock cannot be acquired, the ones already held are released and the
 * exception is propagated.
 */
[[nodiscard]] mysqlshdk::mysql::Lock_scoped_list get_instance_lock_exclusive(
    const std::list<std::shared_ptr<Instance>> &instances,
    std::chrono::seconds timeout, std::string_view skip_uuid) {
  return acquire_instance_locks(instances, skip_uuid,
                                [timeout](Instance &target) {
                                  return target.get_lock_exclusive(timeout);
                                });
}
} // namespace dba
} // namespace mysqlsh