in modules/adminapi/dba/reboot_cluster_from_complete_outage.cc [1025:1439]
std::shared_ptr<Cluster> Reboot_cluster_from_complete_outage::do_run() {
const auto console = current_console();
auto cluster_impl = m_cluster->impl();
auto cluster_ret = m_cluster;
auto cs_info = retrieve_cs_info(cluster_impl.get());
bool cluster_is_multi_primary = (cluster_impl->get_cluster_topology_type() ==
mysqlshdk::gr::Topology_mode::MULTI_PRIMARY);
if (!m_options.get_dry_run()) {
console->print_info("Restoring the Cluster '" + cluster_impl->get_name() +
"' from complete outage...");
console->print_info();
}
// after all the checks are done, this will store all the instances we need
// to work with
std::vector<std::shared_ptr<Instance>> instances;
{
// ensure that the current session instance exists on the Metadata Schema
if (std::string group_md_address =
m_target_instance->get_canonical_address();
!cluster_impl->contains_instance_with_address(group_md_address)) {
throw shcore::Exception::runtime_error(
"The current session instance does not belong to the Cluster: '" +
cluster_impl->get_name() + "'.");
}
// gather all instances
std::vector<Instance_metadata> instances_unreachable;
instances = retrieve_instances(&instances_unreachable);
// gather all (including target instance) instances states
std::unordered_map<std::shared_ptr<Instance>, mysqlshdk::gr::Member_state>
istates;
istates.rehash(instances.size() + 1);
istates[m_target_instance] =
mysqlshdk::gr::get_member_state(*m_target_instance);
for (const auto &i : instances)
istates[i] = mysqlshdk::gr::get_member_state(*i);
assert(instances.size() == (istates.size() - 1));
{
std::vector<std::string> addresses;
addresses.reserve(istates.size() + instances_unreachable.size());
std::transform(istates.cbegin(), istates.cend(),
std::back_inserter(addresses), [](const auto &istate) {
return shcore::str_format(
"'%s' (%s)",
istate.first->get_canonical_address().c_str(),
to_string(istate.second).c_str());
});
std::transform(
instances_unreachable.cbegin(), instances_unreachable.cend(),
std::back_inserter(addresses), [](const auto &i) {
return shcore::str_format("'%s' (UNREACHABLE)", i.address.c_str());
});
// so that the user msg is consistent
std::sort(addresses.begin(), addresses.end());
console->print_info(shcore::str_format(
"Cluster instances: %s", shcore::str_join(addresses, ", ").c_str()));
}
// check the state of the instances
std::vector<std::shared_ptr<Instance>> instances_online;
auto members_all_offline = ensure_all_members_offline(
istates, m_options.get_dry_run(), &instances_online);
assert(!members_all_offline || instances_online.empty());
// reaching this point, we have all the info needed to decide if the
// command can proceed.
if (members_all_offline)
log_info("All members of the Cluster are either offline or unreachable.");
// if any instance is online, we can't proceed
if (!instances_online.empty())
throw std::runtime_error("The Cluster is ONLINE");
if (!cs_info.is_invalidated &&
(!instances_unreachable.empty() || !members_all_offline)) {
std::vector<std::string> addresses;
addresses.reserve(instances_online.size() + instances_unreachable.size());
std::transform(instances_online.cbegin(), instances_online.cend(),
std::back_inserter(addresses),
[](const auto &i) { return i->get_canonical_address(); });
std::transform(instances_unreachable.cbegin(),
instances_unreachable.cend(),
std::back_inserter(addresses),
[](const auto &i) { return i.endpoint; });
// so that the user msg is consistent
std::sort(addresses.begin(), addresses.end());
console->print_warning(shcore::str_format(
"One or more instances of the Cluster could not be reached "
"and cannot be rejoined nor ensured to be OFFLINE: %s. Cluster may "
"diverge and become inconsistent unless all instances are either "
"reachable or certain to be OFFLINE and not accepting new "
"transactions. You may use the 'force' option to bypass this check "
"and proceed anyway.",
shcore::str_join(addresses, ", ", [](const auto &address) {
return shcore::str_format("'%s'", address.c_str());
}).c_str()));
if (!m_options.get_force())
throw std::runtime_error(
"Could not determine if Cluster is completely OFFLINE");
}
}
if (m_options.switch_communication_stack && cs_info.is_invalidated)
throw shcore::Exception::runtime_error(
"Cannot switch the communication stack in an invalidated Cluster "
"because its instances won't be removed or rejoined.");
if (m_options.gr_options.local_address.has_value()) {
console->print_warning(
"The value used for 'localAddress' only applies to the current "
"session instance (seed). If the values generated automatically for "
"other rejoining Cluster members are not valid, please use "
"<Cluster>.<<<rejoinInstance>>>() with the 'localAddress' option.");
console->print_info();
}
// before picking the best seed instance, make sure that all pending
// transactions are applied
{
console->print_info(
"Waiting for instances to apply pending received transactions...");
if (!m_options.get_dry_run()) {
auto check_cb = [](const mysqlshdk::mysql::IInstance &instance,
std::chrono::seconds timeout) {
mysqlshdk::mysql::Replication_channel channel;
if (!get_channel_status(instance, mysqlshdk::gr::k_gr_applier_channel,
&channel))
return;
if (auto status = channel.status();
(status == mysqlshdk::mysql::Replication_channel::OFF) ||
(status == mysqlshdk::mysql::Replication_channel::APPLIER_OFF) ||
(status == mysqlshdk::mysql::Replication_channel::APPLIER_ERROR))
return;
wait_for_apply_retrieved_trx(
instance, mysqlshdk::gr::k_gr_applier_channel, timeout, false);
};
check_cb(*m_target_instance, m_options.get_timeout());
for (const auto &i : instances) check_cb(*i, m_options.get_timeout());
}
}
// pick the seed instance
std::shared_ptr<Instance> best_instance_gtid;
{
best_instance_gtid =
pick_best_instance_gtid(instances, cs_info.is_member,
m_options.get_force(), m_options.get_primary());
// check if the other instances are compatible in regards to GTID with the
// (new) seed
if (!m_options.get_force()) {
auto new_seed =
best_instance_gtid ? best_instance_gtid : m_target_instance;
instances.push_back(m_target_instance); // temporary
for (const auto &i : instances) {
if (i->get_uuid() == new_seed->get_uuid()) continue;
std::string_view reason;
switch (auto gtid_state = check_replica_gtid_state(*new_seed, *i);
gtid_state) {
case mysqlshdk::mysql::Replica_gtid_state::IDENTICAL:
case mysqlshdk::mysql::Replica_gtid_state::RECOVERABLE:
continue;
case mysqlshdk::mysql::Replica_gtid_state::DIVERGED:
reason = "GTIDs diverged";
break;
case mysqlshdk::mysql::Replica_gtid_state::IRRECOVERABLE:
reason = "former has missing transactions";
break;
case mysqlshdk::mysql::Replica_gtid_state::NEW:
reason = "former has an empty GTID set";
break;
}
throw shcore::Exception::runtime_error(shcore::str_format(
"The instance '%s' has an incompatible GTID set with the seed "
"instance '%s' (%.*s). If you wish to proceed, the 'force' option "
"must be explicitly set.",
i->descr().c_str(), new_seed->descr().c_str(), (int)reason.size(),
reason.data()));
}
instances.pop_back();
}
}
// The 'force' option is a no-op in this scenario (instances aren't rejoined
// if the Cluster belongs to a ClusterSet and is INVALIDATED), however, it's
// more correct to forbid it instead of ignoring it.
if (cs_info.is_invalidated && m_options.get_force())
throw std::runtime_error(shcore::str_format(
"The 'force' option cannot be used in a Cluster that belongs to a "
"ClusterSet and is INVALIDATED."));
// everything is ready, proceed with the reboot...
// will have to lock the target instance and all reachable members
mysqlshdk::mysql::Lock_scoped_list i_locks;
{
auto new_seed = best_instance_gtid ? best_instance_gtid : m_target_instance;
i_locks.push_back(new_seed->get_lock_exclusive());
instances.push_back(m_target_instance); // temporary
for (const auto &instance : instances) {
if (instance->get_uuid() != new_seed->get_uuid())
i_locks.push_back(instance->get_lock_exclusive());
}
instances.pop_back();
}
// reboot seed
if (best_instance_gtid) {
// if we have a better instance in the cluster (in terms of highest GTID),
// use it instead of the target
if (mysqlshdk::utils::are_endpoints_equal(
best_instance_gtid->get_canonical_address(),
m_options.get_primary())) {
console->print_info(shcore::str_format(
"Switching over to instance '%s' to be used as seed.",
best_instance_gtid->get_canonical_address().c_str()));
} else {
console->print_info(shcore::str_format(
"Switching over to instance '%s' (which has the highest GTID set), "
"to be used as seed.",
best_instance_gtid->get_canonical_address().c_str()));
}
if (!m_options.get_dry_run()) {
// use the new target instance as seed
instances.push_back(std::exchange(m_target_instance, best_instance_gtid));
best_instance_gtid = nullptr;
reboot_seed(cs_info);
// refresh metadata and cluster info
std::shared_ptr<MetadataStorage> metadata;
std::shared_ptr<mysqlsh::dba::Cluster> cluster;
m_dba->connect_to_target_group(m_target_instance, &metadata, nullptr,
false);
current_ipool()->set_metadata(metadata);
cluster = m_dba->get_cluster(cluster_impl->get_name().c_str(), metadata,
m_target_instance, true, true);
cluster_ret = cluster;
cluster_impl = cluster->impl();
}
} else if (!m_options.get_dry_run()) {
// use the target instance as seed
reboot_seed(cs_info);
}
bool rejoin_remaning_instances = true;
// don't rejoin the instances *if* cluster is in a cluster set and is
// invalidated (former primary) or is a replica and the primary doesn't have
// global status OK
if (cs_info.is_member &&
(cs_info.is_invalidated ||
(!cs_info.is_primary &&
cs_info.primary_status != Cluster_global_status::OK))) {
auto msg = cs_info.is_invalidated
? "Skipping rejoining remaining instances because the "
"Cluster belongs to a ClusterSet and is INVALIDATED. "
"Please add or remove the instances after the Cluster is "
"rejoined to the ClusterSet."
: "Skipping rejoining remaining instances because the "
"Cluster belongs to a ClusterSet and its global status is "
"not OK. Please add or remove the instances after the "
"Cluster is rejoined to the ClusterSet.";
console->print_info(msg);
rejoin_remaning_instances = false;
} else if (!m_options.get_dry_run()) {
// it's either a non ClusterSet instance or it is but it's not the
// primary, so we just need to acquire the primary before rejoining the
// instances
if (cs_info.is_member && !cluster_impl->is_cluster_set_remove_pending()) {
std::shared_ptr<Cluster_set_impl> cs;
try {
cs = cluster_impl->check_and_get_cluster_set_for_cluster();
} catch (const shcore::Exception &e) {
if (e.code() != SHERR_DBA_METADATA_MISSING) throw;
}
// If the ClusterSet couldn't be obtained, it means the Cluster has been
// forcefully removed from it
if (!m_options.get_dry_run() && cs) {
// Acquire primary to ensure the correct primary member will be used
// from now on when the Cluster belongs to a ClusterSet.
cluster_impl->acquire_primary();
// If the communication stack was changed and this is a replica
// cluster, we must ensure here that 'grLocal' reflects the new value
// for local address
if (!cs_info.is_primary &&
m_options.switch_communication_stack.has_value()) {
cluster_impl->update_metadata_for_instance(*m_target_instance);
}
}
}
}
// if the cluster is part of a set
if (cs_info.is_member) {
// if this is a primary, we don't want to rejoin, but we need to check if
// the cluster was removed from the set, to mark it as pending removed,
// otherwise, calling dissolve on the cluster object returned will fail
if (cs_info.is_primary) {
try {
// Get the clusterset object (which also checks for various clusterset
// MD consistency scenarios)
cluster_impl->check_and_get_cluster_set_for_cluster();
} catch (const shcore::Exception &e) {
if (e.code() == SHERR_DBA_METADATA_MISSING)
cluster_impl->set_cluster_set_remove_pending(true);
}
} else if (!cs_info.removed_from_set && !cs_info.is_invalidated) {
// this is a replica cluster, so try and rejoin the cluster set
console->print_info("Rejoining Cluster into its original ClusterSet...");
console->print_info();
if (!m_options.get_dry_run()) {
try {
auto cluster_set_impl = cluster_impl->get_cluster_set_object();
cluster_set_impl->get_primary_cluster()->check_cluster_online();
cluster_set_impl->rejoin_cluster(cluster_impl->get_name(), {}, false);
// also ensure SRO is enabled on all members
cluster_set_impl->ensure_replica_settings(cluster_impl.get(), false);
} catch (const shcore::Exception &e) {
switch (e.code()) {
case SHERR_DBA_DATA_ERRANT_TRANSACTIONS:
console->print_warning(
"Unable to rejoin Cluster to the ClusterSet because this "
"Cluster has errant transactions that did not originate from "
"the primary Cluster of the ClusterSet.");
break;
case SHERR_DBA_GROUP_OFFLINE:
case SHERR_DBA_GROUP_UNREACHABLE:
case SHERR_DBA_GROUP_HAS_NO_QUORUM:
case SHERR_DBA_CLUSTER_PRIMARY_UNAVAILABLE:
console->print_warning(
"Unable to rejoin Cluster to the ClusterSet (primary Cluster "
"is unreachable). Please call ClusterSet.rejoinCluster() to "
"manually rejoin this Cluster back into the ClusterSet.");
break;
case SHERR_DBA_METADATA_MISSING: {
std::string cs_domain_name;
cluster_impl->get_metadata_storage()->check_cluster_set(
nullptr, nullptr, &cs_domain_name);
console->print_warning(shcore::str_format(
"The Cluster '%s' appears to have been removed from the "
"ClusterSet '%s', however its own metadata copy wasn't "
"properly updated during the removal.",
cluster_impl->get_name().c_str(), cs_domain_name.c_str()));
cluster_impl->set_cluster_set_remove_pending(true);
} break;
default:
throw;
}
}
}
}
}
if (rejoin_remaning_instances && !m_options.get_dry_run()) {
// and finally, rejoin all instances
rejoin_instances(cluster_impl.get(), *m_target_instance, instances,
m_options, !cluster_is_multi_primary);
}
if (m_options.get_dry_run()) {
console->print_info("dryRun finished.");
console->print_info();
return nullptr;
}
console->print_info("The Cluster was successfully rebooted.");
console->print_info();
return cluster_ret;
}