in modules/adminapi/cluster/status.cc [1352:1679]
/**
 * Builds the per-instance "topology" dictionary of the cluster status.
 *
 * The function works in three stages:
 *  1. Reconcile the group membership (member_info) with the instances known
 *     to the metadata (m_instances): members are matched by UUID first, then
 *     by address; members that match nothing get a placeholder entry and a
 *     connection attempt, and metadata instances that match no member are
 *     appended afterwards.
 *  2. For every reconciled instance, collect state through its session in
 *     m_member_sessions (if there is one) and feed it into a member
 *     dictionary, together with the list of issues found for the instance.
 *  3. When m_extended is at least 2, attach the statistics returned by
 *     query_member_stats() for that instance.
 *
 * Side effects visible in this function: entries may be added to
 * m_member_sessions and m_member_connect_errors (both are indexed with
 * operator[] and written on connection attempts).
 *
 * @param member_info members currently reported by the group.
 * @return dictionary with one entry per instance, keyed by the instance
 *         label (inst.md.label), each holding that instance's member
 *         dictionary.
 */
shcore::Dictionary_t Status::get_topology(
    const std::vector<mysqlshdk::gr::Member> &member_info) {
  using mysqlshdk::gr::Member_role;
  using mysqlshdk::gr::Member_state;
  using mysqlshdk::mysql::Replication_channel;

  Member_stats_map member_stats = query_member_stats();

  shcore::Dictionary_t dict = shcore::make_dict();

  // Returns a copy of the group member with the given UUID, or a
  // default-constructed Member when no member has that UUID.
  auto get_member = [&member_info](const std::string &uuid) {
    for (const auto &m : member_info) {
      if (m.uuid == uuid) return m;
    }
    return mysqlshdk::gr::Member();
  };

  std::vector<Instance_metadata_info> instances;

  // Appends an entry for an instance that exists in the metadata, recording
  // the server UUID the group reports for it.
  auto add_metadata_instance = [&instances](const auto &md,
                                            const auto &actual_uuid) {
    Instance_metadata_info mdi;
    mdi.md = md;
    mdi.actual_server_uuid = actual_uuid;
    instances.emplace_back(std::move(mdi));
  };

  // add placeholders for unmanaged members
  for (const auto &m : member_info) {
    bool found = false;

    // First choice: match the group member to a metadata instance by UUID.
    for (const auto &i : m_instances) {
      if (i.uuid == m.uuid) {
        add_metadata_instance(i, m.uuid);
        found = true;
        break;
      }
    }
    if (found) continue;

    // The member's address does not depend on the metadata entry being
    // inspected, so build it once and reuse it for the address search and
    // for the placeholder below.
    const auto member_address =
        mysqlshdk::utils::make_host_and_port(m.host, m.port);

    // if instance in MD was not found by uuid, then search by address
    for (const auto &i : m_instances) {
      if (!mysqlshdk::utils::are_endpoints_equal(i.address, member_address))
        continue;

      log_debug(
          "Instance with address=%s is in group, but UUID doesn't match "
          "group=%s MD=%s",
          i.endpoint.c_str(), m.uuid.c_str(), i.uuid.c_str());

      add_metadata_instance(i, m.uuid);
      found = true;
      break;
    }
    if (found) continue;

    // The member is in the group but not in the metadata: describe it using
    // only what the group reports, and try to open a session to it with the
    // login options of the cluster server's connection.
    Instance_metadata_info mdi;
    mdi.md.address = member_address;
    mdi.md.label = mdi.md.address;
    mdi.md.uuid = m.uuid;
    mdi.md.endpoint = mdi.md.address;
    mdi.actual_server_uuid = mdi.md.uuid;

    log_debug("Instance %s with uuid=%s found in group but not in MD",
              mdi.md.address.c_str(), m.uuid.c_str());

    auto group_instance = m_cluster.get_cluster_server();

    mysqlshdk::db::Connection_options opts(mdi.md.endpoint);
    mysqlshdk::db::Connection_options group_session_copts(
        group_instance->get_connection_options());
    opts.set_login_options_from(group_session_copts);

    try {
      m_member_sessions[mdi.md.endpoint] = Instance::connect(opts);
    } catch (const shcore::Error &e) {
      // Remember why the connection failed; it is reported later as
      // "shellConnectError" for this instance.
      m_member_connect_errors[mdi.md.endpoint] = e.format();
    }

    instances.emplace_back(std::move(mdi));
  }

  // look for instances in MD but not in group
  for (const auto &i : m_instances) {
    bool found = false;
    for (const auto &m : member_info) {
      if (m.uuid == i.uuid ||
          mysqlshdk::utils::are_endpoints_equal(
              i.endpoint,
              mysqlshdk::utils::make_host_and_port(m.host, m.port))) {
        found = true;
        break;
      }
    }
    if (found) continue;

    log_debug("Instance with uuid=%s address=%s is in MD but not in group",
              i.uuid.c_str(), i.endpoint.c_str());

    auto &instance = m_member_sessions[i.endpoint];

    Instance_metadata_info mdi;
    mdi.md = i;
    if (instance) {
      mdi.actual_server_uuid = instance->get_uuid();
    } else {
      // fallback to assuming the server_uuid is OK if we can't connect to
      // the instance
      mdi.actual_server_uuid = i.uuid;
    }
    instances.emplace_back(std::move(mdi));
  }

  // The recovery-account listing is only requested when the metadata major
  // version is greater than 1; otherwise the map stays empty.
  std::map<std::string, std::string> endpoints;
  if (m_cluster.get_metadata_storage()->real_version().get_major() > 1) {
    endpoints =
        m_cluster.get_metadata_storage()->get_instances_with_recovery_accounts(
            m_cluster.get_id());
  }

  auto mismatched_recovery_accounts =
      m_cluster.get_mismatched_recovery_accounts();

  for (const auto &inst : instances) {
    shcore::Dictionary_t member = shcore::make_dict();

    // Group view of this instance (default-constructed if the group does not
    // list it) and the state the instance reports about itself.
    mysqlshdk::gr::Member minfo(get_member(inst.actual_server_uuid));
    mysqlshdk::gr::Member_state self_state =
        mysqlshdk::gr::Member_state::MISSING;

    auto &instance = m_member_sessions[inst.md.endpoint];

    std::optional<bool> super_read_only;
    std::optional<bool> offline_mode;
    std::vector<std::string> fence_sysvars;
    bool auto_rejoin = false;

    Replication_channel applier_channel;
    Replication_channel recovery_channel;

    Parallel_applier_options parallel_applier_options;

    if (instance) {
      // Get the current parallel-applier options
      parallel_applier_options = Parallel_applier_options(*instance);

      // Get super_read_only value of each instance to set the mode accurately.
      super_read_only = instance->get_sysvar_bool("super_read_only");

      // Get offline_mode value of each instance to set the mode accurately.
      offline_mode = instance->get_sysvar_bool("offline_mode");

      // Check if auto-rejoin is running.
      auto_rejoin = mysqlshdk::gr::is_running_gr_auto_rejoin(*instance);

      self_state = mysqlshdk::gr::get_member_state(*instance);

      minfo.version = instance->get_version().get_base();

      if (m_extended.has_value()) {
        if (*m_extended >= 1) {
          fence_sysvars = instance->get_fence_sysvars();

          auto workers = parallel_applier_options.replica_parallel_workers;
          if (workers.value_or(0) > 0) {
            (*member)["applierWorkerThreads"] = shcore::Value(*workers);
          }
        }

        if (*m_extended >= 3) {
          collect_local_status(member, *instance,
                               minfo.state == Member_state::RECOVERING);
        }

        if (minfo.state == Member_state::ONLINE)
          collect_basic_local_status(member, *instance,
                                     minfo.role == Member_role::PRIMARY);

        shcore::Value recovery_info;
        if (minfo.state == Member_state::RECOVERING) {
          std::string status;

          // Get the join timestamp from the Metadata
          shcore::Value join_time;
          m_cluster.get_metadata_storage()->query_instance_attribute(
              instance->get_uuid(), k_instance_attribute_join_time, &join_time);

          std::tie(status, recovery_info) = recovery_status(
              *instance,
              join_time.type == shcore::String ? join_time.as_string() : "");
          if (!status.empty()) {
            (*member)["recoveryStatusText"] = shcore::Value(status);
          }
        }

        // Include recovery channel info if RECOVERING or if there's an error.
        // get_channel_status() is evaluated before the *m_extended test, so
        // the call (which receives &recovery_channel) is made even when
        // *m_extended is 0; recovery_channel is later handed to
        // instance_diagnostics().
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_recovery_channel,
                &recovery_channel) &&
            *m_extended > 0) {
          if (minfo.state == Member_state::RECOVERING ||
              recovery_channel.status() != Replication_channel::OFF) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_recovery_channel, &master_info,
                &relay_info);

            if (!recovery_info) recovery_info = shcore::Value::new_map();

            (*recovery_info.as_map())["recoveryChannel"] = shcore::Value(
                channel_status(&recovery_channel, &master_info, &relay_info, "",
                               *m_extended - 1, true, false));
          }
        }
        if (recovery_info) (*member)["recovery"] = recovery_info;

        // Include applier channel info ONLINE and channel not ON
        // or != RECOVERING and channel not OFF.
        // As above, the channel status call comes first so applier_channel
        // is passed to get_channel_status() regardless of *m_extended.
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_applier_channel,
                &applier_channel) &&
            *m_extended > 0) {
          if ((self_state == Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::ON) ||
              (self_state != Member_state::RECOVERING &&
               self_state != Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::OFF)) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_applier_channel, &master_info,
                &relay_info);

            (*member)["applierChannel"] = shcore::Value(
                channel_status(&applier_channel, &master_info, &relay_info, "",
                               *m_extended - 1, false, false));
          }
        }
      }
    } else {
      // No session for this endpoint: report the stored connection error.
      (*member)["shellConnectError"] =
          shcore::Value(m_member_connect_errors[inst.md.endpoint]);
    }

    feed_metadata_info(member, inst.md);
    feed_member_info(member, minfo, offline_mode, super_read_only,
                     fence_sysvars, self_state, auto_rejoin);

    {
      shcore::Array_t issues = instance_diagnostics(
          instance.get(), &m_cluster, inst, recovery_channel, applier_channel,
          super_read_only, minfo, self_state, parallel_applier_options,
          *m_cluster_transaction_size_limit);

      // Warn about offline_mode being active; if it is not active, still
      // warn when the persisted value is ON (only checked on instances where
      // is_set_persist_supported() is true).
      if (offline_mode.value_or(false)) {
        issues->push_back(
            shcore::Value("WARNING: Instance has 'offline_mode' enabled."));
      } else if (instance && instance->is_set_persist_supported()) {
        auto value = instance->get_persisted_value("offline_mode");
        if (value.has_value() && shcore::str_caseeq(*value, "ON")) {
          issues->push_back(shcore::Value(
              "WARNING: Instance has 'offline_mode' enabled and persisted. In "
              "the event that this instance becomes a primary, Shell or other "
              "members will be prevented from connecting to it disrupting the "
              "Cluster's normal functioning."));
        }
      }

      if (instance) {
        auto ret_val = validate_instance_recovery_user(
            instance, endpoints, inst.actual_server_uuid, self_state);

        // if the last check returned an error, the next check will too, so
        // there's no need to show the user two different msgs that kind of
        // point to the same thing (although for different reasons)
        if (ret_val->empty()) {
          auto it = mismatched_recovery_accounts.find(instance->get_id());
          if (it != mismatched_recovery_accounts.end()) {
            auto msg = shcore::str_format(
                "WARNING: Incorrect recovery account (%s) being used. Use "
                "Cluster.rescan() to repair.",
                it->second.c_str());
            ret_val->push_back(shcore::Value(std::move(msg)));
          }
        }

        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      // check if primary has unused recovery accounts
      if (minfo.role == Member_role::PRIMARY) {
        auto ret_val =
            validate_recovery_accounts_unused(mismatched_recovery_accounts);

        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      // NOTE(review): issues has already been dereferenced above, so the
      // null test here only has an effect if that is ever changed.
      if (issues && !issues->empty()) {
        (*member)["instanceErrors"] = shcore::Value(std::move(issues));
      }
    }

    if (m_extended.value_or(0) >= 2) {
      // Look the statistics up once and reuse the iterator.
      // NOTE(review): the key is the metadata UUID (inst.md.uuid), not
      // inst.actual_server_uuid - confirm this is intended when they differ.
      const auto stats_it = member_stats.find(inst.md.uuid);
      if (stats_it != member_stats.end()) {
        // Returns the sub-dictionary stored under key, creating it first if
        // the member dictionary does not have it yet.
        auto dict_for = [member](const std::string &key) {
          if (!member->has_key(key)) {
            (*member)[key] = shcore::Value(shcore::make_dict());
          }
          return member->get_map(key);
        };

        if (stats_it->second.first) {
          feed_member_stats(dict_for("recovery"), stats_it->second.first);
        }
        if (stats_it->second.second) {
          feed_member_stats(dict_for("transactions"), stats_it->second.second);
        }
      }
    }

    (*dict)[inst.md.label] = shcore::Value(member);
  }

  return dict;
}