std::shared_ptr<Cluster> Reboot_cluster_from_complete_outage::do_run()

in modules/adminapi/dba/reboot_cluster_from_complete_outage.cc [1025:1439]


std::shared_ptr<Cluster> Reboot_cluster_from_complete_outage::do_run() {
  const auto console = current_console();
  auto cluster_impl = m_cluster->impl();
  auto cluster_ret = m_cluster;

  auto cs_info = retrieve_cs_info(cluster_impl.get());
  bool cluster_is_multi_primary = (cluster_impl->get_cluster_topology_type() ==
                                   mysqlshdk::gr::Topology_mode::MULTI_PRIMARY);

  if (!m_options.get_dry_run()) {
    console->print_info("Restoring the Cluster '" + cluster_impl->get_name() +
                        "' from complete outage...");
    console->print_info();
  }

  // after all the checks are done, this will store all the instances we need
  // to work with
  std::vector<std::shared_ptr<Instance>> instances;
  {
    // ensure that the current session instance exists on the Metadata Schema
    if (std::string group_md_address =
            m_target_instance->get_canonical_address();
        !cluster_impl->contains_instance_with_address(group_md_address)) {
      throw shcore::Exception::runtime_error(
          "The current session instance does not belong to the Cluster: '" +
          cluster_impl->get_name() + "'.");
    }

    // gather all instances
    std::vector<Instance_metadata> instances_unreachable;
    instances = retrieve_instances(&instances_unreachable);

    // gather all (including target instance) instances states
    std::unordered_map<std::shared_ptr<Instance>, mysqlshdk::gr::Member_state>
        istates;

    istates.rehash(instances.size() + 1);
    istates[m_target_instance] =
        mysqlshdk::gr::get_member_state(*m_target_instance);
    for (const auto &i : instances)
      istates[i] = mysqlshdk::gr::get_member_state(*i);

    assert(instances.size() == (istates.size() - 1));

    {
      std::vector<std::string> addresses;
      addresses.reserve(istates.size() + instances_unreachable.size());

      std::transform(istates.cbegin(), istates.cend(),
                     std::back_inserter(addresses), [](const auto &istate) {
                       return shcore::str_format(
                           "'%s' (%s)",
                           istate.first->get_canonical_address().c_str(),
                           to_string(istate.second).c_str());
                     });
      std::transform(
          instances_unreachable.cbegin(), instances_unreachable.cend(),
          std::back_inserter(addresses), [](const auto &i) {
            return shcore::str_format("'%s' (UNREACHABLE)", i.address.c_str());
          });

      // so that the user msg is consistent
      std::sort(addresses.begin(), addresses.end());

      console->print_info(shcore::str_format(
          "Cluster instances: %s", shcore::str_join(addresses, ", ").c_str()));
    }

    // check the state of the instances
    std::vector<std::shared_ptr<Instance>> instances_online;
    auto members_all_offline = ensure_all_members_offline(
        istates, m_options.get_dry_run(), &instances_online);

    assert(!members_all_offline || instances_online.empty());

    // reaching this point, we have all the info needed to decide if the
    // command can proceed.

    if (members_all_offline)
      log_info("All members of the Cluster are either offline or unreachable.");

    // if any instance is online, we can't proceed
    if (!instances_online.empty())
      throw std::runtime_error("The Cluster is ONLINE");

    if (!cs_info.is_invalidated &&
        (!instances_unreachable.empty() || !members_all_offline)) {
      std::vector<std::string> addresses;
      addresses.reserve(instances_online.size() + instances_unreachable.size());

      std::transform(instances_online.cbegin(), instances_online.cend(),
                     std::back_inserter(addresses),
                     [](const auto &i) { return i->get_canonical_address(); });
      std::transform(instances_unreachable.cbegin(),
                     instances_unreachable.cend(),
                     std::back_inserter(addresses),
                     [](const auto &i) { return i.endpoint; });

      // so that the user msg is consistent
      std::sort(addresses.begin(), addresses.end());

      console->print_warning(shcore::str_format(
          "One or more instances of the Cluster could not be reached "
          "and cannot be rejoined nor ensured to be OFFLINE: %s. Cluster may "
          "diverge and become inconsistent unless all instances are either "
          "reachable or certain to be OFFLINE and not accepting new "
          "transactions. You may use the 'force' option to bypass this check "
          "and proceed anyway.",
          shcore::str_join(addresses, ", ", [](const auto &address) {
            return shcore::str_format("'%s'", address.c_str());
          }).c_str()));

      if (!m_options.get_force())
        throw std::runtime_error(
            "Could not determine if Cluster is completely OFFLINE");
    }
  }

  if (m_options.switch_communication_stack && cs_info.is_invalidated)
    throw shcore::Exception::runtime_error(
        "Cannot switch the communication stack in an invalidated Cluster "
        "because its instances won't be removed or rejoined.");

  if (m_options.gr_options.local_address.has_value()) {
    console->print_warning(
        "The value used for 'localAddress' only applies to the current "
        "session instance (seed). If the values generated automatically for "
        "other rejoining Cluster members are not valid, please use "
        "<Cluster>.<<<rejoinInstance>>>() with the 'localAddress' option.");
    console->print_info();
  }

  // before picking the best seed instance, make sure that all pending
  // transactions are applied
  {
    console->print_info(
        "Waiting for instances to apply pending received transactions...");

    if (!m_options.get_dry_run()) {
      auto check_cb = [](const mysqlshdk::mysql::IInstance &instance,
                         std::chrono::seconds timeout) {
        mysqlshdk::mysql::Replication_channel channel;
        if (!get_channel_status(instance, mysqlshdk::gr::k_gr_applier_channel,
                                &channel))
          return;

        if (auto status = channel.status();
            (status == mysqlshdk::mysql::Replication_channel::OFF) ||
            (status == mysqlshdk::mysql::Replication_channel::APPLIER_OFF) ||
            (status == mysqlshdk::mysql::Replication_channel::APPLIER_ERROR))
          return;

        wait_for_apply_retrieved_trx(
            instance, mysqlshdk::gr::k_gr_applier_channel, timeout, false);
      };

      check_cb(*m_target_instance, m_options.get_timeout());

      for (const auto &i : instances) check_cb(*i, m_options.get_timeout());
    }
  }

  // pick the seed instance
  std::shared_ptr<Instance> best_instance_gtid;
  {
    best_instance_gtid =
        pick_best_instance_gtid(instances, cs_info.is_member,
                                m_options.get_force(), m_options.get_primary());

    // check if the other instances are compatible in regards to GTID with the
    // (new) seed
    if (!m_options.get_force()) {
      auto new_seed =
          best_instance_gtid ? best_instance_gtid : m_target_instance;

      instances.push_back(m_target_instance);  // temporary

      for (const auto &i : instances) {
        if (i->get_uuid() == new_seed->get_uuid()) continue;

        std::string_view reason;
        switch (auto gtid_state = check_replica_gtid_state(*new_seed, *i);
                gtid_state) {
          case mysqlshdk::mysql::Replica_gtid_state::IDENTICAL:
          case mysqlshdk::mysql::Replica_gtid_state::RECOVERABLE:
            continue;
          case mysqlshdk::mysql::Replica_gtid_state::DIVERGED:
            reason = "GTIDs diverged";
            break;
          case mysqlshdk::mysql::Replica_gtid_state::IRRECOVERABLE:
            reason = "former has missing transactions";
            break;
          case mysqlshdk::mysql::Replica_gtid_state::NEW:
            reason = "former has an empty GTID set";
            break;
        }

        throw shcore::Exception::runtime_error(shcore::str_format(
            "The instance '%s' has an incompatible GTID set with the seed "
            "instance '%s' (%.*s). If you wish to proceed, the 'force' option "
            "must be explicitly set.",
            i->descr().c_str(), new_seed->descr().c_str(), (int)reason.size(),
            reason.data()));
      }

      instances.pop_back();
    }
  }

  // The 'force' option is a no-op in this scenario (instances aren't rejoined
  // if the Cluster belongs to a ClusterSet and is INVALIDATED), however, it's
  // more correct to forbid it instead of ignoring it.
  if (cs_info.is_invalidated && m_options.get_force())
    throw std::runtime_error(shcore::str_format(
        "The 'force' option cannot be used in a Cluster that belongs to a "
        "ClusterSet and is INVALIDATED."));

  // everything is ready, proceed with the reboot...

  // will have to lock the target instance and all reachable members
  mysqlshdk::mysql::Lock_scoped_list i_locks;
  {
    auto new_seed = best_instance_gtid ? best_instance_gtid : m_target_instance;
    i_locks.push_back(new_seed->get_lock_exclusive());

    instances.push_back(m_target_instance);  // temporary

    for (const auto &instance : instances) {
      if (instance->get_uuid() != new_seed->get_uuid())
        i_locks.push_back(instance->get_lock_exclusive());
    }

    instances.pop_back();
  }

  // reboot seed
  if (best_instance_gtid) {
    // if we have a better instance in the cluster (in terms of highest GTID),
    // use it instead of the target
    if (mysqlshdk::utils::are_endpoints_equal(
            best_instance_gtid->get_canonical_address(),
            m_options.get_primary())) {
      console->print_info(shcore::str_format(
          "Switching over to instance '%s' to be used as seed.",
          best_instance_gtid->get_canonical_address().c_str()));
    } else {
      console->print_info(shcore::str_format(
          "Switching over to instance '%s' (which has the highest GTID set), "
          "to be used as seed.",
          best_instance_gtid->get_canonical_address().c_str()));
    }

    if (!m_options.get_dry_run()) {
      // use the new target instance as seed
      instances.push_back(std::exchange(m_target_instance, best_instance_gtid));
      best_instance_gtid = nullptr;

      reboot_seed(cs_info);

      // refresh metadata and cluster info
      std::shared_ptr<MetadataStorage> metadata;
      std::shared_ptr<mysqlsh::dba::Cluster> cluster;

      m_dba->connect_to_target_group(m_target_instance, &metadata, nullptr,
                                     false);
      current_ipool()->set_metadata(metadata);

      cluster = m_dba->get_cluster(cluster_impl->get_name().c_str(), metadata,
                                   m_target_instance, true, true);

      cluster_ret = cluster;
      cluster_impl = cluster->impl();
    }

  } else if (!m_options.get_dry_run()) {
    // use the target instance as seed
    reboot_seed(cs_info);
  }

  bool rejoin_remaning_instances = true;

  // don't rejoin the instances *if* cluster is in a cluster set and is
  // invalidated (former primary) or is a replica and the primary doesn't have
  // global status OK
  if (cs_info.is_member &&
      (cs_info.is_invalidated ||
       (!cs_info.is_primary &&
        cs_info.primary_status != Cluster_global_status::OK))) {
    auto msg = cs_info.is_invalidated
                   ? "Skipping rejoining remaining instances because the "
                     "Cluster belongs to a ClusterSet and is INVALIDATED. "
                     "Please add or remove the instances after the Cluster is "
                     "rejoined to the ClusterSet."
                   : "Skipping rejoining remaining instances because the "
                     "Cluster belongs to a ClusterSet and its global status is "
                     "not OK. Please add or remove the instances after the "
                     "Cluster is rejoined to the ClusterSet.";
    console->print_info(msg);

    rejoin_remaning_instances = false;
  } else if (!m_options.get_dry_run()) {
    // it's either a non ClusterSet instance or it is but it's not the
    // primary, so we just need to acquire the primary before rejoining the
    // instances
    if (cs_info.is_member && !cluster_impl->is_cluster_set_remove_pending()) {
      std::shared_ptr<Cluster_set_impl> cs;
      try {
        cs = cluster_impl->check_and_get_cluster_set_for_cluster();
      } catch (const shcore::Exception &e) {
        if (e.code() != SHERR_DBA_METADATA_MISSING) throw;
      }

      // If the ClusterSet couldn't be obtained, it means the Cluster has been
      // forcefully removed from it
      if (!m_options.get_dry_run() && cs) {
        // Acquire primary to ensure the correct primary member will be used
        // from now on when the Cluster belongs to a ClusterSet.
        cluster_impl->acquire_primary();

        // If the communication stack was changed and this is a replica
        // cluster, we must ensure here that 'grLocal' reflects the new value
        // for local address
        if (!cs_info.is_primary &&
            m_options.switch_communication_stack.has_value()) {
          cluster_impl->update_metadata_for_instance(*m_target_instance);
        }
      }
    }
  }

  // if the cluster is part of a set
  if (cs_info.is_member) {
    // if this is a primary, we don't want to rejoin, but we need to check if
    // the cluster was removed from the set, to mark it as pending removed,
    // otherwise, calling dissolve on the cluster object returned will fail
    if (cs_info.is_primary) {
      try {
        // Get the clusterset object (which also checks for various clusterset
        // MD consistency scenarios)
        cluster_impl->check_and_get_cluster_set_for_cluster();

      } catch (const shcore::Exception &e) {
        if (e.code() == SHERR_DBA_METADATA_MISSING)
          cluster_impl->set_cluster_set_remove_pending(true);
      }
    } else if (!cs_info.removed_from_set && !cs_info.is_invalidated) {
      // this is a replica cluster, so try and rejoin the cluster set
      console->print_info("Rejoining Cluster into its original ClusterSet...");
      console->print_info();

      if (!m_options.get_dry_run()) {
        try {
          auto cluster_set_impl = cluster_impl->get_cluster_set_object();

          cluster_set_impl->get_primary_cluster()->check_cluster_online();

          cluster_set_impl->rejoin_cluster(cluster_impl->get_name(), {}, false);

          // also ensure SRO is enabled on all members
          cluster_set_impl->ensure_replica_settings(cluster_impl.get(), false);
        } catch (const shcore::Exception &e) {
          switch (e.code()) {
            case SHERR_DBA_DATA_ERRANT_TRANSACTIONS:
              console->print_warning(
                  "Unable to rejoin Cluster to the ClusterSet because this "
                  "Cluster has errant transactions that did not originate from "
                  "the primary Cluster of the ClusterSet.");
              break;
            case SHERR_DBA_GROUP_OFFLINE:
            case SHERR_DBA_GROUP_UNREACHABLE:
            case SHERR_DBA_GROUP_HAS_NO_QUORUM:
            case SHERR_DBA_CLUSTER_PRIMARY_UNAVAILABLE:
              console->print_warning(
                  "Unable to rejoin Cluster to the ClusterSet (primary Cluster "
                  "is unreachable). Please call ClusterSet.rejoinCluster() to "
                  "manually rejoin this Cluster back into the ClusterSet.");
              break;
            case SHERR_DBA_METADATA_MISSING: {
              std::string cs_domain_name;
              cluster_impl->get_metadata_storage()->check_cluster_set(
                  nullptr, nullptr, &cs_domain_name);
              console->print_warning(shcore::str_format(
                  "The Cluster '%s' appears to have been removed from the "
                  "ClusterSet '%s', however its own metadata copy wasn't "
                  "properly updated during the removal.",
                  cluster_impl->get_name().c_str(), cs_domain_name.c_str()));

              cluster_impl->set_cluster_set_remove_pending(true);
            } break;
            default:
              throw;
          }
        }
      }
    }
  }

  if (rejoin_remaning_instances && !m_options.get_dry_run()) {
    // and finally, rejoin all instances
    rejoin_instances(cluster_impl.get(), *m_target_instance, instances,
                     m_options, !cluster_is_multi_primary);
  }

  if (m_options.get_dry_run()) {
    console->print_info("dryRun finished.");
    console->print_info();

    return nullptr;
  }

  console->print_info("The Cluster was successfully rebooted.");
  console->print_info();

  return cluster_ret;
}