void Cluster_set_impl::set_primary_cluster()

in modules/adminapi/cluster_set/cluster_set_impl.cc [2143:2422]


/**
 * Switches the PRIMARY cluster of the clusterset to the cluster named
 * `cluster_name` (planned switchover).
 *
 * Sequence performed by this function:
 *  1. Check preconditions and take an exclusive lock on the clusterset and on
 *     every reachable cluster.
 *  2. Open a second set of connections used for locking and, for real runs
 *     with a positive timeout, override their session `lock_wait_timeout`.
 *  3. Wait for the promoted cluster to apply received transactions, verify its
 *     transaction set against the current primary and synchronize the
 *     clusterset replication channel.
 *  4. Record the switch in the metadata, demote the current primary cluster,
 *     acquire global locks, promote the target cluster and re-point the
 *     remaining replica clusters.
 *
 * Each mutating step from (4) registers an undo action; if any step throws,
 * the registered undo actions are executed and the exception is rethrown.
 *
 * @param cluster_name Name of the cluster to be promoted.
 * @param options      Options for the operation. Fields read here: `dry_run`,
 *                     `timeout` and `invalidate_replica_clusters`.
 *
 * @throws shcore::Exception with SHERR_DBA_TARGET_ALREADY_PRIMARY if the named
 *         cluster is already the PRIMARY cluster. Exceptions raised by the
 *         helpers called here are propagated to the caller.
 */
