modules/adminapi/replica_set/replica_set_impl.cc (2,340 lines of code) (raw):

/* * Copyright (c) 2019, 2024, Oracle and/or its affiliates. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2.0, * as published by the Free Software Foundation. * * This program is designed to work with certain software (including * but not limited to OpenSSL) that is licensed under separate terms, * as designated in a particular file or component or in included license * documentation. The authors of MySQL hereby grant you an additional * permission to link the program and your derivative works with the * separately licensed software that they have either included with * the program or referenced in the documentation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See * the GNU General Public License, version 2.0, for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "modules/adminapi/replica_set/replica_set_impl.h" #include <mysql.h> #include <mysqld_error.h> #include <future> #include <thread> #include <tuple> #include <utility> #include "modules/adminapi/common/async_topology.h" #include "modules/adminapi/common/async_utils.h" #include "modules/adminapi/common/base_cluster_impl.h" #include "modules/adminapi/common/common.h" #include "modules/adminapi/common/connectivity_check.h" #include "modules/adminapi/common/dba_errors.h" #include "modules/adminapi/common/errors.h" #include "modules/adminapi/common/global_topology_check.h" #include "modules/adminapi/common/gtid_validations.h" #include "modules/adminapi/common/instance_monitoring.h" #include "modules/adminapi/common/instance_validations.h" #include "modules/adminapi/common/member_recovery_monitoring.h" 
#include "modules/adminapi/common/metadata_management_mysql.h"
#include "modules/adminapi/common/metadata_storage.h"
#include "modules/adminapi/common/server_features.h"
#include "modules/adminapi/common/sql.h"
#include "modules/adminapi/common/star_global_topology_manager.h"
#include "modules/adminapi/common/validations.h"
#include "modules/adminapi/replica_set/replica_set_status.h"
#include "modules/mysqlxtest_utils.h"
#include "mysqlshdk/include/shellcore/console.h"
#include "mysqlshdk/include/shellcore/interrupt_handler.h"
#include "mysqlshdk/include/shellcore/scoped_contexts.h"
#include "mysqlshdk/include/shellcore/utils_help.h"
#include "mysqlshdk/libs/mysql/async_replication.h"
#include "mysqlshdk/libs/mysql/clone.h"
#include "mysqlshdk/libs/mysql/replication.h"
#include "mysqlshdk/libs/mysql/utils.h"
#include "mysqlshdk/libs/textui/textui.h"
#include "mysqlshdk/libs/utils/debug.h"
#include "mysqlshdk/libs/utils/utils_general.h"
#include "mysqlshdk/libs/utils/utils_net.h"
#include "mysqlshdk/shellcore/shell_console.h"
#include "scripting/types.h"

namespace mysqlsh {
namespace dba {

// Name prefix for the replication accounts managed by InnoDB ReplicaSet
// (presumably completed with a per-instance suffix by
// create_replication_user() — defined elsewhere in this file).
constexpr const char *k_async_cluster_user_name = "mysql_innodb_rs_";

// Cluster attribute key under which the configured replication SSL mode is
// persisted in the metadata schema.
constexpr const char k_replica_set_attribute_ssl_mode[] =
    "opt_replicationSslMode";

// # of seconds to wait until clone starts
constexpr const int k_clone_start_timeout = 30;

namespace {

// Scans the async replication topology reachable from 'instance' and returns
// a Server_global_topology describing it. Also verifies GTID consistency of
// the discovered members (check_gtid_consistency(false)).
std::unique_ptr<topology::Server_global_topology> discover_unmanaged_topology(
    Instance *instance) {
  auto topology = std::make_unique<topology::Server_global_topology>(
      k_replicaset_channel_name);

  // perform initial discovery
  topology->discover_from_unmanaged(instance);

  topology->check_gtid_consistency(false);

  return topology;
}

// Rejects servers older than 8.0.11, the minimum version supported by
// InnoDB ReplicaSet. Throws SHERR_DBA_BADARG_VERSION_NOT_SUPPORTED.
void validate_version(const Instance &target_server) {
  if (target_server.get_version() < mysqlshdk::utils::Version(8, 0, 11)) {
    current_console()->print_info(
        "MySQL version " + target_server.get_version().get_full() +
        " detected at " + target_server.get_canonical_address() +
        ", but 8.0.11 is required for InnoDB ReplicaSet.");

    throw shcore::Exception("Unsupported MySQL version",
                            SHERR_DBA_BADARG_VERSION_NOT_SUPPORTED);
  }
}

// Validates that the instance is correctly configured for async replication
// and is not an active Group Replication member (GR cannot be mixed with
// ReplicaSets). Throws SHERR_DBA_INVALID_SERVER_CONFIGURATION on failure.
void validate_instance(Instance *target_server) {
  // Check instance configuration and state
  ensure_ar_instance_configuration_valid(target_server);

  // Check if GR is running
  mysqlshdk::gr::Member_state state;
  if (mysqlshdk::gr::get_group_information(*target_server, &state, nullptr,
                                           nullptr, nullptr) &&
      state != mysqlshdk::gr::Member_state::OFFLINE) {
    current_console()->print_error(
        target_server->descr() +
        " has Group Replication active, which cannot be mixed "
        "with ReplicaSets.");
    throw shcore::Exception("group_replication active",
                            SHERR_DBA_INVALID_SERVER_CONFIGURATION);
  }
}

// Returns the list of replicas of 'instance' that are either confirmed live
// (a channel on the replica points back at 'instance') or cannot be verified.
// Each entry pairs the Slave_host with the quoted channel name, or an empty
// string when the channel could not be confirmed (invalid endpoint or
// connection failure). Stale entries — reachable replicas with no channel
// back to 'instance' — are dropped.
std::vector<std::pair<mysqlshdk::mysql::Slave_host, std::string>>
get_valid_slaves(mysqlshdk::mysql::IInstance *instance) {
  auto slaves = get_slaves(*instance);
  std::vector<std::pair<mysqlshdk::mysql::Slave_host, std::string>>
      real_slaves;
  auto ipool = current_ipool();

  for (const auto &ch : slaves) {
    // This is a bit unnecessary, but we do it to avoid false positives,
    // specially in tests. The problem is that SHOW SLAVE HOSTS will include
    // slaves that have stopped replicating, so we need to double-check that
    // the channel is still active to prevent false positives.
    std::string endpoint =
        mysqlshdk::utils::make_host_and_port(ch.host, ch.port);
    std::string channel_name;
    bool ghost_slave = true;

    // if the instances hasn't report-host or report-port configured, the
    // slave will have an empty "host" (and possibly port with 0). In this
    // case, we should ignore the slave rather than connect to the incorrect
    // instance (no host defaults to localhost)
    // NOTE: the entry is still recorded (with an empty channel name) so
    // callers can report it to the user.
    if (!ch.has_valid_endpoint()) {
      log_info("Ignoring '%s' (%s), which is listed as a replica for '%s'",
               endpoint.c_str(), ch.uuid.c_str(), instance->descr().c_str());
      real_slaves.push_back({ch, ""});
      continue;
    }

    try {
      Scoped_instance slave(ipool->connect_unchecked_endpoint(endpoint));

      // Confirm the replica actually has a channel sourced at 'instance'.
      auto slave_channels = mysqlshdk::mysql::get_incoming_channels(*slave);
      for (const auto &slave_channel : slave_channels) {
        if (slave_channel.source_uuid == instance->get_uuid()) {
          channel_name = slave_channel.channel_name;
          ghost_slave = false;
          break;
        }
      }
    } catch (const shcore::Exception &e) {
      // Unreachable replica: keep it (empty channel) so it gets reported.
      log_info(
          "Could not connect to %s, which is listed as a replica for %s: %s",
          endpoint.c_str(), instance->descr().c_str(), e.format().c_str());
      real_slaves.push_back({ch, ""});
      continue;
    }

    if (ghost_slave) {
      // Reachable, but no channel points back here: stale entry, drop it.
      log_info("Ignoring stale replica host %s for %s", endpoint.c_str(),
               instance->descr().c_str());
      continue;
    }

    real_slaves.push_back({ch, "'" + channel_name + "'"});
  }

  return real_slaves;
}

// Ensures the instance has no incoming replication channels and no (valid)
// replicas of its own, i.e. that it is not already part of some replication
// topology. 'add_op' adds addInstance()-specific recovery hints to the error
// message. Throws SHERR_DBA_INVALID_SERVER_CONFIGURATION on failure.
void validate_instance_is_standalone(Instance *target_server,
                                     bool add_op = false) {
  auto console = current_console();

  // Look for unexpected replication channels
  auto channels = mysqlshdk::mysql::get_incoming_channels(*target_server);
  auto slaves = get_valid_slaves(target_server);

  if (!channels.empty() || !slaves.empty()) {
    console->print_error("Extraneous replication channels found at " +
                         target_server->descr() + ":");
    for (const auto &ch : channels) {
      console->print_info(
          "- channel '" + ch.channel_name + "' from " +
          mysqlshdk::utils::make_host_and_port(ch.host, ch.port));
    }
    for (const auto &sl : slaves) {
      if (!sl.first.has_valid_endpoint())
        console->print_info(shcore::str_format(
            "- %s replicates from this instance but has an address that "
            "can't "
            "be reached: '%s'",
            sl.first.uuid.c_str(),
            mysqlshdk::utils::make_host_and_port(sl.first.host, sl.first.port)
                .c_str()));
      else if (sl.second.empty())
        console->print_info(
            "- " +
            mysqlshdk::utils::make_host_and_port(sl.first.host,
                                                 sl.first.port) +
            " replicates from this instance");
      else
        console->print_info(
            "- " +
            mysqlshdk::utils::make_host_and_port(sl.first.host,
                                                 sl.first.port) +
            " replicates from this instance (channel " + sl.second + ")");
    }
    console->print_info();

    std::string info_msg =
        "Unmanaged replication channels are not supported in a replicaset. "
        "If "
        "you'd like to manage an existing MySQL replication topology with "
        "the "
        "Shell, use the <<<createReplicaSet>>>() operation with the "
        "adoptFromAR option.";

    if (add_op) {
      // get_replica_keyword() picks REPLICA vs SLAVE per server version.
      std::string replica_term =
          mysqlshdk::mysql::get_replica_keyword(target_server->get_version());

      info_msg.append(
          " If the <<<addInstance>>>() operation previously failed for the "
          "target instance and you are trying to add it again, then after "
          "fixing the issue you should reset the current replication "
          "settings "
          "before retrying to execute the operation. To reset the "
          "replication "
          "settings on the target instance execute the following statements: "
          "'STOP " +
          replica_term + ";' and 'RESET " + replica_term + " ALL;'.");
    }

    console->print_para(info_msg);

    throw shcore::Exception("Unexpected replication channel",
                            SHERR_DBA_INVALID_SERVER_CONFIGURATION);
  }
}

// Validates each discovered node (version, configuration, no GR) and the
// overall topology shape before adoption. On success, *out_primary is set by
// validate_adopt_cluster() to the node that will become the PRIMARY.
void validate_adopted_topology(Global_topology_manager *topology,
                               const topology::Node **out_primary) {
  auto console = current_console();
  auto ipool = current_ipool();

  // check config of all instances
  console->print_info("* Checking configuration of discovered instances...");
  for (const auto *server : topology->topology()->nodes()) {
    Scoped_instance instance(ipool->connect_unchecked_endpoint(
        server->get_primary_member()->endpoint));

    validate_version(*instance);
    validate_instance(instance.get());
  }
  console->print_info();

  // check topology
  topology->validate_adopt_cluster(out_primary);
  console->print_info();

  console->print_info("Validations completed successfully.");
  console->print_info();
}

}  // namespace
Replica_set_impl::Replica_set_impl( const std::string &cluster_name, const std::shared_ptr<Instance> &target, const std::shared_ptr<MetadataStorage> &metadata_storage, Global_topology_type topology_type) : Base_cluster_impl(cluster_name, target, metadata_storage), m_topology_type(topology_type) { set_description("Default ReplicaSet"); } Replica_set_impl::Replica_set_impl( const Cluster_metadata &cm, const std::shared_ptr<Instance> &target, const std::shared_ptr<MetadataStorage> &metadata_storage) : Base_cluster_impl(cm.cluster_name, target, metadata_storage), m_topology_type(cm.async_topology_type) { m_id = cm.cluster_id; m_description = cm.description; } std::shared_ptr<Replica_set_impl> Replica_set_impl::create( const std::string &full_cluster_name, Global_topology_type topology_type, const std::shared_ptr<Instance> &target_server, const Create_replicaset_options &options) { // Acquire required locks on target instance. // No "write" operation allowed to be executed concurrently on the target // instance. 
auto i_lock = target_server->get_lock_exclusive(); // Validate the cluster_name mysqlsh::dba::validate_cluster_name(full_cluster_name, Cluster_type::ASYNC_REPLICATION); // if adopting, memberAuth only support "password" if (options.adopt && (options.member_auth_options.member_auth_type != Replication_auth_type::PASSWORD)) throw shcore::Exception::argument_error( shcore::str_format("Cannot set '%s' to a value other than 'PASSWORD' " "if '%s' is set to true.", kMemberAuthType, kAdoptFromAR)); std::string domain_name; std::string cluster_name; parse_fully_qualified_cluster_name(full_cluster_name, &domain_name, nullptr, &cluster_name); if (domain_name.empty()) domain_name = k_default_domain_name; auto console = current_console(); if (options.adopt) { console->print_info("A new replicaset with the topology visible from '" + target_server->descr() + "' will be created.\n"); } else { console->print_info("A new replicaset with instance '" + target_server->descr() + "' will be created.\n"); } if (options.dry_run) { console->print_note( "dryRun option was specified. Validations will be executed, " "but no changes will be applied."); } auto metadata = std::make_shared<MetadataStorage>(target_server); auto cluster = std::make_shared<Replica_set_impl>(cluster_name, target_server, metadata, topology_type); if (options.adopt) { console->print_info("* Scanning replication topology..."); auto topology = std::make_unique<Star_global_topology_manager>( 0, discover_unmanaged_topology(target_server.get())); console->print_info(); cluster->adopt(topology.get(), options, options.dry_run); } else { cluster->create(options, options.dry_run); } if (!options.dry_run) { metadata->update_cluster_attribute( cluster->get_id(), k_cluster_attribute_assume_gtid_set_complete, options.gtid_set_is_complete ? 
shcore::Value::True() : shcore::Value::False()); } return cluster; } void Replica_set_impl::create(const Create_replicaset_options &options, bool dry_run) { auto console = current_console(); console->print_info("* Checking MySQL instance at " + m_cluster_server->descr()); validate_instance(m_cluster_server.get()); validate_instance_is_standalone(m_cluster_server.get()); console->print_info(); log_info("Unfencing PRIMARY %s", m_cluster_server->descr().c_str()); if (!dry_run) unfence_instance(m_cluster_server.get(), true); // target is the primary m_primary_master = m_cluster_server; m_primary_master->retain(); Cluster_ssl_mode ssl_mode = options.ssl_mode; resolve_ssl_mode_option("replicationSslMode", "ReplicaSet", *m_cluster_server, &ssl_mode); // check if member auth request mode is supported validate_instance_member_auth_type( *m_primary_master, false, ssl_mode, "replicationSslMode", options.member_auth_options.member_auth_type); // check if SSL options are valid validate_instance_member_auth_options( "replicaset", options.member_auth_options.member_auth_type, options.member_auth_options.cert_issuer, options.member_auth_options.cert_subject); if (current_shell_options()->get().dba_connectivity_checks) { console->print_info("* Checking connectivity and SSL configuration..."); test_self_connection(*m_cluster_server, "", ssl_mode, options.member_auth_options.member_auth_type, options.member_auth_options.cert_issuer, options.member_auth_options.cert_subject, ""); } console->print_info("* Updating metadata..."); try { // First we need to create the Metadata Schema prepare_metadata_schema(m_cluster_server, dry_run); // Update metadata log_info("Creating replicaset metadata..."); if (!dry_run) { MetadataStorage::Transaction trx(m_metadata_storage); auto id = m_metadata_storage->create_async_cluster_record(this, false); m_metadata_storage->update_cluster_attribute( id, k_cluster_attribute_replication_allowed_host, options.replication_allowed_host.empty() ? 
shcore::Value("%") : shcore::Value(options.replication_allowed_host)); m_metadata_storage->update_cluster_attribute( id, k_cluster_attribute_member_auth_type, shcore::Value( to_string(options.member_auth_options.member_auth_type))); m_metadata_storage->update_cluster_attribute( id, k_cluster_attribute_cert_issuer, shcore::Value(options.member_auth_options.cert_issuer)); m_metadata_storage->update_cluster_attribute( id, k_replica_set_attribute_ssl_mode, shcore::Value(to_string(ssl_mode))); trx.commit(); } // create repl user to be used in the future auto user = create_replication_user( m_cluster_server.get(), options.member_auth_options.cert_subject, dry_run); log_info("Recording metadata for %s", m_cluster_server->descr().c_str()); if (!dry_run) { manage_instance(m_cluster_server.get(), {user.first.user, user.second}, options.instance_label, 0, true); get_metadata_storage()->update_instance_attribute( m_cluster_server.get()->get_uuid(), k_instance_attribute_cert_subject, shcore::Value(options.member_auth_options.cert_subject)); } console->print_info(); } catch (...) { console->print_error( "Failed to update the metadata. 
Please fix the issue and drop the " "metadata using dba.<<<dropMetadataSchema>>>() before retrying to " "execute the operation."); throw; } if (dry_run) { console->print_info("dryRun finished."); console->print_info(); } } void Replica_set_impl::adopt(Global_topology_manager *topology, const Create_replicaset_options &options, bool dry_run) { auto console = current_console(); console->print_info( "* Discovering async replication topology starting with " + m_cluster_server->descr()); console->print_info("Discovered topology:"); topology->topology()->show_raw(); console->print_info(); const topology::Node *primary = nullptr; validate_adopted_topology(topology, &primary); // Only allow adopt from the primary, to avoid surprises where we accidentally // adopt the wrong instance because of some secondary replication channel if (primary->get_primary_member()->uuid != m_metadata_storage->get_md_server()->get_uuid()) { console->print_error( "Active connection must be to the PRIMARY when adopting an " "existing replication topology."); throw shcore::Exception("Target instance is not the PRIMARY", SHERR_DBA_BADARG_INSTANCE_NOT_PRIMARY); } auto ipool = current_ipool(); Scoped_instance primary_instance_scoped(ipool->connect_unchecked_endpoint( primary->get_primary_member()->endpoint)); std::shared_ptr<Instance> primary_instance = primary_instance_scoped; log_info("Unfencing PRIMARY %s", primary_instance->descr().c_str()); if (!dry_run) unfence_instance(primary_instance.get(), true); // target is the primary m_primary_master = m_cluster_server; m_primary_master->retain(); console->print_info("* Updating metadata..."); try { // First we need to create the Metadata Schema prepare_metadata_schema(primary_instance, dry_run); // Update metadata log_info("Creating replicaset metadata..."); if (!dry_run) { MetadataStorage::Transaction trx(m_metadata_storage); auto id = m_metadata_storage->create_async_cluster_record(this, true); if (!options.replication_allowed_host.empty()) 
m_metadata_storage->update_cluster_attribute( id, k_cluster_attribute_replication_allowed_host, shcore::Value(options.replication_allowed_host)); m_metadata_storage->update_instance_attribute( id, k_instance_attribute_cert_subject, shcore::Value(options.member_auth_options.cert_subject)); trx.commit(); } // Create rpl user to be used in the future (for primary). auto user = create_replication_user( m_cluster_server.get(), options.member_auth_options.cert_subject, dry_run); Instance_id primary_id = 0; if (!dry_run) primary_id = manage_instance(primary_instance.get(), {}, "", 0, true); console->print_info(); for (const auto *server : topology->topology()->nodes()) { if (server == primary) continue; Scoped_instance instance(ipool->connect_unchecked_endpoint( server->get_primary_member()->endpoint)); std::string cert_subject; if (!dry_run) cert_subject = query_cluster_instance_auth_cert_subject(*instance); // Create rpl user to be used in the future (for secondary). user = create_replication_user(instance.get(), cert_subject, dry_run); log_info("Fencing SECONDARY %s", server->label.c_str()); if (!dry_run) fence_instance(instance.get()); log_info("Recording metadata for %s", server->label.c_str()); if (!dry_run) manage_instance(instance.get(), {}, "", primary_id, false); } } catch (...) { console->print_error( "Failed to update the metadata. 
Please fix the issue and drop the " "metadata using dba.<<<dropMetadataSchema>>>() before retrying to " "execute the operation."); throw; } if (dry_run) { console->print_info("dryRun finished."); console->print_info(); } } void Replica_set_impl::read_replication_options( Async_replication_options *ar_options) { // Get the ClusterSet configured SSL mode { shcore::Value value; get_metadata_storage()->query_cluster_attribute( get_id(), k_replica_set_attribute_ssl_mode, &value); if (value) ar_options->ssl_mode = to_cluster_ssl_mode(value.as_string()); else ar_options->ssl_mode = Cluster_ssl_mode::DISABLED; } // get the recovery auth mode ar_options->auth_type = query_cluster_auth_type(); } void Replica_set_impl::validate_add_instance( Global_topology_manager *topology, mysqlshdk::mysql::IInstance * /*master*/, Instance *target_instance, Async_replication_options *ar_options, Clone_options *clone_options, const std::string &cert_subject, bool interactive) { auto validate_not_managed = [this](const Instance &target) { // Check if the instance is already a member of this replicaset Instance_metadata md; bool already_managed = false; try { md = m_metadata_storage->get_instance_by_uuid(target.get_uuid()); already_managed = true; } catch (const shcore::Exception &e) { log_info("Error querying metadata for %s: %s\n", target.get_uuid().c_str(), e.what()); } bool is_invalidated = false; if (!already_managed) { // The instance is not a member of this replicaset, but it has // the metadata schema. Check if this isn't an invalidated member. 
MetadataStorage metadata(target); if (metadata.check_version()) { try { md = metadata.get_instance_by_uuid(target.get_uuid()); // Check whether the other instance was once part of the same rs if (md.cluster_id == get_id()) is_invalidated = true; } catch (const std::exception &e) { log_info("Error querying local copy of metadata for %s: %s", target.get_uuid().c_str(), e.what()); } } } if (is_invalidated) { // If this is a removed invalidated member of the replicaset, we can // only add it back if the GTID set has not diverged. // The GTID check will happen later on, as part of the regular checks. log_info("%s appears to be an invalidated ex-member of the replicaset", target.descr().c_str()); } else { // Check whether the instance we found is really the target one // or if it's just a case of duplicate UUID if (!md.cluster_id.empty() && !mysqlshdk::utils::are_endpoints_equal( md.address, target.get_canonical_address())) { log_info("%s: UUID %s already in used by %s", target.descr().c_str(), target.get_uuid().c_str(), md.address.c_str()); throw shcore::Exception("server_uuid in " + target.descr() + " is expected to be unique, but " + md.address + " already uses the same value", SHERR_DBA_INVALID_SERVER_CONFIGURATION); } if (md.cluster_id == get_id()) { throw shcore::Exception( target.descr() + " is already a member of this replicaset.", SHERR_DBA_BADARG_INSTANCE_ALREADY_MANAGED); } else if (!md.cluster_id.empty()) { std::string label = md.group_name.empty() ? "replicaset." 
: "cluster."; throw shcore::Exception( target.descr() + " is already managed by a " + label, SHERR_DBA_BADARG_INSTANCE_ALREADY_MANAGED); } } }; auto validate_unique_server_id = [](Instance *target, Global_topology_manager *topology_mgr) { std::string server_uuid = target->get_uuid(); uint32_t server_id = target->get_server_id(); for (const auto &node : topology_mgr->topology()->nodes()) { for (const auto &m : node->members()) { // uniqueness of uuid is checked in validate_not_managed() if (m.server_id.value_or(0) == server_id) { throw shcore::Exception("server_id in " + target->descr() + " is expected to be unique, but " + m.label + " already uses the same value", SHERR_DBA_INVALID_SERVER_CONFIGURATION); } } } }; // Validate the Clone options. clone_options->check_option_values(target_instance->get_version()); // Check version early on, so that other checks don't need to care about it validate_version(*target_instance); // Check if the target is already managed by us or some other cluster validate_not_managed(*target_instance); // regular instance checks validate_instance(target_instance); validate_instance_is_standalone(target_instance, true); validate_unique_server_id(target_instance, topology); validate_instance_ssl_mode(Cluster_type::ASYNC_REPLICATION, *target_instance, ar_options->ssl_mode); auto console = current_console(); // check consistency of the global topology console->print_info(); topology->validate_add_replica(nullptr, target_instance, *ar_options); std::shared_ptr<Instance> donor_instance = get_cluster_server(); console->print_info(); if (current_shell_options()->get().dba_connectivity_checks) { console->print_info("* Checking connectivity and SSL configuration..."); mysqlshdk::mysql::Set_variable sro(*target_instance, "super_read_only", 0, true); auto member_auth = query_cluster_auth_type(); std::string cert_issuer = query_cluster_auth_cert_issuer(); test_peer_connection( *target_instance, "", cert_subject, *m_primary_master, "", 
query_cluster_instance_auth_cert_subject(m_primary_master->get_uuid()), ar_options->ssl_mode, member_auth, cert_issuer, ""); } console->print_info(); console->print_info("* Checking transaction state of the instance..."); clone_options->recovery_method = validate_instance_recovery( Member_op_action::ADD_INSTANCE, donor_instance.get(), target_instance, *clone_options->recovery_method, get_gtid_set_is_complete(), interactive); DBUG_EXECUTE_IF("dba_abort_async_add_replica", { throw std::logic_error("debug"); }); } void Replica_set_impl::add_instance( const std::string &instance_def, const Async_replication_options &ar_options_, const Clone_options &clone_options_, const std::string &label, const std::string &auth_cert_subject, Recovery_progress_style progress_style, int sync_timeout, bool interactive, bool dry_run) { check_preconditions_and_primary_availability("addInstance"); Async_replication_options ar_options(ar_options_); Clone_options clone_options(clone_options_); // Testing hack to set a replication delay. This should be removed // if/when a replication delay option is ever added. DBUG_EXECUTE_IF("dba_add_instance_master_delay", { ar_options.master_delay = 3; }); // Connect to the target Instance. const auto target_instance = Scoped_instance(connect_target_instance(instance_def)); const auto target_uuid = target_instance->get_uuid(); // Acquire required locks on target instance (acquired on primary after). // No "write" operation allowed to be executed concurrently on the target // instance, but the primary can be "shared" by other operations on different // target instances. auto i_lock = target_instance->get_lock_exclusive(); // NOTE: Acquire a shared lock on the primary only if the UUID is different // from the target instance. 
mysqlshdk::mysql::Lock_scoped plock; if (target_uuid.empty() || target_uuid != get_primary_master()->get_uuid()) plock = get_primary_master()->get_lock_shared(); auto topology = setup_topology_manager(); auto console = current_console(); console->print_info("Adding instance to the replicaset..."); console->print_info(); read_replication_options(&ar_options); // recovery auth checks { auto auth_type = query_cluster_auth_type(); // check if member auth request mode is supported validate_instance_member_auth_type(*target_instance, false, ar_options.ssl_mode, "replicationSslMode", auth_type); // check if certSubject was correctly supplied validate_instance_member_auth_options("replicaset", false, auth_type, auth_cert_subject); } console->print_info("* Performing validation checks"); validate_add_instance(topology.get(), get_primary_master().get(), target_instance.get(), &ar_options, &clone_options, auth_cert_subject, interactive); console->print_info("* Updating topology"); // Create the recovery account auto user = create_replication_user(target_instance.get(), auth_cert_subject, dry_run); ar_options.repl_credentials = user.first; try { if (*clone_options.recovery_method == Member_recovery_method::CLONE) { // Do and monitor the clone handle_clone(target_instance, clone_options, ar_options, user.second, auth_cert_subject, progress_style, sync_timeout, dry_run); // When clone is used, the target instance will restart and all // connections are closed so we need to test if the connection to the // target instance and MD are closed and re-open if necessary target_instance->reconnect_if_needed("Target"); m_metadata_storage->get_md_server()->reconnect_if_needed("Metadata"); // Clone will copy all tables, including the replication settings stored // in mysql.slave_master_info. 
MySQL will start replication by default if // the replication setting are not empty, so in a fast system or if // --skip-slave-start is not enabled replication will start and the slave // threads will be up-and-running before we issue the new CHANGE MASTER. // This will result in an error: MySQL Error 3081 (HY000): (...) This // operation cannot be performed with running replication threads; run // STOP SLAVE FOR CHANNEL '' first (BUG#30632029) // // To avoid this situation, we must stop the slave and reset the // replication channels. remove_channel(target_instance.get(), k_replicaset_channel_name, dry_run); } // Update the global topology first, which means setting replication // channel to the primary in the master replica, so we can know if that // works. Async replication between DCs has many things that can go wrong // (bad address, private vs public address, network issue, account issues, // firewalls etc), so we do this first to keep an eventual rollback to a // minimum while retries would also be simpler to handle. 
async_add_replica(get_primary_master().get(), target_instance.get(), k_replicaset_channel_name, ar_options, true, dry_run); console->print_info( "** Waiting for new instance to synchronize with PRIMARY..."); if (!dry_run) { try { // Sync and check whether the slave started OK sync_transactions(*target_instance, {k_replicaset_channel_name}, sync_timeout); } catch (const shcore::Exception &e) { if (e.code() == SHERR_DBA_GTID_SYNC_TIMEOUT) { console->print_info( "You may increase or disable the transaction sync timeout with " "the timeout option for <<<addInstance>>>()"); } throw; } catch (const cancel_sync &) { // Throw it up throw; } } Instance_id master_id = static_cast<const topology::Server *>( topology->topology()->get_primary_master_node()) ->instance_id; log_info("Recording metadata for %s", target_instance->descr().c_str()); if (!dry_run) { manage_instance(target_instance.get(), {user.first.user, user.second}, label, master_id, false); get_metadata_storage()->update_instance_attribute( target_instance->get_uuid(), k_instance_attribute_cert_subject, shcore::Value(auth_cert_subject)); } } catch (const cancel_sync &) { if (!dry_run) { revert_topology_changes(target_instance.get(), true, false); } console->print_info(); console->print_info("Changes successfully reverted."); return; } catch (const std::exception &e) { console->print_error("Error adding instance to replicaset: " + format_active_exception()); log_warning("While adding async instance: %s", e.what()); if (!dry_run) { revert_topology_changes(target_instance.get(), true, false); } console->print_info(); console->print_info("Changes successfully reverted."); console->print_error(target_instance->descr() + " could not be added to the replicaset"); throw; } console->print_info(shcore::str_format( "The instance '%s' was added to the replicaset and is replicating " "from %s.\n", target_instance->descr().c_str(), get_primary_master()->get_canonical_address().c_str())); // Wait for the new replica to catch up 
metadata state // Errors after this point don't rollback. if (target_instance && !dry_run && sync_timeout >= 0) { console->print_info( "* Waiting for instance '" + target_instance->descr() + "' to synchronize the Metadata updates with the PRIMARY..."); sync_transactions(*target_instance, {k_replicaset_channel_name}, sync_timeout); } if (dry_run) { console->print_info("dryRun finished."); console->print_info(); } } void Replica_set_impl::validate_rejoin_instance( Global_topology_manager *topology_mng, Instance *target, Clone_options *clone_options, Instance_metadata *out_instance_md, bool interactive) { auto console = current_console(); // Validate the Clone options. clone_options->check_option_values(target->get_version()); // Check if the target instance belongs to the replicaset (MD). std::string target_address = target->get_canonical_address(); try { *out_instance_md = m_metadata_storage->get_instance_by_address(target_address); } catch (const shcore::Exception &e) { if (e.code() == SHERR_DBA_MEMBER_METADATA_MISSING) { console->print_error( "Cannot rejoin an instance that does not belong to the replicaset. 
" "Please confirm the specified address or execute the operation " "against the correct ReplicaSet object."); throw shcore::Exception( "Instance " + target_address + " does not belong to the replicaset", SHERR_DBA_BADARG_INSTANCE_NOT_IN_CLUSTER); } throw; } // regular instance checks validate_instance(target); // Check instance status and consistency with the global topology topology_mng->validate_rejoin_replica(target); std::shared_ptr<Instance> donor_instance = get_cluster_server(); console->print_info("** Checking transaction state of the instance..."); clone_options->recovery_method = validate_instance_recovery( Member_op_action::REJOIN_INSTANCE, donor_instance.get(), target, *clone_options->recovery_method, get_gtid_set_is_complete(), interactive); } void Replica_set_impl::rejoin_instance(const std::string &instance_def, const Clone_options &clone_options_, Recovery_progress_style progress_style, int sync_timeout, bool interactive, bool dry_run) { log_debug("Checking rejoin instance preconditions."); check_preconditions_and_primary_availability("rejoinInstance"); Clone_options clone_options(clone_options_); auto console = current_console(); log_debug("Connecting to target instance."); const auto target_instance = Scoped_instance(connect_target_instance(instance_def)); // Acquire required locks on target instance (already acquired on primary). // No "write" operation allowed to be executed concurrently on the target // instance, but the primary can be "shared" by other operations on different // target instances. auto i_lock = target_instance->get_lock_exclusive(); log_debug("Setting up topology manager."); auto topology_mng = setup_topology_manager(nullptr, true); log_debug("Get current PRIMARY and ensure it is healthy (updatable)."); // NOTE: Acquire a shared lock on the primary only if the UUID is different // from the target instance. 
mysqlshdk::mysql::Lock_scoped plock; if (const auto target_uuid = target_instance->get_uuid(); target_uuid.empty() || target_uuid != get_primary_master()->get_uuid()) plock = get_primary_master()->get_lock_shared(); console->print_info("* Validating instance..."); Instance_metadata instance_md; validate_rejoin_instance(topology_mng.get(), target_instance.get(), &clone_options, &instance_md, interactive); // Update target instance replication settings and restart replication. console->print_info("* Rejoining instance to replicaset..."); DBUG_EXECUTE_IF("dba_abort_async_rejoin_replica", { throw std::logic_error("debug"); }); // NOTE: Replication user needs to be refreshed in case we are rejoining the // old PRIMARY from the replicaset. Async_replication_options ar_options; read_replication_options(&ar_options); std::string repl_account_host; std::tie(ar_options.repl_credentials, repl_account_host) = refresh_replication_user(target_instance.get(), dry_run); try { if (*clone_options.recovery_method == Member_recovery_method::CLONE) { auto cert_subject = query_cluster_instance_auth_cert_subject(*target_instance); // since we're in a rejoin, check if the value is valid switch (auto auth_type = query_cluster_auth_type(); auth_type) { case mysqlsh::dba::Replication_auth_type::CERT_SUBJECT: case mysqlsh::dba::Replication_auth_type::CERT_SUBJECT_PASSWORD: if (cert_subject.empty()) throw shcore::Exception::runtime_error(shcore::str_format( "The replicaset's SSL mode is set to %s but the instance being " "rejoined doesn't have a valid 'certSubject' option. 
Please " "stop " "GR on that instance and then add it back using " "cluster.addInstance() with the appropriate authentication " "options.", to_string(auth_type).c_str())); break; default: break; } // Do and monitor the clone handle_clone(target_instance, clone_options, ar_options, repl_account_host, cert_subject, progress_style, sync_timeout, dry_run); // When clone is used, the target instance will restart and all // connections are closed so we need to test if the connection to the // target instance and MD are closed and re-open if necessary target_instance->reconnect_if_needed("Target"); m_metadata_storage->get_md_server()->reconnect_if_needed("Metadata"); // Clone will copy all tables, including the replication settings stored // in mysql.slave_master_info. MySQL will start replication by default if // the replication setting are not empty, so in a fast system or if // --skip-slave-start is not enabled replication will start and the slave // threads will be up-and-running before we issue the new CHANGE MASTER. // This will result in an error: MySQL Error 3081 (HY000): (...) This // operation cannot be performed with running replication threads; run // STOP SLAVE FOR CHANNEL '' first (BUG#30632029) // // To avoid this situation, we must stop the slave and reset the // replication channels. 
remove_channel(target_instance.get(), k_replicaset_channel_name, dry_run); reset_channel(target_instance.get(), k_replicaset_channel_name, true, dry_run); } if (!dry_run) { async_rejoin_replica(get_primary_master().get(), target_instance.get(), k_replicaset_channel_name, ar_options); console->print_info( "** Waiting for rejoined instance to synchronize with PRIMARY..."); try { // Sync and check whether the slave started OK sync_transactions(*target_instance, {k_replicaset_channel_name}, sync_timeout); } catch (const cancel_sync &) { log_info("Operating canceled during transactions sync at %s.", target_instance->descr().c_str()); // Throw it up throw; } catch (const shcore::Exception &e) { if (e.code() == SHERR_DBA_GTID_SYNC_TIMEOUT) { console->print_info( "You may increase or disable the transaction sync timeout with " "the timeout option for <<<rejoinInstance>>>()"); } throw e; } } console->print_info("* Updating the Metadata..."); // Set new instance information and store in MD. if (!dry_run) { MetadataStorage::Transaction trx(m_metadata_storage); instance_md.master_id = static_cast<const topology::Server *>( topology_mng->topology()->get_primary_master_node()) ->instance_id; instance_md.primary_master = false; m_metadata_storage->record_async_member_rejoined(instance_md); trx.commit(); ensure_metadata_has_server_uuid(*target_instance); } } catch (const cancel_sync &) { stop_channel(target_instance.get(), k_replicaset_channel_name, true, false); console->print_info(); console->print_info("Changes successfully reverted."); return; } catch (const std::exception &e) { console->print_error("Error rejoining instance to replicaset: " + format_active_exception()); log_warning("While rejoining async instance: %s", e.what()); stop_channel(target_instance.get(), k_replicaset_channel_name, true, false); console->print_error(target_instance->descr() + " could not be rejoined to the replicaset"); throw; } console->print_info(shcore::str_format( "The instance '%s' rejoined the 
replicaset and is replicating " "from %s.\n", target_instance->descr().c_str(), get_primary_master().get()->get_canonical_address().c_str())); if (dry_run) { console->print_info("dryRun finished."); console->print_info(); } } void Replica_set_impl::validate_remove_instance( Global_topology_manager *topology, mysqlshdk::mysql::IInstance *master, const std::string &target_address, Instance *target, bool force, Instance_metadata *out_instance_md, bool *out_repl_working) { auto console = current_console(); // check if belongs to the replicaset try { *out_instance_md = m_metadata_storage->get_instance_by_address(target_address); } catch (const shcore::Exception &e) { if (e.code() == SHERR_DBA_MEMBER_METADATA_MISSING) { console->print_error( "Instance " + target_address + " cannot be removed because it does not belong to the replicaset " "(not found in the metadata). If you really want to remove this " "instance because it is still using replication then it must be " "stopped manually."); throw shcore::Exception( target_address + " does not belong to the replicaset", SHERR_DBA_BADARG_INSTANCE_NOT_IN_CLUSTER); } throw; } if (out_instance_md->primary_master) { console->print_error(target_address + " is a PRIMARY and cannot be removed."); throw shcore::Exception( "PRIMARY instance cannot be removed from the replicaset.", SHERR_DBA_BADARG_INSTANCE_REMOVE_NOT_ALLOWED); } if (out_instance_md->cluster_id != get_id()) throw shcore::Exception( target_address + " does not belong to the replicaset", SHERR_DBA_BADARG_INSTANCE_NOT_IN_CLUSTER); // check consistency of the global topology if (target) { topology->validate_remove_replica(master, target, force, out_repl_working); } } void Replica_set_impl::remove_instance(const std::string &instance_def_, std::optional<bool> force, int timeout) { log_debug("Checking remove instance preconditions."); check_preconditions_and_primary_availability("removeInstance"); auto console = current_console(); auto ipool = current_ipool(); // Normalize 
what's given from the argument, to strip out username, pwd
  // and other garbage
  std::string instance_def = Connection_options(instance_def_).uri_endpoint();

  log_debug("Setting up topology manager.");
  auto topology = setup_topology_manager();

  log_debug("Connecting to target instance.");
  Scoped_instance target_server;
  try {
    // Do not print the ERROR message here (in connect_target_instance())
    target_server =
        Scoped_instance(connect_target_instance(instance_def_, false));
  } catch (const shcore::Exception &) {
    // Check if instance belongs to the replicaset (to send a more
    // user-friendly
    // message to users)
    bool belong_to_md = true;
    std::string target_address = target_server
                                     ? target_server->get_canonical_address()
                                     : instance_def;

    if (Connection_options(target_address).has_port()) {
      try {
        m_metadata_storage->get_instance_by_address(target_address);
      } catch (const shcore::Exception &err) {
        if (err.code() == SHERR_DBA_MEMBER_METADATA_MISSING) {
          belong_to_md = false;
        }
        // Ignore any error here: when checking if belongs to metadata.
      }
    } else {
      // user didn't provide a port, we cannot check if instance is in metadata
      belong_to_md = false;
    }

    if (!force.has_value() || !force.value()) {
      console->print_error(
          "Unable to connect to the target instance " + target_address +
          ". Please make sure the instance is available and try again. If the "
          "instance is permanently not reachable, use the 'force' option to "
          "remove it from the replicaset metadata and skip reconfiguration of "
          "that instance.");
      throw;
    } else {
      if (belong_to_md) {
        console->print_note(
            "Unable to connect to the target instance " + target_address +
            ". The instance will only be removed from the metadata, but its "
            "replication configuration cannot be updated. Please, take any "
            "necessary actions to make sure that the instance will not "
            "replicate from the replicaset if brought back online.");
      } else {
        console->print_error(
            "Instance " + target_address +
            " is unreachable and was not found in the replicaset metadata. "
            "The exact address of the instance as recorded in the metadata "
            "must be used in cases where the target is unreachable.");
        throw shcore::Exception(
            target_address + " does not belong to the replicaset",
            SHERR_DBA_BADARG_INSTANCE_NOT_IN_CLUSTER);
      }
    }
  }

  // Acquire required locks on target instance and primary.
  // No "write" operation allowed to be executed concurrently on the target
  // instance, but the primary can be "shared" by other operations on different
  // target instances.
  // NOTE: Do not lock target instance if unreachable, and skip primary if the
  // same as the target instance.
  mysqlshdk::mysql::Lock_scoped slock, plock;
  {
    std::string target_uuid;
    if (target_server) {
      slock = target_server->get_lock_exclusive();
      target_uuid = target_server->get_uuid();
    }

    // NOTE: Acquire a shared lock on the primary only if the UUID is different
    // from the target instance.
    if (target_uuid.empty() || target_uuid != get_primary_master()->get_uuid())
      plock = get_primary_master()->get_lock_shared();
  }

  Instance_metadata md;
  bool repl_working = false;

  validate_remove_instance(
      topology.get(), get_primary_master().get(),
      target_server.get() ? target_server->get_canonical_address()
                          : instance_def,
      target_server.get(), force.value_or(false), &md, &repl_working);

  if (md.invalidated) {
    console->print_note(md.label +
                        " is invalidated, replication sync will be skipped.");
    // timeout < 0 disables both sync passes below.
    timeout = -1;
  }

  // sync transactions before making changes (if not invalidated)
  if (target_server && repl_working && timeout >= 0) {
    try {
      console->print_info("* Waiting for instance '" + target_server->descr() +
                          "' to synchronize with the PRIMARY...");
      sync_transactions(*target_server, {k_replicaset_channel_name}, timeout);
    } catch (const shcore::Exception &e) {
      if (force.value_or(false)) {
        console->print_warning(
            "Transaction sync failed but ignored because of 'force' option: " +
            e.format());
      } else {
        console->print_error(
            "Transaction sync failed. Use the 'force' option to remove "
            "anyway.");
        throw;
      }
    }
  }

  // drop user first since we need to query MD for it - this will ignore DB
  // errors
  drop_replication_user(md.uuid, target_server.get());

  // update metadata
  log_debug("Removing instance from the Metadata.");
  MetadataStorage::Transaction trx(get_metadata_storage());
  m_metadata_storage->record_async_member_removed(md.cluster_id, md.id);
  trx.commit();

  if (target_server.get()) {
    if (repl_working && timeout >= 0) {
      // If replication is working, sync once again so that the drop user and
      // metadata update are caught up with
      try {
        console->print_info(
            "* Waiting for instance '" + target_server->descr() +
            "' to synchronize the Metadata updates with the PRIMARY...");

        sync_transactions(*target_server, {k_replicaset_channel_name},
                          timeout);
      } catch (const shcore::Exception &e) {
        if (force.value_or(false)) {
          console->print_warning(
              "Transaction sync failed but ignored because of 'force' "
              "option: " +
              e.format());
        } else {
          console->print_error(
              "Transaction sync failed. Use the 'force' option to remove "
              "anyway.");
          throw;
        }
      }
    }

    // actual stop slave happens last, so that other changes can get propagated
    // to the instance being removed 1st
    log_debug("Updating replication settings on the target instance.");
    try {
      async_remove_replica(target_server.get(), k_replicaset_channel_name,
                           false);
    } catch (const shcore::Exception &e) {
      console->print_error("Error updating replication settings: " +
                           e.format());

      console->print_info(
          shcore::str_format("Metadata for instance '%s' was deleted, but "
                             "replication clean up failed with an error.\n",
                             target_server->descr().c_str()));
      log_warning("While removing async instance: %s", e.what());
      throw;
    }

    console->print_info(shcore::str_format(
        "The instance '%s' was removed from the replicaset.\n",
        md.label.c_str()));
  } else {
    console->print_info(shcore::str_format(
        "Metadata for instance '%s' was deleted, but instance "
        "configuration could not be updated.\n",
        instance_def.c_str()));
  }

  // If target server was the removed one, invalidate
  if (m_cluster_server->get_uuid() == md.uuid) {
    target_server_invalidated();
  }
}

// Performs a safe switchover: synchronizes the target with the current
// PRIMARY and then promotes it, demoting the current PRIMARY to SECONDARY.
void Replica_set_impl::set_primary_instance(const std::string &instance_def,
                                            uint32_t timeout, bool dry_run) {
  auto console = current_console();
  auto ipool = current_ipool();

  check_preconditions_and_primary_availability("setPrimaryInstance");

  // NOTE: Acquire an exclusive lock on the primary.
  auto plock = get_primary_master()->get_lock_exclusive();

  topology::Server_global_topology *srv_topology = nullptr;
  auto topology = setup_topology_manager(&srv_topology);
  // topology->set_sync_timeout(timeout);

  // Resolve the promotion target within the scanned topology.
  const topology::Server *promoted =
      check_target_member(srv_topology, instance_def);
  if (!promoted)
    throw shcore::Exception(
        "Unable to find instance '" + instance_def + "' in the topology.",
        SHERR_DBA_ASYNC_MEMBER_TOPOLOGY_MISSING);

  const topology::Server *demoted = static_cast<const topology::Server *>(
      srv_topology->get_primary_master_node());

  if (promoted == demoted) {
    console->print_info("Target instance " + promoted->label +
                        " is already the PRIMARY.");
    return;
  }

  validate_node_status(promoted);

  console->print_info(shcore::str_format(
      "%s will be promoted to PRIMARY of '%s'.\nThe current PRIMARY is %s.\n",
      promoted->label.c_str(), get_name().c_str(), demoted->label.c_str()));

  console->print_info("* Connecting to replicaset instances");
  std::list<Instance_metadata> unreachable;

  // A switchover requires every member to be reachable.
  Scoped_instance_list instances(connect_all_members(0, false, &unreachable));

  if (!unreachable.empty()) {
    throw shcore::Exception("One or more instances are unreachable",
                            SHERR_DBA_ASYNC_MEMBER_UNREACHABLE);
  }

  // Acquire required locks on all (alive) replica set instances (except the
  // primary, already acquired). No "write" operation allowed to be executed
  // concurrently on the instances.
  auto i_locks = get_instance_lock_exclusive(instances.list(),
                                             std::chrono::seconds::zero(),
                                             get_primary_master()->get_uuid());

  // another set of connections for locks
  // Note: give extra margin for the connection read timeout, so that it
  // doesn't
  // get triggered before server-side timeouts
  Scoped_instance_list lock_instances(
      connect_all_members(timeout + 5, false, &unreachable));

  if (!unreachable.empty()) {
    throw shcore::Exception("One or more instances are unreachable",
                            SHERR_DBA_ASYNC_MEMBER_UNREACHABLE);
  }

  // Map the promoted/demoted topology nodes to their connected Instance
  // objects (matched by server UUID).
  std::shared_ptr<Instance> master;
  std::shared_ptr<Instance> new_master;
  {
    auto it = std::find_if(instances.list().begin(), instances.list().end(),
                           [promoted](const std::shared_ptr<Instance> &i) {
                             return i->get_uuid() ==
                                    promoted->get_primary_member()->uuid;
                           });
    if (it == instances.list().end()) {
      throw shcore::Exception::runtime_error(promoted->label +
                                             " cannot be promoted");
    }
    new_master = *it;

    it = std::find_if(instances.list().begin(), instances.list().end(),
                      [demoted](const std::shared_ptr<Instance> &i) {
                        return i->get_uuid() ==
                               demoted->get_primary_member()->uuid;
                      });
    if (it == instances.list().end()) {
      throw std::logic_error("Internal error: couldn't find primary");
    }
    master = *it;
  }

  console->print_info();

  console->print_info("* Performing validation checks");
  topology->validate_switch_primary(master.get(), new_master.get(),
                                    instances.list());
  console->print_info();

  // Pre-synchronize the promoted primary before making any changes
  // This should ensure that there's nothing left that could cause long delays
  // or timeouts (locks, replication lag etc). Once the metadata is updated,
  // the router will begin sending RW traffic to the new primary. We won't lose
  // consistency because they should fail with SRO errors, but we should try
  // to minimize the amount of time spent in that state.
  console->print_info("* Synchronizing transaction backlog at " +
                      new_master->descr());
  if (!dry_run) {
    sync_transactions(*new_master, {k_replicaset_channel_name}, timeout);
  }

  console->print_info("* Updating metadata");

  // Re-generate a new password for the master being demoted.
  Async_replication_options ar_options;
  read_replication_options(&ar_options);

  std::string repl_account_host;
  std::tie(ar_options.repl_credentials, repl_account_host) =
      refresh_replication_user(master.get(), dry_run);

  // Update the metadata with the state the replicaset is supposed to be in
  log_info("Updating metadata at %s",
           m_metadata_storage->get_md_server()->descr().c_str());
  if (!dry_run) {
    MetadataStorage::Transaction trx(get_metadata_storage());
    m_metadata_storage->record_async_primary_switch(promoted->instance_id);
    trx.commit();
  }
  console->print_info();

  // Synchronize all slaves and lock all instances.
  Global_locks global_locks;
  try {
    global_locks.acquire(lock_instances.list(),
                         demoted->get_primary_member()->uuid, timeout,
                         dry_run);
  } catch (const std::exception &e) {
    console->print_error(shcore::str_format(
        "An error occurred while preparing replicaset instances for a PRIMARY "
        "switch: %s",
        e.what()));

    // The MD was already changed above; put the old primary back in the MD
    // before bailing out.
    console->print_note("Reverting metadata changes");
    if (!dry_run) {
      try {
        MetadataStorage::Transaction trx(get_metadata_storage());
        m_metadata_storage->record_async_primary_switch(demoted->instance_id);
        trx.commit();
      } catch (const std::exception &err) {
        console->print_warning(shcore::str_format(
            "Failed to revert metadata changes on the PRIMARY: %s",
            err.what()));
      }
    }
    throw;
  }

  console->print_info("* Updating replication topology");
  // Update the topology but revert if it fails
  try {
    do_set_primary_instance(master.get(), new_master.get(), instances.list(),
                            ar_options, dry_run);
  } catch (...)
  {
    // Topology change failed; restore the old primary in the metadata.
    console->print_note("Reverting metadata changes");
    if (!dry_run) {
      MetadataStorage::Transaction trx(get_metadata_storage());
      m_metadata_storage->record_async_primary_switch(demoted->instance_id);
      trx.commit();
    }
    throw;
  }

  console->print_info();

  // This will update the MD object to use the new primary
  if (!dry_run) {
    primary_instance_did_change(new_master);

    new_master->steal();
  }

  console->print_info(new_master->get_canonical_address() +
                      " was promoted to PRIMARY.");
  console->print_info();

  if (dry_run) {
    console->print_info("dryRun finished.");
    console->print_info();
  }
}

// Applies the actual replication topology change of a switchover: swaps the
// PRIMARY role between 'master' and 'new_master' and repoints all remaining
// members to the new PRIMARY. On any failure, the changes queued in the undo
// list are rolled back and SHERR_DBA_SWITCHOVER_ERROR is thrown.
void Replica_set_impl::do_set_primary_instance(
    Instance *master, Instance *new_master,
    const std::list<std::shared_ptr<Instance>> &instances,
    const Async_replication_options &ar_options, bool dry_run) {
  auto console = current_console();
  shcore::Scoped_callback_list undo_list;

  try {
    // First, promote the new primary by stopping slave, unfencing it and
    // making the old primary a slave of the new one.
    // Topology changes are reverted on exception.
    async_swap_primary(master, new_master, k_replicaset_channel_name,
                       ar_options, &undo_list, dry_run);

    // NOTE: Skip old master, already setup previously by async_swap_primary().
    async_change_primary(new_master, instances, k_replicaset_channel_name,
                         ar_options, master, &undo_list, dry_run);
  } catch (...) {
    console->print_error("Error changing replication source: " +
                         format_active_exception());

    console->print_note("Reverting replication changes");
    undo_list.call();

    throw shcore::Exception("Error during switchover",
                            SHERR_DBA_SWITCHOVER_ERROR);
  }
  undo_list.cancel();

  // Clear replication configs from the promoted instance. Do it after
  // everything is done, to make reverting easier.
  try {
    reset_channel(new_master, k_replicaset_channel_name, true, dry_run);
  } catch (...)
  {
    // Failure to reset slave is not fatal
    console->print_warning("Error resetting replication configurations at " +
                           new_master->descr());
  }
}

// Forces promotion of an instance to PRIMARY when the current PRIMARY is
// unavailable (failover). Unreachable/errant members can optionally be
// invalidated instead of aborting.
void Replica_set_impl::force_primary_instance(const std::string &instance_def,
                                              uint32_t timeout,
                                              bool invalidate_error_instances,
                                              bool dry_run) {
  Async_replication_options ar_options;

  try {
    check_preconditions("forcePrimaryInstance");
  } catch (const shcore::Exception &e) {
    // When forcing the primary, there's no primary available
    if (e.code() == SHERR_DBA_ASYNC_PRIMARY_UNAVAILABLE) {
      log_debug("No PRIMARY member available: %s", e.what());
    } else {
      throw;
    }
  }

  auto console = current_console();
  auto ipool = current_ipool();

  topology::Server_global_topology *srv_topology = nullptr;
  auto topology = setup_topology_manager(&srv_topology);

  const topology::Server *demoted = static_cast<const topology::Server *>(
      srv_topology->get_primary_master_node());

  const topology::Server *promoted = nullptr;

  // Check if the specified target instance is available.
  if (!instance_def.empty()) {
    promoted = check_target_member(srv_topology, instance_def);

    if (promoted == demoted) {
      throw shcore::Exception(promoted->label + " is already the PRIMARY",
                              SHERR_DBA_BAD_ASYNC_PRIMARY_CANDIDATE);
    }
  }

  std::vector<Instance_metadata> instances_md =
      get_metadata_storage()->get_all_instances(get_id());
  std::list<std::shared_ptr<Instance>> instances;
  std::list<Instance_id> invalidate_ids;

  {
    std::list<Instance_metadata> unreachable;

    console->print_info("* Connecting to replicaset instances");
    // give extra margin for the connection read timeout, so that it doesn't
    // get triggered before server-side timeouts
    instances = connect_all_members(timeout + 5, true, &unreachable);

    if (!unreachable.empty()) {
      if (!invalidate_error_instances) {
        console->print_error(
            "Could not connect to one or more SECONDARY instances. 
Use the "
            "'invalidateErrorInstances' option to perform the failover anyway "
            "by skipping and invalidating unreachable instances.");

        throw shcore::Exception("One or more instances are unreachable",
                                SHERR_DBA_UNREACHABLE_INSTANCES);
      }
    }

    // Every unreachable member is invalidated and dropped from the working
    // metadata list before the candidate search below.
    for (const auto &i : unreachable) {
      console->print_note(
          i.label +
          " will be invalidated and must be removed from the replicaset.");
      invalidate_ids.push_back(i.id);

      // Remove invalidated instance from the instance metadata list.
      instances_md.erase(
          std::remove_if(instances_md.begin(), instances_md.end(),
                         [&i](const Instance_metadata &i_md) {
                           return i_md.uuid == i.uuid;
                         }),
          instances_md.end());
    }
    console->print_info();
  }

  // Acquire required locks on all (alive) replica set instances.
  // No "write" operation allowed to be executed concurrently on the instances.
  auto i_locks = get_instance_lock_exclusive(instances);

  // Check for replication applier errors on all (online) instances, in order
  // to anticipate issues when applying retrieved transaction and try to
  // minimize the consequences of BUG#30148247.
  // NOTE: Instances with replication errors will be invalidated and skipped if
  // invalidate_error_instances = true.
  check_replication_applier_errors(srv_topology, &instances,
                                   invalidate_error_instances, &instances_md,
                                   &invalidate_ids);

  // Wait for all instances to apply retrieved transactions (relay log) first.
  // NOTE: Otherwise GTID_EXECUTED set might be missing trx when checking most
  // up-to-date instances.
  wait_all_apply_retrieved_trx(&instances, std::chrono::seconds{timeout},
                               invalidate_error_instances, &instances_md,
                               &invalidate_ids);

  // Find a candidate to be promoted.
  // NOTE: Use updated (current) GTID_EXECUTED set from instance and not the
  // "cached" value in the srv_topology.
  if (instance_def.empty()) {
    console->print_info(
        "* Searching instance with the most up-to-date transaction set");
    promoted = srv_topology->find_failover_candidate(instances);
  }

  if (!promoted) {
    // A candidate can only be missing when the search above ran (i.e. no
    // explicit target was given).
    assert(instance_def.empty());
    throw shcore::Exception(
        "Could not find a suitable candidate to failover to",
        SHERR_DBA_NO_ASYNC_PRIMARY_CANDIDATES);
  } else {
    console->print_info(
        promoted->label +
        " will be promoted to PRIMARY of the replicaset and the "
        "former PRIMARY will be invalidated.");
    console->print_info();
  }

  if (promoted->invalidated)
    throw shcore::Exception(promoted->label + " was invalidated by a failover",
                            SHERR_DBA_ASYNC_MEMBER_INVALIDATED);

  // Map the promoted topology node to its connected Instance object.
  std::shared_ptr<Instance> new_master;
  {
    auto it = std::find_if(instances.begin(), instances.end(),
                           [promoted](const std::shared_ptr<Instance> &i) {
                             return i->get_uuid() ==
                                    promoted->get_primary_member()->uuid;
                           });

    // either an invalid target or the target is unavailable
    if (it == instances.end())
      throw shcore::Exception::argument_error(promoted->label +
                                              " cannot be promoted");

    new_master = *it;
  }

  // Validate instance to be promoted.
topology->validate_force_primary(new_master.get(), instances); if (invalidate_ids.size() > 0) { console->print_note(shcore::str_format( "%zi instances will be skipped and invalidated during the failover", invalidate_ids.size())); console->print_info(); } try { console->print_info("* Promoting " + new_master->descr() + " to a PRIMARY..."); async_force_primary(new_master.get(), k_replicaset_channel_name, ar_options, dry_run); console->print_info(); // MD update has to happen after the failover, since there's no PRIMARY // before that console->print_info("* Updating metadata..."); if (!dry_run) { new_master->steal(); primary_instance_did_change(new_master); try { MetadataStorage::Transaction trx(get_metadata_storage()); m_metadata_storage->record_async_primary_forced_switch( promoted->instance_id, invalidate_ids); trx.commit(); } catch (const shcore::Exception &e) { // CR_SERVER_LOST will happen if the connection timeouts while waiting // for the INSERT/UPDATE to execute, which could happen if there's // a lock, for example. console->print_error("Could not update metadata: " + e.format()); if (e.code() == CR_SERVER_LOST) { console->print_info( "Metadata update may have failed because of a timeout."); } // Restart replication on the promoted instance and re-enable read-only, // i.e., revert all previous changes from async_force_primary(). 
        Scoped_instance target(ipool->connect_unchecked(promoted));
        undo_async_force_primary(target.get(), k_replicaset_channel_name,
                                 dry_run);
        // fence_instance(target.get());

        // The cached primary handle no longer reflects reality.
        invalidate_handle();

        throw shcore::Exception("Error during failover: " + e.format(),
                                SHERR_DBA_FAILOVER_ERROR);
      }
    }
    console->print_info();

    console->print_info(promoted->label + " was force-promoted to PRIMARY.");
    console->print_note(
        "Former PRIMARY " + demoted->label +
        " is now invalidated and must be removed from the replicaset.");
  } catch (const shcore::Exception &e) {
    if (e.code() == SHERR_DBA_FAILOVER_ERROR) throw;

    log_warning("While forcing primary instance: %s", e.what());
    throw;
  } catch (const std::exception &e) {
    log_warning("While forcing primary instance: %s", e.what());
    throw;
  }

  console->print_info("* Updating source of remaining SECONDARY instances");
  {
    shcore::Scoped_callback_list undo_list;

    try {
      async_change_primary(new_master.get(), instances,
                           k_replicaset_channel_name, ar_options, nullptr,
                           &undo_list, dry_run);
    } catch (...) {
      console->print_error("Error changing replication source: " +
                           format_active_exception());

      console->print_note("Reverting replication changes");
      undo_list.call();

      throw shcore::Exception("Error during switchover",
                              SHERR_DBA_SWITCHOVER_ERROR);
    }
    undo_list.cancel();
  }

  console->print_info();

  // Clear replication configs from the promoted instance. Do it after
  // everything is done, to make reverting easier.
  reset_channel(new_master.get(), k_replicaset_channel_name, true, dry_run);

  console->print_info("Failover finished successfully.");
  console->print_info();

  if (dry_run) {
    console->print_info("dryRun finished.");
    console->print_info();
  }
}

// Builds and returns the replicaset status document. 'extended' >= 1 adds
// the metadata version; member details come from a fresh topology scan.
shcore::Value Replica_set_impl::status(int extended) {
  check_preconditions_and_primary_availability("status", false);

  shcore::Dictionary_t status;

  // topology info
  {
    Cluster_metadata cmd = get_metadata();

    std::unique_ptr<topology::Global_topology> topo(
        topology::scan_global_topology(get_metadata_storage().get(), cmd,
                                       k_replicaset_channel_name, true));

    Status_options opts;
    opts.show_members = true;
    opts.show_details = extended;

    status = replica_set_status(
        *dynamic_cast<topology::Server_global_topology *>(topo.get()), opts);

    status->get_map("replicaSet")->set("name", shcore::Value(get_name()));
  }

  // Gets the metadata version
  if (extended >= 1) {
    auto version = mysqlsh::dba::metadata::installed_version(
        m_metadata_storage->get_md_server());
    status->set("metadataVersion", shcore::Value(version.get_base()));
  }

  // gets the async replication channels SSL info
  {
    auto instances =
        get_metadata_storage()->get_replica_set_instances(get_id());

    auto topo = status->get_map("replicaSet")->get_map("topology");

    for (const auto &i : instances) {
      if (!topo->has_key(i.endpoint)) continue;
      if (!topo->get_map(i.endpoint)->has_key("replication")) continue;

      std::string repl_account;
      {
        auto [account_user, account_host] =
            get_metadata_storage()->get_instance_repl_account(
                i.uuid, Cluster_type::ASYNC_REPLICATION);
        repl_account = std::move(account_user);
      }

      // Probe the PRIMARY's dump threads for the SSL cipher/version used by
      // this member's replication connection; stop once both are known.
      std::string ssl_cipher, ssl_version;
      mysqlshdk::mysql::iterate_thread_variables(
          *(get_metadata_storage()->get_md_server()), "Binlog Dump GTID",
          repl_account, "Ssl%",
          [&ssl_cipher, &ssl_version](std::string var_name,
                                      std::string var_value) {
            if (var_name == "Ssl_cipher")
              ssl_cipher = std::move(var_value);
            else if (var_name == "Ssl_version")
              ssl_version = std::move(var_value);

            return (ssl_cipher.empty() ||
ssl_version.empty()); }); if (ssl_cipher.empty() && ssl_version.empty()) continue; auto target_map = topo->get_map(i.endpoint)->get_map("replication"); assert(target_map->has_key("replicationSsl")); if (ssl_cipher.empty()) (*target_map)["replicationSsl"] = shcore::Value(std::move(ssl_version)); else if (ssl_version.empty()) (*target_map)["replicationSsl"] = shcore::Value(std::move(ssl_cipher)); else (*target_map)["replicationSsl"] = shcore::Value(shcore::str_format( "%s %s", ssl_cipher.c_str(), ssl_version.c_str())); } } return shcore::Value(status); } std::shared_ptr<Global_topology_manager> Replica_set_impl::setup_topology_manager( topology::Server_global_topology **out_topology, bool deep) { Cluster_metadata cmd; if (!get_metadata_storage()->get_cluster(get_id(), &cmd)) throw shcore::Exception("Metadata not found for replicaset " + get_name(), SHERR_DBA_METADATA_MISSING); std::unique_ptr<topology::Global_topology> topology( topology::scan_global_topology(get_metadata_storage().get(), cmd, k_replicaset_channel_name, deep)); auto gtm = std::make_shared<Star_global_topology_manager>(0, std::move(topology)); if (out_topology) *out_topology = dynamic_cast<topology::Server_global_topology *>(gtm->topology()); return gtm; } std::vector<Instance_metadata> Replica_set_impl::get_instances_from_metadata() const { return get_metadata_storage()->get_replica_set_instances(get_id()); } /* * Acquire the primary. * * Find the primary and lock it if needed, ensuring the replicaset object can * perform update operations. * * For a replicaset to be updatable, it's necessary that: * - the MD object is connected to the PRIMARY of the primary master of the * primary replicaset, so that the MD can be updated (and is also not lagged) * - the primary master of the replicaset is reachable, so that replicaset * accounts can be created there. * * An exception is thrown if not possible to connect to the primary master. * * An Instance object connected to the primary master is returned. 
The session
 * is owned by the replicaset object.
 */
std::tuple<mysqlsh::dba::Instance *, mysqlshdk::mysql::Lock_scoped>
Replica_set_impl::acquire_primary_locked(mysqlshdk::mysql::Lock_mode mode,
                                         std::string_view skip_lock_uuid) {
  // Returns false (and fills *out_new_primary) if any reachable member has a
  // more recent membership view than the one we connected through — i.e. our
  // view was invalidated by a failover.
  auto check_not_invalidated = [this](Instance *primary,
                                      Instance_metadata *out_new_primary) {
    uint64_t view_id;
    auto members =
        m_metadata_storage->get_replica_set_members(get_id(), &view_id);
    assert(!members.empty());

    auto ipool = current_ipool();

    // Check if some other member has a higher view_id
    for (const auto &m : members) {
      try {
        Scoped_instance i(ipool->connect_unchecked_uuid(m.uuid));
        MetadataStorage md(*i);

        uint64_t alt_view_id = 0;
        std::vector<Instance_metadata> alt_members =
            md.get_replica_set_members(get_id(), &alt_view_id);
        if (alt_view_id <= view_id) continue;

        log_info("view_id at %s is %s, target %s is %s", m.endpoint.c_str(),
                 std::to_string(alt_view_id).c_str(), primary->descr().c_str(),
                 std::to_string(view_id).c_str());

        for (const auto &am : alt_members) {
          if (!am.primary_master) continue;

          log_info("Primary according to target %s is %s", m.label.c_str(),
                   am.label.c_str());

          *out_new_primary = am;
          break;
        }

        return false;
      } catch (const std::exception &e) {
        // Unreachable members are skipped (best-effort check).
        log_warning("Could not connect to member %s: %s", m.endpoint.c_str(),
                    e.what());
      }
    }

    return true;
  };

  // Auxiliary lambda function to find the primary.
  // Side effects: repoints m_primary_master and m_metadata_storage at the
  // freshly discovered PRIMARY.
  auto find_primary = [&]() {
    auto ipool = current_ipool();

    try {
      std::shared_ptr<Instance> primary =
          ipool->connect_async_cluster_primary(get_id());

      primary->steal();
      m_primary_master = primary;

      m_metadata_storage = std::make_shared<MetadataStorage>(m_primary_master);
      ipool->set_metadata(m_metadata_storage);

      log_info("Connected to ReplicaSet PRIMARY instance '%s'",
               m_primary_master->descr().c_str());
    } catch (const shcore::Exception &e) {
      if (e.code() == SHERR_DBA_ASYNC_MEMBER_INVALIDATED) {
        current_console()->print_error(
            get_name() + " was invalidated by a failover. Please "
            "repair it or remove it from the ReplicaSet.");
      }

      throw shcore::Exception(e.what(), SHERR_DBA_ASYNC_PRIMARY_UNAVAILABLE);
    }
  };

  // Always search for the primary (no cache, since it might have changed).
  uint64_t view_id, new_view_id;
  std::string uuid, new_uuid;
  bool primary_found = false;

  // Make sure the primary did not changed while determining it and acquiring
  // the lock on it, avoiding any race condition.
  mysqlshdk::mysql::Lock_scoped plock;
  while (!primary_found) {
    // Get the primary info (before acquiring lock);
    m_metadata_storage->get_replica_set_primary_info(get_id(), &uuid,
                                                     &view_id);

    find_primary();

    // acquire the needed lock on the primary
    if (skip_lock_uuid.empty() ||
        skip_lock_uuid != m_primary_master->get_uuid()) {
      if (mode == mysqlshdk::mysql::Lock_mode::SHARED)
        plock = m_primary_master->get_lock_shared();
      else if (mode == mysqlshdk::mysql::Lock_mode::EXCLUSIVE)
        plock = m_primary_master->get_lock_exclusive();
    }

    // Get the primary info again (after acquiring lock);
    m_metadata_storage->get_replica_set_primary_info(get_id(), &new_uuid,
                                                     &new_view_id);

    if (uuid == new_uuid) {
      primary_found = true;
      continue;
    }

    // Primary changed while finding it and acquiring lock on it:
    // - Release any acquired lock on the old primary and try again.
    plock = nullptr;  // forces a lock release (if any)
  }

  // ensure that the primary isn't invalidated
  Instance_metadata new_primary;
  if (!check_not_invalidated(m_primary_master.get(), &new_primary)) {
    // Throw an exception so that the error can bubble up all the way up to the
    // top of the stack. All assumptions made by the code path up to this point
    // were based on an invalidated members view of the replicaset, which
    // is no good.
shcore::Exception exc("Target " + m_cluster_server->descr() + " was invalidated in a failover", SHERR_DBA_ASYNC_MEMBER_INVALIDATED); exc.error()->set("new_primary_endpoint", shcore::Value(new_primary.endpoint)); throw exc; } return {m_primary_master.get(), std::move(plock)}; } mysqlsh::dba::Instance *Replica_set_impl::acquire_primary(bool, bool) { // since acquire_primary_locked() has a lock mode NONE, to avoid duplicating // code, we can simply call it with NONE auto [instance, lock] = acquire_primary_locked(mysqlshdk::mysql::Lock_mode::NONE); assert(!lock); // make sure the lock is empty return instance; } Cluster_metadata Replica_set_impl::get_metadata() const { Cluster_metadata cmd; if (!get_metadata_storage()->get_cluster(get_id(), &cmd)) { throw shcore::Exception( "ReplicaSet metadata could not be loaded for " + get_name(), SHERR_DBA_METADATA_MISSING); } return cmd; } void Replica_set_impl::release_primary() { m_primary_master.reset(); } void Replica_set_impl::primary_instance_did_change( const std::shared_ptr<Instance> &new_primary) { if (m_primary_master) m_primary_master->release(); m_primary_master.reset(); if (new_primary) { m_primary_master = new_primary; new_primary->retain(); m_metadata_storage = std::make_shared<MetadataStorage>(new_primary); } } void Replica_set_impl::invalidate_handle() { if (m_primary_master) m_primary_master->release(); m_primary_master.reset(); } void Replica_set_impl::ensure_metadata_has_server_uuid( const mysqlsh::dba::Instance &instance) { const auto target_uuid = instance.get_uuid(); try { m_metadata_storage->get_instance_by_uuid(target_uuid); return; // uuid is in metadata } catch (const shcore::Exception &e) { if (e.code() != SHERR_DBA_MEMBER_METADATA_MISSING) throw; } log_info("Updating instance '%s' server UUID in metadata.", instance.descr().c_str()); Instance_metadata instance_md(query_instance_info(instance, false)); instance_md.cluster_id = get_id(); m_metadata_storage->update_instance(instance_md); } void 
Replica_set_impl::ensure_compatible_donor(
    const std::string &instance_def, mysqlshdk::mysql::IInstance *recipient) {
  /*
   * A donor is compatible if:
   *
   * - It's an ONLINE ReplicaSet member
   * - The target (recipient) and donor instances support clone (version
   * >= 8.0.17)
   * - It has the same version of the recipient
   * - It has the same operating system as the recipient
   *
   * Throws on the first failed check; returns normally when all checks pass.
   */
  const auto target = Scoped_instance(connect_target_instance(instance_def));

  // Check if the target belongs to the ReplicaSet (MD)
  std::string target_address = target->get_canonical_address();

  try {
    m_metadata_storage->get_instance_by_address(target_address);
  } catch (const shcore::Exception &e) {
    if (e.code() == SHERR_DBA_MEMBER_METADATA_MISSING) {
      throw shcore::Exception(
          "Instance " + target_address + " does not belong to the replicaset",
          SHERR_DBA_BADARG_INSTANCE_NOT_IN_CLUSTER);
    }
    throw;
  }

  // Check if the instance is ONLINE
  {
    auto topology_mng = setup_topology_manager();
    // Look the node up by UUID first, falling back to endpoint.
    auto topology_node =
        topology_mng->topology()->try_get_node_for_uuid(target->get_uuid());
    if (!topology_node)
      topology_node = topology_mng->topology()->try_get_node_for_endpoint(
          target->get_canonical_address());
    if (!topology_node)
      throw shcore::Exception(
          "Unable to find instance '" + target->descr() + "' in the topology.",
          SHERR_DBA_ASYNC_MEMBER_TOPOLOGY_MISSING);

    if (topology_node->status() != topology::Node_status::ONLINE) {
      throw shcore::Exception("Instance " + target_address +
                                  " is not an ONLINE member of the ReplicaSet.",
                              SHERR_DBA_BADARG_INSTANCE_NOT_ONLINE);
    }
  }

  // Check if the instance support clone (the recipient was already checked)
  if (!supports_mysql_clone(target->get_version())) {
    throw shcore::Exception(
        "Instance " + target_address + " does not support MySQL Clone.",
        SHERR_DBA_CLONE_NO_SUPPORT);
  }

  // Check if the versions are compatible
  if (!Base_cluster_impl::verify_compatible_clone_versions(
          target->get_version(), recipient->get_version())) {
    throw shcore::Exception(
        shcore::str_format(
            "Instance '%s' cannot be a donor because its version (%s) isn't "
            "compatible with the recipient's (%s).",
            target_address.c_str(), target->get_version().get_full().c_str(),
            recipient->get_version().get_full().c_str()),
        SHERR_DBA_CLONE_DIFF_VERSION);
  }

  // Check if the instance has the same OS the recipient
  if (target->get_version_compile_os() != recipient->get_version_compile_os()) {
    throw shcore::Exception("Instance " + target_address +
                                " cannot be a donor because it has a different "
                                "Operating System than the recipient.",
                            SHERR_DBA_CLONE_DIFF_OS);
  }

  // Check if the instance is running on the same platform of the recipient
  if (target->get_version_compile_machine() !=
      recipient->get_version_compile_machine()) {
    throw shcore::Exception(
        "Instance " + target_address +
            " cannot be a donor because it is running on a different "
            "platform than the recipient.",
        SHERR_DBA_CLONE_DIFF_PLATFORM);
  }
}

// Picks a clone donor for the given recipient: prefers an ONLINE SECONDARY
// that passes ensure_compatible_donor(), falling back to the PRIMARY.
// IPv6-only members are rejected (not supported for cloning).
// @return the chosen donor endpoint ("host:port").
// @throws shcore::Exception SHERR_DBA_CLONE_NO_DONORS when no member
//         qualifies (after printing the accumulated per-member reasons).
std::string Replica_set_impl::pick_clone_donor(
    mysqlshdk::mysql::IInstance *recipient) {
  auto console = current_console();
  std::string r;

  auto topology_mng = setup_topology_manager();
  auto topology = topology_mng->topology();

  // Get the ReplicaSet primary member
  const topology::Node *primary = topology->get_primary_master_node();

  // Get the ReplicaSet secondary members
  std::list<const topology::Node *> secondaries =
      topology_mng->topology()->get_slave_nodes(primary);

  // Accumulates the reason each rejected candidate was unsuitable, so it can
  // be shown if no donor is found at all.
  std::string full_msg;

  // By default, the donor must be an online secondary member. If not
  // available, then it must be the primary
  if (!secondaries.empty()) {
    for (const auto *s : secondaries) {
      // The recipient itself cannot be its own donor.
      if (recipient->get_uuid() != s->get_primary_member()->uuid) {
        std::string instance_def = s->get_primary_member()->endpoint;
        try {
          if (mysqlshdk::utils::Net::is_ipv6(
                  mysqlshdk::utils::split_host_and_port(instance_def).first))
            throw shcore::Exception::runtime_error(
                "Instance hostname/report_host is an IPv6 address, which is "
                "not supported for cloning");

          ensure_compatible_donor(instance_def, recipient);
          r = instance_def;
          // No need to continue looking for more
          break;
        } catch (const shcore::Exception &e) {
          std::string msg = "SECONDARY '" + instance_def +
                            "' is not a suitable clone donor: " + e.what();
          log_info("%s", msg.c_str());
          full_msg += msg + "\n";
          continue;
        }
      }
    }
  }

  // If no secondary is suitable, use the primary
  if (r.empty()) {
    std::string instance_def = primary->get_primary_member()->endpoint;

    try {
      if (mysqlshdk::utils::Net::is_ipv6(
              mysqlshdk::utils::split_host_and_port(instance_def).first))
        throw shcore::Exception::runtime_error(
            "Instance hostname/report_host is an IPv6 address, which is "
            "not supported for cloning");

      ensure_compatible_donor(instance_def, recipient);
      r = instance_def;
    } catch (const shcore::Exception &e) {
      std::string msg = "PRIMARY '" + instance_def +
                        "' is not a suitable clone donor: " + e.what();
      log_info("%s", msg.c_str());
      full_msg += msg + "\n";
    }
  }

  // If nobody is compatible...
  if (r.empty()) {
    console->print_error(
        "None of the members in the replicaSet are compatible to be used as "
        "clone donors for " +
        recipient->descr());
    console->print_info(full_msg);

    throw shcore::Exception("The ReplicaSet has no compatible clone donors.",
                            SHERR_DBA_CLONE_NO_DONORS);
  }

  return r;
}

// Reverts topology changes made while adding an instance: optionally drops
// its replication account and removes its replication channel. Errors are
// reported to the console but not rethrown (best-effort rollback).
void Replica_set_impl::revert_topology_changes(
    mysqlshdk::mysql::IInstance *target_server, bool remove_user,
    bool dry_run) {
  auto console = current_console();
  // revert changes by deleting the account we created for it and
  // clearing replication
  console->print_info("Reverting topology changes...");
  try {
    if (remove_user) {
      drop_replication_user(target_server->get_uuid(), target_server);
    }

    async_remove_replica(target_server, k_replicaset_channel_name, dry_run);
  } catch (const std::exception &e) {
    console->print_error(
        std::string("Error while reverting replication changes: ") +
        format_active_exception());
  }
}

// Performs clone-based provisioning of 'recipient' from a donor. The clone
// itself runs on a worker thread while this thread monitors progress and
// handles cancellation/restart; on cancel the changes are reverted and
// cancel_sync is rethrown to the caller.
void Replica_set_impl::handle_clone(
    const std::shared_ptr<mysqlsh::dba::Instance> &recipient,
    const Clone_options &clone_options,
    const Async_replication_options &ar_options,
    const std::string &repl_account_host,
    const std::string &repl_account_cert_subject,
    const Recovery_progress_style &progress_style, int sync_timeout,
    bool dry_run) {
  auto console = current_console();

  /*
   * Clone handling:
   *
   * 1. Pick a valid donor (unless cloneDonor is set). By default, the donor
   * must be an ONLINE SECONDARY member. If not available, then must be the
   * PRIMARY.
   * 2. Install the Clone plugin on the donor and recipient (if not installed
   * already)
   * 3. Set the donor to the selected donor: SET GLOBAL clone_valid_donor_list
   * = "donor_host:donor_port";
   * 4. Create or update a recovery account with the required
   * privileges for replicaSets management + clone usage (BACKUP_ADMIN)
   * 5. Ensure the donor's recovery account has the clone usage required
   * privileges: BACKUP_ADMIN
   * 6. Grant the CLONE_ADMIN privilege to the recovery account
   * at the recipient
   * 7. Create the SQL clone command based on the above
   * 8. Execute the clone command
   */

  // Pick a valid donor
  std::string donor;
  if (clone_options.clone_donor.has_value()) {
    // A user-provided donor is still validated for compatibility.
    ensure_compatible_donor(*clone_options.clone_donor, recipient.get());
    donor = *clone_options.clone_donor;
  } else {
    donor = pick_clone_donor(recipient.get());
  }

  // Install the clone plugin on the recipient and donor
  const auto donor_instance = Scoped_instance(connect_target_instance(donor));

  log_info("Installing the clone plugin on donor '%s'%s.",
           donor_instance.get()->get_canonical_address().c_str(),
           dry_run ? " (dryRun)" : "");
  if (!dry_run) {
    mysqlshdk::mysql::install_clone_plugin(*donor_instance, nullptr);
  }

  log_info("Installing the clone plugin on recipient '%s'%s.",
           recipient->get_canonical_address().c_str(),
           dry_run ? " (dryRun)" : "");
  if (!dry_run) {
    mysqlshdk::mysql::install_clone_plugin(*recipient, nullptr);
  }

  // Set the donor to the selected donor on the recipient
  if (!dry_run) {
    recipient->set_sysvar("clone_valid_donor_list", donor);
  }

  // Create or update a recovery account with the required privileges for
  // replicaSets management + clone usage (BACKUP_ADMIN) on the recipient

  // Check if super_read_only is enabled. If so it must be disabled to create
  // the account
  if (recipient->get_sysvar_bool("super_read_only", false)) {
    recipient->set_sysvar("super_read_only", false);
  }

  // Clone requires a user in both donor and recipient:
  //
  // On the donor, the clone user requires the BACKUP_ADMIN privilege for
  // accessing and transferring data from the donor, and for blocking DDL
  // during the cloning operation.
  //
  // On the recipient, the clone user requires the CLONE_ADMIN privilege for
  // replacing recipient data, blocking DDL during the cloning operation, and
  // automatically restarting the server. The CLONE_ADMIN privilege includes
  // BACKUP_ADMIN and SHUTDOWN privileges implicitly.
  //
  // For that reason, we create a user in the recipient with the same username
  // and password as the replication user created in the donor.
  create_clone_recovery_user_nobinlog(
      recipient.get(), *ar_options.repl_credentials, repl_account_host,
      query_cluster_auth_cert_issuer(), repl_account_cert_subject, dry_run);

  if (!dry_run) {
    // Ensure the donor's recovery account has the clone usage required
    // privileges: BACKUP_ADMIN
    get_primary_master()->executef("GRANT BACKUP_ADMIN ON *.* TO ?@?",
                                   ar_options.repl_credentials->user,
                                   repl_account_host);

    // If the donor instance is processing transactions, it may have the
    // clone-user handling (create + grant) still in the backlog waiting to be
    // applied. For that reason, we must wait for it to be in sync with the
    // primary before starting the clone itself (BUG#30628746)
    std::string primary_address = get_primary_master()->get_canonical_address();

    std::string donor_address = donor_instance->get_canonical_address();

    if (!mysqlshdk::utils::are_endpoints_equal(primary_address,
                                               donor_address)) {
      console->print_info(
          "* Waiting for the donor to synchronize with PRIMARY...");
      if (!dry_run) {
        try {
          // Sync the donor with the primary
          sync_transactions(*donor_instance, {k_replicaset_channel_name},
                            sync_timeout);
        } catch (const shcore::Exception &e) {
          if (e.code() == SHERR_DBA_GTID_SYNC_TIMEOUT) {
            console->print_error(
                "The donor instance failed to synchronize its transaction set "
                "with the PRIMARY.");
          }
          throw;
        } catch (const cancel_sync &) {
          // Throw it up
          throw;
        }
      }
      console->print_info();
    }

    // Create a new connection to the recipient and run clone asynchronously
    std::string instance_def =
        recipient->get_connection_options().uri_endpoint();
    const auto recipient_clone =
        Scoped_instance(connect_target_instance(instance_def));

    mysqlshdk::db::Connection_options clone_donor_opts(donor);

    // we need a point in time as close as possible, but still earlier than
    // when recovery starts to monitor the recovery phase. The timestamp
    // resolution is timestamp(3) irrespective of platform
    std::string begin_time =
        recipient->queryf_one_string(0, "", "SELECT NOW(3)");

    std::exception_ptr error_ptr;

    // Worker thread that runs the actual CLONE; errors are stashed in
    // error_ptr (except a user-initiated cancel, which is only logged).
    auto clone_thread = spawn_scoped_thread([&recipient_clone, clone_donor_opts,
                                             ar_options, &error_ptr] {
      mysqlsh::thread_init();

      // SSL is required for the clone connection when certificate-based
      // replication authentication is in use.
      bool enabled_ssl{false};
      switch (ar_options.auth_type) {
        case Replication_auth_type::CERT_ISSUER:
        case Replication_auth_type::CERT_SUBJECT:
        case Replication_auth_type::CERT_ISSUER_PASSWORD:
        case Replication_auth_type::CERT_SUBJECT_PASSWORD:
          enabled_ssl = true;
          break;
        default:
          break;
      }

      try {
        mysqlshdk::mysql::do_clone(recipient_clone, clone_donor_opts,
                                   *ar_options.repl_credentials, enabled_ssl);
      } catch (const shcore::Error &err) {
        // Clone canceled
        if (err.code() == ER_QUERY_INTERRUPTED) {
          log_info("Clone canceled: %s", err.format().c_str());
        } else {
          log_info("Error cloning from instance '%s': %s",
                   clone_donor_opts.uri_endpoint().c_str(),
                   err.format().c_str());
          error_ptr = std::current_exception();
        }
      } catch (const std::exception &err) {
        log_info("Error cloning from instance '%s': %s",
                 clone_donor_opts.uri_endpoint().c_str(), err.what());
        error_ptr = std::current_exception();
      }

      mysqlsh::thread_end();
    });

    // Guarantees the clone thread is joined on every exit path.
    // NOTE(review): error_ptr is captured by value here but never used by
    // this lambda, and the stashed exception is not rethrown anywhere after
    // the join — presumably failures surface via the monitor instead; confirm.
    shcore::Scoped_callback join([&clone_thread, error_ptr]() {
      if (clone_thread.joinable()) clone_thread.join();
    });

    try {
      // After clone the server restarts; login must work with the donor's
      // credentials (data — including mysql.user — now comes from the donor).
      auto post_clone_auth = recipient->get_connection_options();
      post_clone_auth.set_login_options_from(
          donor_instance->get_connection_options());

      monitor_standalone_clone_instance(
          recipient->get_connection_options(), post_clone_auth, begin_time,
          progress_style, k_clone_start_timeout,
          current_shell_options()->get().dba_restart_wait_timeout);

      // When clone is used, the target instance will restart and all
      // connections are closed so we need to test if the connection to the
      // target instance and MD are closed and re-open if necessary
      recipient->reconnect_if_needed("Target");
      m_metadata_storage->get_md_server()->reconnect_if_needed("Metadata");

      // Remove the BACKUP_ADMIN grant from the recovery account
      get_primary_master()->executef("REVOKE BACKUP_ADMIN ON *.* FROM ?@?",
                                     ar_options.repl_credentials->user,
                                     repl_account_host);
    } catch (const stop_monitoring &) {
      console->print_info();
      console->print_note("Recovery process canceled. Reverting changes...");

      // Cancel the clone query
      mysqlshdk::mysql::cancel_clone(*recipient);

      log_info("Clone canceled.");

      log_debug("Waiting for clone thread...");
      // wait for the clone thread to finish
      clone_thread.join();
      log_debug("Clone thread joined");

      // When clone is canceled, the target instance will restart and all
      // connections are closed so we need to wait for the server to start-up
      // and re-establish the session. Also we need to test if the connection
      // to the target instance and MD are closed and re-open if necessary
      *recipient = *wait_server_startup(
          recipient->get_connection_options(),
          mysqlshdk::mysql::k_server_recovery_restart_timeout,
          Recovery_progress_style::NOWAIT);

      recipient->reconnect_if_needed("Target");
      m_metadata_storage->get_md_server()->reconnect_if_needed("Metadata");

      // Remove the BACKUP_ADMIN grant from the recovery account
      get_primary_master()->executef("REVOKE BACKUP_ADMIN ON *.* FROM ?@?",
                                     ar_options.repl_credentials->user,
                                     repl_account_host);

      // XXX do a full simple test with replAllowedHost option for all topos
      cleanup_clone_recovery(recipient.get(), *ar_options.repl_credentials,
                             repl_account_host);

      // Thrown the exception cancel_sync up
      throw cancel_sync();
    } catch (const restart_timeout &) {
      console->print_warning(
          "Clone process appears to have finished and tried to restart the "
          "MySQL server, but it has not yet started back up.");
      console->print_info();
      console->print_info(
          "Please make sure the MySQL server at '" + recipient->descr() +
          "' is properly restarted. The operation will be reverted, but you may"
          " retry adding the instance after restarting it. ");

      throw shcore::Exception("Timeout waiting for server to restart",
                              SHERR_DBA_SERVER_RESTART_TIMEOUT);
    } catch (const shcore::Error &e) {
      throw shcore::Exception::mysql_error_with_code(e.what(), e.code());
    }
  }
}

// Determines how an instance should be provisioned (clone vs incremental),
// delegating to the generic validator with a GTID-based recoverability check
// against the donor.
Member_recovery_method Replica_set_impl::validate_instance_recovery(
    Member_op_action op_action, mysqlshdk::mysql::IInstance *donor_instance,
    mysqlshdk::mysql::IInstance *target_instance,
    Member_recovery_method opt_recovery_method, bool gtid_set_is_complete,
    bool interactive) {
  // Recoverable = the target's GTID set is not irrecoverably diverged from
  // the donor's.
  auto check_recoverable =
      [donor_instance](mysqlshdk::mysql::IInstance *tgt_instance) {
        // Get the gtid state in regards to the donor
        mysqlshdk::mysql::Replica_gtid_state state =
            mysqlshdk::mysql::check_replica_gtid_state(
                *donor_instance, *tgt_instance, nullptr, nullptr);

        if (state != mysqlshdk::mysql::Replica_gtid_state::IRRECOVERABLE)
          return true;
        else
          return false;
      };

  Member_recovery_method recovery_method =
      mysqlsh::dba::validate_instance_recovery(
          Cluster_type::ASYNC_REPLICATION, op_action, donor_instance,
          target_instance, check_recoverable, opt_recovery_method,
          gtid_set_is_complete, interactive);

  return recovery_method;
}

// Records an instance as a replicaset member in the metadata (plus its
// replication account, when one was created) inside a single transaction.
// @return the metadata instance id assigned to the new member.
Instance_id Replica_set_impl::manage_instance(
    Instance *instance, const std::pair<std::string, std::string> &repl_user,
    const std::string &instance_label, Instance_id master_id, bool is_primary) {
  MetadataStorage::Transaction trx(get_metadata_storage());

  Instance_metadata inst = query_instance_info(*instance, false);

  if (!instance_label.empty()) inst.label = instance_label;
  inst.cluster_id = get_id();
  if (master_id > 0) {
    inst.master_id = master_id;
  }
  inst.primary_master = is_primary;

  auto instance_id = get_metadata_storage()->record_async_member_added(inst);

  if (!repl_user.first.empty())
    get_metadata_storage()->update_instance_repl_account(
        inst.uuid, Cluster_type::ASYNC_REPLICATION, repl_user.first,
        repl_user.second);

  trx.commit();

  return instance_id;
}

// Resolves instance_def to a member of the scanned topology.
// @throws shcore::Exception SHERR_DBA_BADARG_INSTANCE_NOT_MANAGED if the
//         instance is not part of the managed topology.
const topology::Server *Replica_set_impl::check_target_member(
    topology::Server_global_topology
*topology,
    const std::string &instance_def) {
  auto console = current_console();

  // we can't print instance_def directly because it may contain credentials
  std::string instance_label = Connection_options(instance_def).uri_endpoint();

  const auto target_instance =
      Scoped_instance(connect_target_instance(instance_def));

  try {
    return topology->get_server(target_instance->get_uuid());
  } catch (const shcore::Exception &e) {
    log_warning("%s: %s", instance_label.c_str(), e.format().c_str());
    if (e.code() == SHERR_DBA_CLUSTER_METADATA_MISSING) {
      throw shcore::Exception(
          "Target instance " + instance_label + " is not a managed instance.",
          SHERR_DBA_BADARG_INSTANCE_NOT_MANAGED);
    }
    throw;
  }
}

// Opens a session to every (non-invalidated) member recorded in the metadata.
//
// @param read_timeout per-connection net read timeout in seconds (0 = unset);
//        keeps server-side-blocking commands from hanging the shell forever.
// @param skip_primary when true, the PRIMARY member is not connected to.
// @param out_unreachable if non-null, collects the metadata of members that
//        could not be reached (client-side connect errors only).
// @return sessions to all members that could be connected.
std::list<std::shared_ptr<Instance>> Replica_set_impl::connect_all_members(
    uint32_t read_timeout, bool skip_primary,
    std::list<Instance_metadata> *out_unreachable) {
  // Ignore invalidated instances
  std::vector<Instance_metadata> instances =
      get_metadata_storage()->get_all_instances(get_id());

  auto console = current_console();
  std::list<std::shared_ptr<Instance>> r;
  auto ipool = current_ipool();

  for (const auto &i : instances) {
    if ((i.primary_master && skip_primary) || i.invalidated) continue;

    try {
      try {
        mysqlshdk::db::Connection_options opts(i.endpoint);
        ipool->default_auth_opts().set(&opts);
        // The read timeout will allow commands that block at the server but
        // have no server-side timeouts to not block the shell indefinitely.
        if (read_timeout > 0)
          opts.set(mysqlshdk::db::kNetReadTimeout,
                   {std::to_string(read_timeout * 1000)});

        console->print_info("** Connecting to " + i.label);
        r.emplace_back(ipool->connect_unchecked(opts));
      }
      CATCH_AND_THROW_CONNECTION_ERROR(i.endpoint)
    } catch (const shcore::Exception &e) {
      // Client errors are likely because the server is unreachable/crashed
      if (e.is_mysql() && mysqlshdk::db::is_mysql_client_error(e.code())) {
        if (out_unreachable) out_unreachable->push_back(i);

        if (i.primary_master)
          console->print_warning("Could not connect to PRIMARY instance: " +
                                 e.format());
        else
          console->print_warning("Could not connect to SECONDARY instance: " +
                                 e.format());
      } else {
        throw;
      }
    }
  }
  return r;
}

/**
 * Check for replication applier errors on all online instances.
 *
 * NOTE: If invalidate_error_instances is true, then instances with applier
 * errors or stopped will be skipped (removed from the instances lists)
 * and their ID added to the invalidated list.
 *
 * @param srv_topology Server_global_topology object with initial information
 * about the status of each instance.
 * @param out_online_instances List of online instances.
 * @param invalidate_error_instances Boolean indicating if instances with
 * errors must be invalidated.
 * @param out_instances_md Vector with the instances metadata information.
 * @param out_invalidate_ids List with the IDs of the invalidated instances.
 */
void Replica_set_impl::check_replication_applier_errors(
    topology::Server_global_topology *srv_topology,
    std::list<std::shared_ptr<Instance>> *out_online_instances,
    bool invalidate_error_instances,
    std::vector<Instance_metadata> *out_instances_md,
    std::list<Instance_id> *out_invalidate_ids) const {
  auto console = current_console();
  std::list<std::string> error_uuids;

  for (const auto &instance : *out_online_instances) {
    // Get the matching instance info from the Server Global Topology.
    auto srv_info =
        srv_topology->get_server(instance->get_uuid())->get_primary_member();

    // Members with no replication channel (e.g. the primary) are skipped.
    if (!srv_info->master_channel) continue;

    // Get the applier status.
    // NOTE: Ignore receiver status since it is expected to have errors
    //       if the primary failed and some expected receiver status
    //       (e.g., CONNECTION_ERROR) have precedence over other applier
    //       status (e.g., APPLIER_OFF).
    mysqlshdk::mysql::Replication_channel channel =
        srv_info->master_channel->info;
    mysqlshdk::mysql::Replication_channel::Status applier_status =
        channel.applier_status();

    // Issue error if the replication applier is stopped or has errors.
    if (applier_status ==
        mysqlshdk::mysql::Replication_channel::Status::APPLIER_OFF) {
      std::string err_msg{"Replication applier is OFF at instance " +
                          srv_info->label + "."};
      if (!invalidate_error_instances) {
        console->print_error(err_msg);
      } else {
        log_warning("%s", err_msg.c_str());
        console->print_note(
            srv_info->label +
            " will be invalidated (replication applier is OFF) and must be "
            "fixed or removed from the replicaset.");
      }
      error_uuids.push_back(srv_info->uuid);
    } else if (applier_status ==
               mysqlshdk::mysql::Replication_channel::Status::APPLIER_ERROR) {
      // Report every applier worker that carries an error.
      for (const auto &applier : channel.appliers) {
        if (applier.last_error.code == 0) continue;

        std::string err_msg{"Replication applier error at " + srv_info->label +
                            ": " +
                            mysqlshdk::mysql::to_string(applier.last_error)};
        if (!invalidate_error_instances) {
          current_console()->print_error(err_msg);
        } else {
          log_warning("%s", err_msg.c_str());
          console->print_note(
              srv_info->label +
              " will be invalidated (replication applier error) and must "
              "be fixed or removed from the replicaset.");
        }
      }
      error_uuids.push_back(srv_info->uuid);
    }
  }

  if (error_uuids.empty()) return;

  if (!invalidate_error_instances) {
    console->print_error(
        "Replication errors found for one or more SECONDARY instances. Use "
        "the 'invalidateErrorInstances' option to perform the failover "
        "anyway by skipping and invalidating instances with errors.");
    throw shcore::Exception(
        "One or more instances have replication applier errors.",
        SHERR_DBA_REPLICATION_APPLIER_ERROR);
  }

  console->print_info();

  // Update instances lists according to invalidated instances.
  for (const auto &uuid : error_uuids) {
    // Add instance to invalidate to list.
    auto it = std::find_if(
        out_instances_md->begin(), out_instances_md->end(),
        [&uuid](const Instance_metadata &i_md) { return i_md.uuid == uuid; });
    if (it != out_instances_md->end()) {
      out_invalidate_ids->push_back(it->id);
    }

    // Remove instance to invalidate from lists.
    out_online_instances->remove_if(
        [&uuid](const std::shared_ptr<Instance> &i) {
          return i->get_uuid() == uuid;
        });
    // NOTE(review): if the uuid were ever absent from out_instances_md, 'it'
    // would be end() here and erase(it) would be undefined behavior —
    // presumably every error uuid always comes from out_online_instances
    // whose members are mirrored in out_instances_md; confirm.
    out_instances_md->erase(it);
  }
}

// Lists routers registered for this replicaset. Works even when the PRIMARY
// is unavailable (the precondition failure is downgraded to a warning).
shcore::Value Replica_set_impl::list_routers(bool only_upgrade_required) {
  try {
    check_preconditions("listRouters");
  } catch (const shcore::Exception &e) {
    // The primary might not be available, but listRouters() must work anyway
    if (e.code() == SHERR_DBA_ASYNC_PRIMARY_UNAVAILABLE) {
      current_console()->print_warning(e.format());
    } else {
      throw;
    }
  }

  shcore::Value r = Base_cluster_impl::list_routers(only_upgrade_required);

  (*r.as_map())["replicaSetName"] = shcore::Value(get_name());

  return r;
}

// Removes a router's registration from the metadata, holding a shared lock
// on the PRIMARY while doing so.
void Replica_set_impl::remove_router_metadata(const std::string &router) {
  check_preconditions_and_primary_availability("removeRouterMetadata");

  bool interactive = current_shell_options()->get().wizards;

  // Initialized Instance pool with the metadata from the current session.
  Instance_pool::Auth_options auth_opts;
  auth_opts.get(get_cluster_server()->get_connection_options());
  Scoped_instance_pool ipool(interactive, auth_opts);
  ipool->set_metadata(get_metadata_storage());

  // Acquire a shared lock on the primary. The metadata instance (primary)
  // can be "shared" by other operations executing concurrently on other
  // instances.
  auto plock = get_primary_master()->get_lock_shared();

  Base_cluster_impl::remove_router_metadata(router, true);
}

// Creates an administrative account, after verifying preconditions and that
// the PRIMARY is available (account creation happens there).
void Replica_set_impl::setup_admin_account(
    const std::string &username, const std::string &host,
    const Setup_account_options &options) {
  check_preconditions_and_primary_availability("setupAdminAccount");
  Base_cluster_impl::setup_admin_account(username, host, options);
}

// Creates a MySQL Router account, after verifying preconditions and that the
// PRIMARY is available.
void Replica_set_impl::setup_router_account(
    const std::string &username, const std::string &host,
    const Setup_account_options &options) {
  check_preconditions_and_primary_availability("setupRouterAccount");
  Base_cluster_impl::setup_router_account(username, host, options);
}

// Resets the password of the replication account used by 'slave', generating
// a new random password at the PRIMARY. Falls back to the conventional
// account name (derived from the server_id) at host '%' when the account is
// not recorded in the metadata.
// @return the refreshed credentials (password unset on dry_run) and the
//         account host.
std::pair<mysqlshdk::mysql::Auth_options, std::string>
Replica_set_impl::refresh_replication_user(mysqlshdk::mysql::IInstance *slave,
                                           bool dry_run) {
  assert(m_primary_master);

  mysqlshdk::mysql::Auth_options creds;

  auto account = get_metadata_storage()->get_instance_repl_account(
      slave->get_uuid(), Cluster_type::ASYNC_REPLICATION);
  if (account.first.empty()) {
    account = {make_replication_user_name(slave->get_server_id(),
                                          k_async_cluster_user_name),
               "%"};
  }

  creds.user = account.first;

  try {
    // Create replication accounts for this instance at the master
    // replicaset unless the user provided one.
    // NOTE(review): 'console' below is unused in this function.
    auto console = mysqlsh::current_console();

    log_info("Resetting password for %s@%s at %s", creds.user.c_str(),
             account.second.c_str(), m_primary_master->descr().c_str());
    // re-create replication with a new generated password
    if (!dry_run) {
      std::string repl_password;
      mysqlshdk::mysql::set_random_password(*m_primary_master, creds.user,
                                            {account.second}, &repl_password);
      creds.password = repl_password;
    }
  } catch (const std::exception &e) {
    throw shcore::Exception::runtime_error(shcore::str_format(
        "Error while resetting password for replication account: %s",
        e.what()));
  }

  return {creds, account.second};
}

// Drops the replication account of the member identified by server_uuid from
// the PRIMARY. If the account is not in the metadata, the conventional name
// is derived from the slave's server_id; with a null slave it can only log
// that the account could not be determined. Drop errors are reported as
// warnings and otherwise ignored.
void Replica_set_impl::drop_replication_user(
    const std::string &server_uuid, mysqlshdk::mysql::IInstance *slave) {
  assert(m_primary_master);

  auto account = get_metadata_storage()->get_instance_repl_account(
      server_uuid, Cluster_type::ASYNC_REPLICATION);

  if (account.first.empty()) {
    if (slave) {
      account = {make_replication_user_name(slave->get_server_id(),
                                            k_async_cluster_user_name),
                 ""};
    } else {
      // If the instance is unreachable and the replication account info is not
      // stored in the MD schema we cannot attempt to obtain the instance's
      // server_id to determine the account username. In this particular
      // scenario, we must just log that the account couldn't be removed
      // NOTE(review): execution still falls through to the drop below with an
      // empty account name — presumably dropping accounts for an empty user
      // is a harmless no-op; confirm.
      log_info(
          "Unable to drop instance's replication account from the ReplicaSet: "
          "Instance '%s' is unreachable, unable to determine its replication "
          "account.",
          server_uuid.c_str());
    }
  }

  log_info("Dropping account %s@%s at %s", account.first.c_str(),
           account.second.c_str(), m_primary_master->descr().c_str());
  try {
    // No recorded host means the account may exist under several hosts.
    if (account.second.empty())
      mysqlshdk::mysql::drop_all_accounts_for_user(*m_primary_master,
                                                   account.first);
    else
      m_primary_master->drop_user(account.first, account.second, true);
  } catch (const shcore::Error &e) {
    auto console = current_console();
    console->print_warning(shcore::str_format(
        "%s: Error dropping account %s@%s: %s",
        m_primary_master->descr().c_str(), account.first.c_str(),
        account.second.c_str(), e.format().c_str()));
    // ignore the error and move on
  }
}

// Creates (at the given master, defaulting to the PRIMARY) the replication
// account to be used by 'slave'. The account host comes from the
// replicationAllowedHost cluster attribute when set, otherwise '%'.
std::pair<mysqlshdk::mysql::Auth_options, std::string>
Replica_set_impl::create_replication_user(mysqlshdk::mysql::IInstance *slave,
                                          std::string_view auth_cert_subject,
                                          bool dry_run,
                                          mysqlshdk::mysql::IInstance *master) {
  if (!master) master = m_primary_master.get();

  mysqlshdk::mysql::Auth_options creds;

  std::string host = "%";
  if (shcore::Value allowed_host;
      !dry_run &&
      get_metadata_storage()->query_cluster_attribute(
          get_id(), k_cluster_attribute_replication_allowed_host,
          &allowed_host) &&
      allowed_host.type == shcore::String &&
      !allowed_host.as_string().empty()) {
    host = allowed_host.as_string();
  }

  creds.user = make_replication_user_name(slave->get_server_id(),
                                          k_async_cluster_user_name);

  try {
    // Create replication accounts for this instance at the master
    // replicaset unless the user provided one.
    // Accounts are created at the master replicaset regardless of who will use
    // them, since they'll get replicated everywhere.
    log_info("Creating replication user %s@%s at %s%s", creds.user.c_str(),
             host.c_str(), master->descr().c_str(),
             dry_run ?
                 " (dryRun)" : "");

    if (dry_run) return {creds, host};

    // re-create replication with a new generated password
    // Check if the replication user already exists and delete it if it does,
    // before creating it again.
    mysqlshdk::mysql::drop_all_accounts_for_user(*master, creds.user);

    mysqlshdk::mysql::IInstance::Create_user_options user_options;
    user_options.grants.push_back({"REPLICATION SLAVE", "*.*", false});

    auto auth_type = query_cluster_auth_type();

    // setup password
    bool requires_password{false};
    switch (auth_type) {
      case Replication_auth_type::PASSWORD:
      case Replication_auth_type::CERT_ISSUER_PASSWORD:
      case Replication_auth_type::CERT_SUBJECT_PASSWORD:
        requires_password = true;
        break;
      default:
        break;
    }

    // setup cert issuer and/or subject
    switch (auth_type) {
      case Replication_auth_type::CERT_SUBJECT:
      case Replication_auth_type::CERT_SUBJECT_PASSWORD:
        user_options.cert_subject = auth_cert_subject;
        // CERT_SUBJECT* modes also require the issuer, hence the fallthrough.
        [[fallthrough]];
      case Replication_auth_type::CERT_ISSUER:
      case Replication_auth_type::CERT_ISSUER_PASSWORD:
        user_options.cert_issuer = query_cluster_auth_cert_issuer();
        break;
      default:
        break;
    }

    if (requires_password) {
      std::string repl_password;
      mysqlshdk::mysql::create_user_with_random_password(
          *master, creds.user, {host}, user_options, &repl_password);
      creds.password = std::move(repl_password);
    } else {
      mysqlshdk::mysql::create_user(*master, creds.user, {host}, user_options);
    }
  } catch (const std::exception &e) {
    throw shcore::Exception::runtime_error(shcore::str_format(
        "Error while setting up replication account: %s", e.what()));
  }

  return {creds, host};
}

// Builds a dictionary (keyed by server label) describing per-instance
// topology options: the parallel-applier related system variables and the
// certSubject used for member authentication. Instances that cannot be
// reached are reported with a "connectError" entry instead.
shcore::Dictionary_t Replica_set_impl::get_topology_options() {
  shcore::Dictionary_t ret = shcore::make_dict();

  // Get the topology
  std::unique_ptr<topology::Global_topology> topo(
      topology::scan_global_topology(get_metadata_storage().get(),
                                     get_metadata(), k_replicaset_channel_name,
                                     true));

  for (const topology::Server &server :
       static_cast<topology::Server_global_topology *>(topo.get())->servers()) {
    const topology::Instance *instance = server.get_primary_member();

    if (instance->connect_error.empty()) {
      shcore::Array_t array = shcore::make_array();

      // Get the parallel-appliers options
      for (const auto &[name, value] : instance->parallel_appliers) {
        shcore::Dictionary_t option = shcore::make_dict();
        (*option)["variable"] = shcore::Value(name);
        // Unset variables are reported as NULL.
        (*option)["value"] =
            value.has_value() ? shcore::Value(*value) : shcore::Value::Null();
        array->push_back(shcore::Value(std::move(option)));
      }

      // cert_subject
      {
        auto cert_subject =
            query_cluster_instance_auth_cert_subject(instance->uuid);

        shcore::Dictionary_t option = shcore::make_dict();
        (*option)["option"] = shcore::Value(kCertSubject);
        (*option)["value"] = shcore::Value(std::move(cert_subject));
        array->push_back(shcore::Value(std::move(option)));
      }

      ret->set(server.label, shcore::Value(array));
    } else {
      shcore::Dictionary_t connect_error = shcore::make_dict();
      (*connect_error)["connectError"] = shcore::Value(instance->connect_error);
      ret->set(server.label, shcore::Value(std::move(connect_error)));
    }
  }

  return ret;
}

// Returns the ReplicaSet configuration options: name, per-instance topology
// options, tags and global (cluster-level) attributes. Like listRouters(),
// this must work even when the PRIMARY is unavailable (warning only).
shcore::Value Replica_set_impl::options() {
  try {
    check_preconditions("options");
  } catch (const shcore::Exception &e) {
    // The primary might not be available, but options() must work anyway
    if (e.code() != SHERR_DBA_ASYNC_PRIMARY_UNAVAILABLE) throw;
    current_console()->print_warning(e.format());
  }

  shcore::Dictionary_t inner_dict = shcore::make_dict();
  (*inner_dict)["name"] = shcore::Value(get_name());

  // get the topology options
  (*inner_dict)["topology"] = shcore::Value(get_topology_options());

  // Get the instance options

  // get the tags
  (*inner_dict)[kTags] = Base_cluster_impl::get_cluster_tags();

  // read replica attributes
  {
    // Pairs of (metadata attribute name, user-visible option name).
    std::array<std::tuple<std::string_view, std::string_view>, 3> attribs{
        std::make_tuple(k_cluster_attribute_replication_allowed_host,
                        kReplicationAllowedHost),
        std::make_tuple(k_cluster_attribute_member_auth_type, kMemberAuthType),
        std::make_tuple(k_cluster_attribute_cert_issuer, kCertIssuer)};
    shcore::Array_t options = shcore::make_array();

    for (const auto &[attrib_name, attrib_desc] : attribs) {
      shcore::Value attrib_value;
      // Attributes missing from the metadata are reported as NULL.
      if (!get_metadata_storage()->query_cluster_attribute(
              get_id(), attrib_name, &attrib_value))
        attrib_value = shcore::Value::Null();

      options->emplace_back(
          shcore::Value(shcore::make_dict("option", shcore::Value(attrib_desc),
                                          "value", std::move(attrib_value))));
    }

    (*inner_dict)["globalOptions"] = shcore::Value(std::move(options));
  }

  shcore::Dictionary_t res = shcore::make_dict();
  (*res)["replicaSet"] = shcore::Value(std::move(inner_dict));

  return shcore::Value(std::move(res));
}

// ReplicaSet supports no per-instance options: always rejects the option.
void Replica_set_impl::_set_instance_option(
    const std::string & /*instance_def*/, const std::string &option,
    const shcore::Value & /*value*/) {
  throw shcore::Exception::argument_error("Option '" + option +
                                          "' not supported.");
}

// Re-creates the managed replication accounts of every instance so they are
// restricted to the given host (replicationAllowedHost option), updating the
// metadata accordingly. For instances whose account is not recorded in the
// metadata (possibly adopted replicasets), also ensures the replication
// channel itself uses the managed account.
void Replica_set_impl::update_replication_allowed_host(
    const std::string &host) {
  for (const Instance_metadata &instance : get_instances_from_metadata()) {
    auto account = get_metadata_storage()->get_instance_repl_account(
        instance.uuid, Cluster_type::ASYNC_REPLICATION);
    bool maybe_adopted = false;

    if (account.first.empty()) {
      account = {make_replication_user_name(instance.server_id,
                                            k_async_cluster_user_name),
                 "%"};
      maybe_adopted = true;
    }

    if (account.second != host) {
      log_info("Re-creating account for %s: %s@%s -> %s@%s",
               instance.endpoint.c_str(), account.first.c_str(),
               account.second.c_str(), account.first.c_str(), host.c_str());
      // Clone to the new host first, then drop the old one, so the account
      // never stops existing.
      clone_user(*get_primary_master(), account.first, account.second,
                 account.first, host);

      get_primary_master()->drop_user(account.first, account.second, true);

      get_metadata_storage()->update_instance_repl_account(
          instance.uuid, Cluster_type::ASYNC_REPLICATION, account.first, host);
    } else {
      log_info("Skipping account recreation for %s: %s@%s == %s@%s",
               instance.endpoint.c_str(), account.first.c_str(),
               account.second.c_str(), account.first.c_str(), host.c_str());
    }

    if (maybe_adopted) {
      auto ipool = current_ipool();

      Scoped_instance target(
          ipool->connect_unchecked_endpoint(instance.endpoint));

      // If the replicaset could have been adopted, then also ensure the
      // channel is using the right account
      std::string repl_user = mysqlshdk::mysql::get_replication_user(
          *target, k_replicaset_channel_name);
      if (repl_user != account.first && !instance.primary_master) {
        current_console()->print_info(shcore::str_format(
            "Changing replication user at %s to %s", instance.endpoint.c_str(),
            account.first.c_str()));

        std::string password;
        mysqlshdk::mysql::set_random_password(*get_primary_master(),
                                              account.first, {host}, &password);

        // Stop the receiver, switch credentials, then restart it.
        mysqlshdk::mysql::stop_replication_receiver(*target,
                                                    k_replicaset_channel_name);

        mysqlshdk::mysql::Replication_credentials_options options;
        options.password = password;

        mysqlshdk::mysql::change_replication_credentials(
            *target, k_replicaset_channel_name, account.first, options);

        mysqlshdk::mysql::start_replication_receiver(*target,
                                                     k_replicaset_channel_name);
      }
    }
  }
}

// Handles ReplicaSet-level options set through setOption(). Currently only
// replicationAllowedHost is supported; everything else is rejected.
void Replica_set_impl::_set_option(const std::string &option,
                                   const shcore::Value &value) {
  if (option == kReplicationAllowedHost) {
    if (value.type != shcore::String || value.as_string().empty())
      throw shcore::Exception::argument_error(
          shcore::str_format("Invalid value for '%s': Argument #2 is expected "
                             "to be a string.",
                             option.c_str()));

    update_replication_allowed_host(value.as_string());

    get_metadata_storage()->update_cluster_attribute(
        get_id(), k_cluster_attribute_replication_allowed_host, value);

    current_console()->print_info(shcore::str_format(
        "Internally managed replication users updated for ReplicaSet '%s'",
        get_name().c_str()));
  } else {
    throw shcore::Exception::argument_error("Option '" + option +
                                            "' not supported.");
  }
}

// Runs the standard precondition checks for 'function_name'. If the PRIMARY
// is unreachable, prints guidance on how to recover and, depending on
// throw_if_primary_unavailable, either re-throws or only warns. Any other
// precondition failure is always propagated.
void Replica_set_impl::check_preconditions_and_primary_availability(
    const std::string &function_name, bool throw_if_primary_unavailable) {
  try {
    check_preconditions(function_name);
  } catch (const shcore::Exception &e) {
    if (e.code() ==
SHERR_DBA_ASYNC_PRIMARY_UNAVAILABLE) { auto console = mysqlsh::current_console(); std::string err = "Unable to connect to the PRIMARY of the ReplicaSet " + get_name() + ": " + e.format(); if (throw_if_primary_unavailable) { console->print_error(err); } else { console->print_warning(err); } console->print_info( "Cluster change operations will not be possible unless the PRIMARY " "can be reached."); console->print_info( "If the PRIMARY is unavailable, you must either repair it or " "perform a forced failover."); console->print_info( "See \\help <<<forcePrimaryInstance>>> for more information."); if (throw_if_primary_unavailable) { throw shcore::Exception("PRIMARY instance is unavailable", e.code()); } } else { throw; } } } } // namespace dba } // namespace mysqlsh