in modules/adminapi/cluster_set/cluster_set_impl.cc [2143:2422]
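// Switches the PRIMARY cluster of the ClusterSet to the given cluster:
// the current PRIMARY is demoted to a REPLICA, the target cluster is
// promoted, and all remaining replica clusters are repointed to it.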
void Cluster_set_impl::set_primary_cluster(
const std::string &cluster_name,
const clusterset::Set_primary_cluster_options &options) {
check_preconditions("setPrimaryCluster");
// put an exclusive lock on the clusterset
mysqlshdk::mysql::Lock_scoped_list api_locks;
api_locks.push_back(get_lock_exclusive());
auto console = current_console();
console->print_info("Switching the primary cluster of the clusterset to '" +
cluster_name + "'");
if (options.dry_run)
console->print_note("dryRun enabled, no changes will be made");
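// Acquire the current PRIMARY instance and make sure it is released once
// the operation finishes.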
auto primary = get_primary_master();
shcore::on_leave_scope release_primary_finally(
[this]() { release_primary(); });
auto primary_cluster = get_primary_cluster();
std::shared_ptr<Cluster_impl> promoted_cluster;
try {
promoted_cluster = get_cluster(cluster_name);
} catch (const shcore::Exception &e) {
console->print_error("Cluster '" + cluster_name +
"' cannot be promoted: " + e.format());
throw;
}
if (promoted_cluster == primary_cluster) {
throw shcore::Exception(
"Cluster '" + cluster_name + "' is already the PRIMARY cluster",
SHERR_DBA_TARGET_ALREADY_PRIMARY);
}
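// 'promoted' is the instance that will become the clusterset's new PRIMARY.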
auto promoted = promoted_cluster->get_cluster_server();
log_info(
"Switchover primary_cluster=%s primary_instance=%s "
"promoted_cluster=%s promoted_instance=%s",
primary_cluster->get_name().c_str(), primary->descr().c_str(),
promoted_cluster->get_name().c_str(), promoted->descr().c_str());
// if there are any unreachable clusters, we collect them in this list, which
// will be used by connect_all_clusters() to skip them
std::list<Cluster_id> unreachable;
console->print_info("* Verifying clusterset status");
check_clusters_available(options.invalidate_replica_clusters, &unreachable);
std::list<std::shared_ptr<Cluster_impl>> clusters(
connect_all_clusters(0, false, &unreachable));
shcore::on_leave_scope release_cluster_primaries([&clusters] {
for (auto &c : clusters) c->release_primary();
});
// put an exclusive lock on each reachable cluster
for (const auto &cluster : clusters)
api_locks.push_back(cluster->get_lock_exclusive());
// another set of connections, used for acquiring the locks; their timeout is
// set slightly higher than the lock timeout so that server-side lock timeouts
// get triggered before these connections give up
std::list<std::shared_ptr<Cluster_impl>> lock_clusters(
connect_all_clusters(options.timeout + 5, false, &unreachable));
auto release_lock_primaries =
shcore::on_leave_scope([this, &lock_clusters, options] {
for (auto &c : lock_clusters) {
if (!options.dry_run && options.timeout > 0) {
c->get_cluster_server()->set_sysvar_default(
"lock_wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION);
}
c->release_primary();
}
get_primary_master()->set_sysvar_default(
"lock_wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION);
});
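// Set a session-level lock_wait_timeout on the primary of each reachable
// cluster (and on the clusterset primary and metadata server) so lock
// acquisition fails fast instead of blocking for the server default.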
std::list<std::shared_ptr<Instance>> lock_instances;
for (auto &c : lock_clusters) {
if (!options.dry_run && options.timeout > 0) {
c->get_cluster_server()->set_sysvar(
"lock_wait_timeout", static_cast<int64_t>(options.timeout),
mysqlshdk::mysql::Var_qualifier::SESSION);
}
lock_instances.emplace_back(c->get_primary_master());
}
get_primary_master()->set_sysvar("lock_wait_timeout",
static_cast<int64_t>(options.timeout),
mysqlshdk::mysql::Var_qualifier::SESSION);
get_metadata_storage()->get_md_server()->set_sysvar(
"lock_wait_timeout", static_cast<int64_t>(options.timeout),
mysqlshdk::mysql::Var_qualifier::SESSION);
console->print_info();
// make sure that all instances have the most up-to-date GTID
console->print_info(
"** Waiting for the promoted cluster to apply pending received "
"transactions...");
if (!options.dry_run) {
mysqlshdk::mysql::Replication_channel channel;
if (get_channel_status(*promoted, mysqlshdk::gr::k_gr_applier_channel,
&channel)) {
switch (channel.status()) {
case mysqlshdk::mysql::Replication_channel::OFF:
case mysqlshdk::mysql::Replication_channel::APPLIER_OFF:
case mysqlshdk::mysql::Replication_channel::APPLIER_ERROR:
break;
default: {
// use the user-provided timeout if one was given, otherwise fall
// back to the shell's dba_gtid_wait_timeout
auto timeout =
(options.timeout > 0)
? options.timeout
: current_shell_options()->get().dba_gtid_wait_timeout;
wait_for_apply_retrieved_trx(*promoted,
mysqlshdk::gr::k_gr_applier_channel,
std::chrono::seconds{timeout}, true);
}
}
}
}
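// Verify that the promoted cluster's transaction set is consistent with the
// current primary and that the switchover is recoverable.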
ensure_transaction_set_consistent_and_recoverable(
promoted.get(), primary.get(), primary_cluster.get(), false,
options.dry_run, nullptr);
// Get the current settings for the ClusterSet replication channel
Async_replication_options ar_options = get_clusterset_replication_options();
console->print_info("* Refreshing replication account of demoted cluster");
// Generate a new password for the replication account of the cluster being
// demoted.
ar_options.repl_credentials =
refresh_cluster_replication_user(primary_cluster.get(), options.dry_run);
console->print_info("* Synchronizing transaction backlog at " +
promoted->descr());
if (!options.dry_run) {
sync_transactions(*promoted, k_clusterset_async_channel_name,
options.timeout);
}
console->print_info();
// Restore the transaction_size_limit value to the original one
restore_transaction_size_limit(promoted_cluster.get(), options.dry_run);
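// Register an undo step for every change made from here on, so that a failed
// switchover can be reverted (see the catch block below).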
Undo_tracker undo_tracker;
try {
console->print_info("* Updating metadata");
// Update the metadata with the state the clusterset is supposed to be in
log_info("Updating metadata at %s",
m_metadata_storage->get_md_server()->descr().c_str());
if (!options.dry_run) {
{
MetadataStorage::Transaction trx(m_metadata_storage);
m_metadata_storage->record_cluster_set_primary_switch(
get_id(), promoted_cluster->get_id(), unreachable);
trx.commit();
}
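// Undo: restore the original primary cluster in the metadata.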
undo_tracker.add("", [=]() {
MetadataStorage::Transaction trx(m_metadata_storage);
m_metadata_storage->record_cluster_set_primary_switch(
get_id(), primary_cluster->get_id(), {});
trx.commit();
});
for (const auto &c : options.invalidate_replica_clusters) {
console->print_note(
"Cluster '" + c +
"' was INVALIDATED and no longer part of the clusterset.");
}
}
console->print_info();
console->print_info("* Updating topology");
// Do the actual switchover:
// - stop/delete channel from primary of promoted
// - create channel in all members of demoted
// - enable failover at primary of demoted
// - start replica at demoted
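// Undo: re-promote the original primary cluster.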
undo_tracker.add("", [this, primary_cluster, options]() {
promote_to_primary(primary_cluster.get(), false, options.dry_run);
});
demote_from_primary(primary_cluster.get(), promoted.get(), ar_options,
options.dry_run);
// update settings other than credentials in members of remaining replica
// clusters
ar_options.repl_credentials = {};
{
// Synchronize all slaves and lock all instances.
Global_locks global_locks;
try {
global_locks.acquire(lock_instances, primary->get_uuid(),
options.timeout, options.dry_run);
} catch (const std::exception &e) {
console->print_error(
shcore::str_format("An error occurred while preparing "
"instances for a PRIMARY switch: %s",
e.what()));
throw;
}
console->print_info(
"* Synchronizing remaining transactions at promoted primary");
if (!options.dry_run) {
sync_transactions(*promoted, k_clusterset_async_channel_name,
options.timeout);
}
console->print_info();
}
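// Undo: demote the newly promoted cluster back to replicating from the
// original primary instance.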
undo_tracker.add(
"", [this, promoted_cluster, primary, ar_options, options]() {
demote_from_primary(promoted_cluster.get(), primary.get(), ar_options,
options.dry_run);
});
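// Promote the target cluster to PRIMARY of the clusterset.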
promote_to_primary(promoted_cluster.get(), true, options.dry_run);
console->print_info("* Updating replica clusters");
DBUG_EXECUTE_IF("set_primary_cluster_skip_update_replicas", {
std::cerr << "Skipping replica update "
"(set_primary_cluster_skip_update_replicas)\n";
return;
});
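// Repoint every other replica cluster to replicate from the newly promoted
// primary, registering an undo step for each of them.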
for (const auto &replica : clusters) {
if (promoted_cluster->get_name() != replica->get_name() &&
primary_cluster->get_name() != replica->get_name()) {
undo_tracker.add(
"", [this, &replica, primary, ar_options, options, console]() {
try {
update_replica(replica.get(), primary.get(), ar_options,
options.dry_run);
} catch (...) {
console->print_error("Could not revert changes to " +
replica->get_name() + ": " +
format_active_exception());
}
});
update_replica(replica.get(), promoted.get(), ar_options,
options.dry_run);
log_info("PRIMARY changed for cluster %s", replica->get_name().c_str());
}
}
// remove the replication channel from the promoted primary, now that a
// revert is no longer needed
delete_async_channel(promoted_cluster.get(), options.dry_run);
} catch (...) {
current_console()->print_info(
"* Could not complete operation, attempting to revert changes...");
try {
undo_tracker.execute();
} catch (...) {
current_console()->print_error("Could not revert changes: " +
format_active_exception());
}
throw;
}
console->print_info("Cluster '" + promoted_cluster->get_name() +
"' was promoted to PRIMARY of the clusterset. The "
"PRIMARY instance is '" +
promoted->descr() + "'");
console->print_info();
if (options.dry_run) {
console->print_info("dryRun finished.");
console->print_info();
}
}