void Cluster_set_impl::set_primary_cluster(
    const std::string &cluster_name,
    const clusterset::Set_primary_cluster_options &options) {
  check_preconditions("setPrimaryCluster");

  const bool dry_run = options.dry_run;

  // The session lock_wait_timeout is only overridden (and later restored) when
  // changes are really going to be made and a positive timeout was requested.
  const bool override_lock_wait_timeout = !dry_run && options.timeout > 0;

  // put an exclusive lock on the clusterset
  mysqlshdk::mysql::Lock_scoped_list api_locks;
  api_locks.push_back(get_lock_exclusive());

  auto console = current_console();
  console->print_info("Switching the primary cluster of the clusterset to '" +
                      cluster_name + "'");
  if (dry_run)
    console->print_note("dryRun enabled, no changes will be made");

  auto primary = get_primary_master();
  shcore::on_leave_scope release_primary_finally(
      [this]() { release_primary(); });

  auto primary_cluster = get_primary_cluster();
  std::shared_ptr<Cluster_impl> promoted_cluster;
  try {
    promoted_cluster = get_cluster(cluster_name);
  } catch (const shcore::Exception &e) {
    console->print_error("Cluster '" + cluster_name +
                         "' cannot be promoted: " + e.format());
    throw;
  }

  if (promoted_cluster == primary_cluster) {
    throw shcore::Exception(
        "Cluster '" + cluster_name + "' is already the PRIMARY cluster",
        SHERR_DBA_TARGET_ALREADY_PRIMARY);
  }

  auto promoted = promoted_cluster->get_cluster_server();

  log_info(
      "Switchover primary_cluster=%s primary_instance=%s "
      "promoted_cluster=%s promoted_instance=%s",
      primary_cluster->get_name().c_str(), primary->descr().c_str(),
      promoted_cluster->get_name().c_str(), promoted->descr().c_str());

  // if there any unreachable clusters we collect them in this list, which will
  // be used by connect_all_clusters() for skipping clusters
  std::list<Cluster_id> unreachable;

  console->print_info("* Verifying clusterset status");
  check_clusters_available(options.invalidate_replica_clusters, &unreachable);

  std::list<std::shared_ptr<Cluster_impl>> clusters(
      connect_all_clusters(0, false, &unreachable));
  shcore::on_leave_scope release_cluster_primaries([&clusters] {
    for (auto &c : clusters) c->release_primary();
  });

  // put an exclusive lock on each reachable cluster
  for (const auto &cluster : clusters)
    api_locks.push_back(cluster->get_lock_exclusive());

  // A second set of connections, used for the global locks.
  // NOTE(review): options.timeout + 5 is presumably a client-side timeout kept
  // slightly longer than lock_wait_timeout so that the server-side timeout is
  // the one that gets triggered first -- confirm against
  // connect_all_clusters().
  std::list<std::shared_ptr<Cluster_impl>> lock_clusters(
      connect_all_clusters(options.timeout + 5, false, &unreachable));
  auto release_lock_primaries = shcore::on_leave_scope(
      [this, &lock_clusters, override_lock_wait_timeout] {
        // Undo the lock_wait_timeout overrides applied below and release the
        // extra connections. Only sessions that were actually overridden are
        // reset.
        for (auto &c : lock_clusters) {
          if (override_lock_wait_timeout) {
            c->get_cluster_server()->set_sysvar_default(
                "lock_wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION);
          }
          c->release_primary();
        }
        if (override_lock_wait_timeout) {
          get_primary_master()->set_sysvar_default(
              "lock_wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION);
          get_metadata_storage()->get_md_server()->set_sysvar_default(
              "lock_wait_timeout", mysqlshdk::mysql::Var_qualifier::SESSION);
        }
      });
  std::list<std::shared_ptr<Instance>> lock_instances;
  for (auto &c : lock_clusters) {
    if (override_lock_wait_timeout) {
      c->get_cluster_server()->set_sysvar(
          "lock_wait_timeout", static_cast<int64_t>(options.timeout),
          mysqlshdk::mysql::Var_qualifier::SESSION);
    }

    lock_instances.emplace_back(c->get_primary_master());
  }
  // Same guard as for the per-cluster sessions above: never touch session
  // state in a dry run and never push a non-positive lock_wait_timeout.
  if (override_lock_wait_timeout) {
    get_primary_master()->set_sysvar("lock_wait_timeout",
                                     static_cast<int64_t>(options.timeout),
                                     mysqlshdk::mysql::Var_qualifier::SESSION);
    get_metadata_storage()->get_md_server()->set_sysvar(
        "lock_wait_timeout", static_cast<int64_t>(options.timeout),
        mysqlshdk::mysql::Var_qualifier::SESSION);
  }

  console->print_info();

  // make sure that all instances have the most up-to-date GTID
  console->print_info(
      "** Waiting for the promoted cluster to apply pending received "
      "transactions...");

  if (!dry_run) {
    mysqlshdk::mysql::Replication_channel channel;
    if (get_channel_status(*promoted, mysqlshdk::gr::k_gr_applier_channel,
                           &channel)) {
      switch (channel.status()) {
        // Nothing to wait for if the applier is not running.
        case mysqlshdk::mysql::Replication_channel::OFF:
        case mysqlshdk::mysql::Replication_channel::APPLIER_OFF:
        case mysqlshdk::mysql::Replication_channel::APPLIER_ERROR:
          break;
        default: {
          // Use the timeout given by the caller when there is one, otherwise
          // fall back to the shell-wide dba.gtidWaitTimeout setting.
          auto timeout =
              (options.timeout <= 0)
                  ? current_shell_options()->get().dba_gtid_wait_timeout
                  : options.timeout;
          wait_for_apply_retrieved_trx(*promoted,
                                       mysqlshdk::gr::k_gr_applier_channel,
                                       std::chrono::seconds{timeout}, true);
        }
      }
    }
  }

  ensure_transaction_set_consistent_and_recoverable(
      promoted.get(), primary.get(), primary_cluster.get(), false, dry_run,
      nullptr);

  // Get the current settings for the ClusterSet replication channel
  Async_replication_options ar_options = get_clusterset_replication_options();

  console->print_info("* Refreshing replication account of demoted cluster");
  // Re-generate a new password for the master being demoted.
  ar_options.repl_credentials =
      refresh_cluster_replication_user(primary_cluster.get(), dry_run);

  console->print_info("* Synchronizing transaction backlog at " +
                      promoted->descr());
  if (!dry_run) {
    sync_transactions(*promoted, k_clusterset_async_channel_name,
                      options.timeout);
  }
  console->print_info();

  // Restore the transaction_size_limit value to the original one
  restore_transaction_size_limit(promoted_cluster.get(), dry_run);

  // Collects the actions needed to revert the steps below if one of them fails
  Undo_tracker undo_tracker;

  try {
    console->print_info("* Updating metadata");

    // Update the metadata with the state the clusterset is supposed to be in
    log_info("Updating metadata at %s",
             m_metadata_storage->get_md_server()->descr().c_str());
    if (!dry_run) {
      {
        MetadataStorage::Transaction trx(m_metadata_storage);
        m_metadata_storage->record_cluster_set_primary_switch(
            get_id(), promoted_cluster->get_id(), unreachable);
        trx.commit();
      }

      // revert: record the original primary cluster as the primary again
      undo_tracker.add("", [this, primary_cluster]() {
        MetadataStorage::Transaction trx(m_metadata_storage);
        m_metadata_storage->record_cluster_set_primary_switch(
            get_id(), primary_cluster->get_id(), {});
        trx.commit();
      });

      for (const auto &c : options.invalidate_replica_clusters) {
        console->print_note(
            "Cluster '" + c +
            "' was INVALIDATED and no longer part of the clusterset.");
      }
    }
    console->print_info();

    console->print_info("* Updating topology");

    // Do the actual switchover:
    // - stop/delete channel from primary of promoted
    // - create channel in all members of demoted
    // - enable failover at primary of demoted
    // - start replica at demoted

    // The undo action is registered before the step it reverts, so that a
    // failure in the middle of the step is reverted too.
    undo_tracker.add("", [this, primary_cluster, dry_run]() {
      promote_to_primary(primary_cluster.get(), false, dry_run);
    });
    demote_from_primary(primary_cluster.get(), promoted.get(), ar_options,
                        dry_run);

    // update settings other than credentials in members of remaining replica
    // clusters
    ar_options.repl_credentials = {};

    {
      // Synchronize all slaves and lock all instances.
      Global_locks global_locks;
      try {
        global_locks.acquire(lock_instances, primary->get_uuid(),
                             options.timeout, dry_run);
      } catch (const std::exception &e) {
        console->print_error(
            shcore::str_format("An error occurred while preparing "
                               "instances for a PRIMARY switch: %s",
                               e.what()));
        throw;
      }

      console->print_info(
          "* Synchronizing remaining transactions at promoted primary");
      if (!dry_run) {
        sync_transactions(*promoted, k_clusterset_async_channel_name,
                          options.timeout);
      }
      console->print_info();
    }

    undo_tracker.add(
        "", [this, promoted_cluster, primary, ar_options, dry_run]() {
          demote_from_primary(promoted_cluster.get(), primary.get(), ar_options,
                              dry_run);
        });
    promote_to_primary(promoted_cluster.get(), true, dry_run);

    console->print_info("* Updating replica clusters");

    DBUG_EXECUTE_IF("set_primary_cluster_skip_update_replicas", {
      std::cerr << "Skipping replica update "
                   "(set_primary_cluster_skip_update_replicas)\n";
      return;
    });

    for (const auto &replica : clusters) {
      // Only the clusters not directly involved in the switch are re-pointed
      if (promoted_cluster->get_name() != replica->get_name() &&
          primary_cluster->get_name() != replica->get_name()) {
        // The shared_ptr is captured by value so that the undo action owns the
        // cluster handle instead of referring to the loop variable.
        undo_tracker.add(
            "", [this, replica, primary, ar_options, dry_run, console]() {
              try {
                update_replica(replica.get(), primary.get(), ar_options,
                               dry_run);
              } catch (...) {
                // best effort: keep reverting the remaining replicas
                console->print_error("Could not revert changes to " +
                                     replica->get_name() + ": " +
                                     format_active_exception());
              }
            });

        update_replica(replica.get(), promoted.get(), ar_options, dry_run);

        log_info("PRIMARY changed for cluster %s", replica->get_name().c_str());
      }
    }

    // reset replication channel from the promoted primary after revert isn't
    // needed anymore
    delete_async_channel(promoted_cluster.get(), dry_run);
  } catch (...) {
    current_console()->print_info(
        "* Could not complete operation, attempting to revert changes...");
    try {
      undo_tracker.execute();
    } catch (...) {
      // a failure while reverting is reported, the original error is the one
      // that gets propagated
      current_console()->print_error("Could not revert changes: " +
                                     format_active_exception());
    }
    throw;
  }

  console->print_info("Cluster '" + promoted_cluster->get_name() +
                      "' was promoted to PRIMARY of the clusterset. The "
                      "PRIMARY instance is '" +
                      promoted->descr() + "'");
  console->print_info();

  if (dry_run) {
    console->print_info("dryRun finished.");
    console->print_info();
  }
}