modules/adminapi/cluster/status.cc (1,563 lines of code) (raw):

/*
 * Copyright (c) 2018, 2024, Oracle and/or its affiliates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2.0,
 * as published by the Free Software Foundation.
 *
 * This program is designed to work with certain software (including
 * but not limited to OpenSSL) that is licensed under separate terms,
 * as designated in a particular file or component or in included license
 * documentation. The authors of MySQL hereby grant you an additional
 * permission to link the program and your derivative works with the
 * separately licensed software that they have either included with
 * the program or referenced in the documentation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
 * the GNU General Public License, version 2.0, for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include "modules/adminapi/cluster/status.h"

#include <algorithm>
#include <optional>
#include <string>

#include "modules/adminapi/cluster_set/cluster_set_impl.h"
#include "modules/adminapi/common/common.h"
#include "modules/adminapi/common/common_status.h"
#include "modules/adminapi/common/metadata_storage.h"
#include "modules/adminapi/common/parallel_applier_options.h"
#include "modules/adminapi/common/server_features.h"
#include "modules/adminapi/common/sql.h"
#include "mysqlshdk/libs/mysql/clone.h"
#include "mysqlshdk/libs/mysql/group_replication.h"
#include "mysqlshdk/libs/mysql/repl_config.h"
#include "mysqlshdk/libs/utils/debug.h"

namespace mysqlsh {
namespace dba {
namespace cluster {

namespace {

// Copies an unsigned-integer column from 'row' into dict[prop] (NULL-aware).
// Returns false when the row has no column named 'field'.
template <typename R>
inline bool set_uint(shcore::Dictionary_t dict, const std::string &prop,
                     const R &row, const std::string &field) {
  if (!row.has_field(field)) return false;

  (*dict)[prop] = row.is_null(field) ? shcore::Value::Null()
                                     : shcore::Value(row.get_uint(field));
  return true;
}

// Copies a microsecond-count column into dict[prop], converted to seconds
// (value / 1e6). Returns false when the column doesn't exist.
template <typename R>
inline bool set_secs(shcore::Dictionary_t dict, const std::string &prop,
                     const R &row, const std::string &field) {
  if (!row.has_field(field)) return false;

  (*dict)[prop] = row.is_null(field)
                      ? shcore::Value::Null()
                      : shcore::Value(row.get_int(field) / 1000000.0);
  return true;
}

// Copies a string column into dict[prop]. Returns false when the column
// doesn't exist.
template <typename R>
inline bool set_string(shcore::Dictionary_t dict, const std::string &prop,
                       const R &row, const std::string &field) {
  if (!row.has_field(field)) return false;

  if (row.is_null(field)) {
    (*dict)[prop] = shcore::Value::Null();
    return true;
  }

  // Strip any newline from the string, especially important for GTID-sets
  std::string value = row.get_string(field);
  value.erase(std::remove(value.begin(), value.end(), '\n'), value.end());
  (*dict)[prop] = shcore::Value(value);
  return true;
}

// Copies a timestamp column into dict[prop]; a zeroed timestamp
// ("0000-00-00 00:00:00"...) is reported as an empty string. Returns false
// when the column doesn't exist.
template <typename R>
inline bool set_ts(shcore::Dictionary_t dict, const std::string &prop,
                   const R &row, const std::string &field) {
  if (!row.has_field(field)) return false;

  if (row.is_null(field)) {
    (*dict)[prop] = shcore::Value::Null();
    return true;
  }

  std::string timestamp = row.get_string(field);
  (*dict)[prop] = shcore::str_beginswith(timestamp, "0000-00-00 00:00:00")
                      ? shcore::Value("")
                      : shcore::Value(timestamp);
  return true;
}

}  // namespace

Status::Status(const Cluster_impl &cluster, std::optional<uint64_t> extended)
    : m_cluster(cluster), m_extended(extended) {}

Status::~Status() = default;

// Opens a session to every instance registered in the metadata. Failures are
// recorded per-endpoint in m_member_connect_errors instead of aborting.
void Status::connect_to_members() {
  auto pool = current_ipool();

  for (const auto &member : m_instances) {
    try {
      m_member_sessions[member.endpoint] =
          pool->connect_unchecked_endpoint(member.endpoint);
    } catch (const shcore::Error &err) {
      m_member_connect_errors[member.endpoint] = err.format();
    }
  }
}
// Computes the overall cluster status (OK/OK_PARTIAL/NO_QUORUM/ERROR/...) and
// a human-readable description, based on the GR membership as seen from
// 'instance' compared against the metadata (m_instances).
shcore::Dictionary_t Status::check_group_status(
    const mysqlsh::dba::Instance &instance,
    const std::vector<mysqlshdk::gr::Member> &members, bool has_quorum) {
  shcore::Dictionary_t dict = shcore::make_dict();

  m_no_quorum = !has_quorum;

  int total_in_group = 0;
  int online_count = 0;
  int quorum_size = 0;

  // count inconsistencies in the group vs metadata
  int missing_from_group = 0;
  for (const auto &inst : m_instances) {
    if (std::find_if(members.begin(), members.end(),
                     [&inst](const mysqlshdk::gr::Member &member) {
                       return member.uuid == inst.uuid;
                     }) == members.end()) {
      missing_from_group++;
    }
  }

  // ONLINE and RECOVERING members both count towards the quorum.
  for (const auto &member : members) {
    total_in_group++;
    if (member.state == mysqlshdk::gr::Member_state::ONLINE ||
        member.state == mysqlshdk::gr::Member_state::RECOVERING)
      quorum_size++;
    if (member.state == mysqlshdk::gr::Member_state::ONLINE) online_count++;
  }

  // GR majority rule: a group of N quorum members tolerates (N-1)/2 failures.
  size_t number_of_failures_tolerated =
      quorum_size > 0 ? (quorum_size - 1) / 2 : 0;

  Cluster_status rs_status;
  std::string desc_status;

  if (online_count == 0) {
    rs_status = Cluster_status::ERROR;
    if (has_quorum)
      desc_status = "There are no ONLINE members in the cluster.";
    else
      desc_status = "Cluster has no quorum as visible from '" +
                    instance.descr() +
                    "' and no ONLINE members that can be used to restore it.";
  } else if (!has_quorum) {
    rs_status = Cluster_status::NO_QUORUM;
    desc_status = "Cluster has no quorum as visible from '" +
                  instance.descr() +
                  "' and cannot process write transactions.";
  } else {
    if (m_cluster.is_fenced_from_writes()) {
      rs_status = Cluster_status::FENCED_WRITES;
      desc_status = "Cluster is fenced from Write Traffic.";
    } else if (m_cluster.is_cluster_set_member() &&
               m_cluster.is_invalidated()) {
      rs_status = Cluster_status::INVALIDATED;
      desc_status = "Cluster was invalidated by the ClusterSet it belongs to.";
    } else if (number_of_failures_tolerated == 0) {
      // _PARTIAL variants flag a mismatch between group and metadata.
      if (missing_from_group > 0 || total_in_group != quorum_size)
        rs_status = Cluster_status::OK_NO_TOLERANCE_PARTIAL;
      else
        rs_status = Cluster_status::OK_NO_TOLERANCE;

      desc_status = "Cluster is NOT tolerant to any failures.";
    } else {
      if (missing_from_group > 0 || total_in_group != quorum_size) {
        rs_status = Cluster_status::OK_PARTIAL;
      } else {
        rs_status = Cluster_status::OK;
      }

      if (number_of_failures_tolerated == 1) {
        desc_status = "Cluster is ONLINE and can tolerate up to ONE failure.";
      } else {
        desc_status = "Cluster is ONLINE and can tolerate up to " +
                      std::to_string(number_of_failures_tolerated) +
                      " failures.";
      }
    }
  }

  // Append a note about metadata members that aren't currently ONLINE.
  if (static_cast<int>(m_instances.size()) > online_count) {
    if (m_instances.size() - online_count == 1) {
      desc_status.append(" 1 member is not active.");
    } else {
      desc_status.append(
          " " + std::to_string(m_instances.size() - online_count) +
          " members are not active.");
    }
  }

  (*dict)["statusText"] = shcore::Value(desc_status);
  (*dict)["status"] = shcore::Value(to_string(rs_status));

  return dict;
}

// Returns the metadata entry with the given server UUID, or nullptr.
const Instance_metadata *Status::instance_with_uuid(const std::string &uuid) {
  for (const auto &i : m_instances) {
    if (i.uuid == uuid) return &i;
  }
  return nullptr;
}

// Fetches replication_group_member_stats for all members through the cluster
// session. For each MEMBER_ID: .first = recovery channel row, .second =
// applier channel row.
Member_stats_map Status::query_member_stats() {
  Member_stats_map stats;
  auto group_instance = m_cluster.get_cluster_server();

  if (group_instance) {
    try {
      auto member_stats = group_instance->query(
          "SELECT * FROM performance_schema.replication_group_member_stats");

      while (auto row = member_stats->fetch_one_named()) {
        std::string channel = row.get_string("CHANNEL_NAME");

        if (channel == "group_replication_applier") {
          stats[row.get_string("MEMBER_ID")].second =
              mysqlshdk::db::Row_by_name(row);
        } else if (channel == "group_replication_recovery") {
          stats[row.get_string("MEMBER_ID")].first =
              mysqlshdk::db::Row_by_name(row);
        }
      }
    } catch (const mysqlshdk::db::Error &e) {
      throw shcore::Exception::mysql_error_with_code(
          group_instance->descr() + ": " + e.what(), e.code());
    }
  }
  return stats;
}

// Copies <prefix>ERROR_NUMBER/MESSAGE/TIMESTAMP columns into dict as
// <key_prefix>Errno/Error/ErrorTimestamp, but only when there is an actual
// error (non-zero error number).
void Status::collect_last_error(shcore::Dictionary_t dict,
                                const mysqlshdk::db::Row_ref_by_name &row,
                                const std::string &prefix,
                                const std::string &key_prefix) {
  if (row.has_field(prefix + "ERROR_NUMBER") &&
      row.get_uint(prefix + "ERROR_NUMBER") != 0) {
    set_uint(dict, key_prefix + "Errno", row, prefix + "ERROR_NUMBER");
    set_string(dict, key_prefix + "Error", row, prefix + "ERROR_MESSAGE");
    set_ts(dict, key_prefix + "ErrorTimestamp", row,
           prefix + "ERROR_TIMESTAMP");
  }
}

// Builds a dictionary describing the last completed transaction of a channel
// stage. 'prefix' is the P_S column prefix (e.g. LAST_APPLIED) and 'what' the
// stage name (QUEUE/BUFFER/APPLY), which also selects the *_TIME aliases
// added by collect_local_status().
shcore::Value Status::collect_last(const mysqlshdk::db::Row_ref_by_name &row,
                                   const std::string &prefix,
                                   const std::string &what) {
  shcore::Dictionary_t tx = shcore::make_dict();

  set_string(tx, "transaction", row, prefix + "_TRANSACTION");
  set_ts(tx, "originalCommitTimestamp", row,
         prefix + "_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP");
  set_ts(tx, "immediateCommitTimestamp", row,
         prefix + "_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP");
  set_ts(tx, "startTimestamp", row,
         prefix + "_TRANSACTION_START_" + what + "_TIMESTAMP");
  set_ts(tx, "endTimestamp", row,
         prefix + "_TRANSACTION_END_" + what + "_TIMESTAMP");
  set_secs(tx, "originalCommitToEndTime", row,
           "LAST_ORIGINAL_COMMIT_TO_END_" + what + "_TIME");
  set_secs(tx, "immediateCommitToEndTime", row,
           "LAST_IMMEDIATE_COMMIT_TO_END_" + what + "_TIME");
  set_secs(tx, shcore::str_lower(what) + "Time", row,
           "LAST_" + what + "_TIME");
  set_uint(tx, "retries", row, prefix + "_TRANSACTION_RETRIES_COUNT");
  collect_last_error(tx, row, prefix + "_TRANSACTION_LAST_TRANSIENT_",
                     "lastTransient");

  return shcore::Value(tx);
}

// Like collect_last(), but for the transaction currently being processed.
// Returns a null Value when no transaction is in flight (empty GTID).
shcore::Value Status::collect_current(const mysqlshdk::db::Row_ref_by_name &row,
                                      const std::string &prefix,
                                      const std::string &what) {
  if (row.has_field(prefix + "_TRANSACTION")) {
    std::string gtid = row.get_string(prefix + "_TRANSACTION");
    if (!gtid.empty()) {
      shcore::Dictionary_t tx = shcore::make_dict();

      (*tx)["transaction"] = shcore::Value(gtid);
      set_ts(tx, "originalCommitTimestamp", row,
             prefix + "_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP");
      set_ts(tx, "immediateCommitTimestamp", row,
             prefix + "_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP");
      set_ts(tx, "startTimestamp", row,
             prefix + "_TRANSACTION_START_" + what + "_TIMESTAMP");
      set_secs(tx, "originalCommitToNowTime", row,
               "CURRENT_ORIGINAL_COMMIT_TO_NOW_TIME");
      set_secs(tx, "immediateCommitToNowTime", row,
               "CURRENT_IMMEDIATE_COMMIT_TO_NOW_TIME");
      set_uint(tx, "retries", row, prefix + "_TRANSACTION_RETRIES_COUNT");
      collect_last_error(tx, row, prefix + "_TRANSACTION_LAST_TRANSIENT_",
                         "lastTransient");

      return shcore::Value(tx);
    }
  }
  return shcore::Value();
}

// Summarizes one replication_connection_status row (receiver thread).
shcore::Value Status::connection_status(
    const mysqlshdk::db::Row_ref_by_name &row) {
  shcore::Dictionary_t dict = shcore::make_dict();

  // lookup label of the server
  // (*dict)["source"] = string_from_row(("SOURCE_UUID"));

  set_uint(dict, "threadId", row, "THREAD_ID");
  set_string(dict, "receivedTransactionSet", row, "RECEIVED_TRANSACTION_SET");

  collect_last_error(dict, row);

  set_ts(dict, "lastHeartbeatTimestamp", row, "LAST_HEARTBEAT_TIMESTAMP");
  set_uint(dict, "receivedHeartbeats", row, "COUNT_RECEIVED_HEARTBEATS");

  auto last = collect_last(row, "LAST_QUEUED", "QUEUE");
  if (!last.as_map()->empty()) (*dict)["lastQueued"] = shcore::Value(last);

  if (auto v = collect_current(row, "QUEUEING", "QUEUE")) {
    (*dict)["currentlyQueueing"] = v;
  }

  return shcore::Value(dict);
}

// Summarizes one replication_applier_status_by_coordinator row (MTA
// coordinator thread).
shcore::Value Status::coordinator_status(
    const mysqlshdk::db::Row_ref_by_name &row) {
  shcore::Dictionary_t dict = shcore::make_dict();

  set_uint(dict, "threadId", row, "THREAD_ID");

  collect_last_error(dict, row);

  auto last = collect_last(row, "LAST_PROCESSED", "BUFFER");
  if (!last.as_map()->empty()) (*dict)["lastProcessed"] = shcore::Value(last);

  if (auto v = collect_current(row, "PROCESSING", "BUFFER")) {
    (*dict)["currentlyProcessing"] = v;
  }

  return shcore::Value(dict);
}

// Summarizes one replication_applier_status_by_worker row (applier/worker
// thread).
shcore::Value Status::applier_status(
    const mysqlshdk::db::Row_ref_by_name &row) {
  shcore::Dictionary_t dict = shcore::make_dict();

  set_uint(dict, "threadId", row, "THREAD_ID");

  collect_last_error(dict, row);

  auto last = collect_last(row, "LAST_APPLIED", "APPLY");
  if (!last.as_map()->empty()) (*dict)["lastApplied"] = shcore::Value(last);

  set_string(dict, "lastSeen", row, "LAST_SEEN_TRANSACTION");

  if (auto v = collect_current(row, "APPLYING", "APPLY")) {
    (*dict)["currentlyApplying"] = v;
  }

  return shcore::Value(dict);
}

// Query that computes the GR applier lag of this member: 'null' when
// replication threads are stopped, 'applier_queue_applied' when the applier
// is caught up, otherwise the TIMEDIFF between now and the commit time of the
// oldest transaction still being applied.
static constexpr const char *k_calculate_lag_query = R"*(
SELECT
  IF(
    /* IF replication connection or sql thread is not running, we return
    'null', replication isn't working properly */
    applier_coordinator_status.SERVICE_STATE = 'OFF'
      OR conn_status.SERVICE_STATE = 'OFF',
    'null',
    IF(
      /* When the last queued gtid and last applied gtid is the same, then the
      applier applied everything in the queue In addition when the applying
      transaction is 0 (e.g. due to replication restart) we also have nothing
      to execute */
      GTID_SUBTRACT(conn_status.LAST_QUEUED_TRANSACTION,
        applier_status.LAST_APPLIED_TRANSACTION) = ''
        OR UNIX_TIMESTAMP(applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
      'applier_queue_applied',
      /* Replication lag is the diff between now and the time it was committed
      by its source */
      TIMEDIFF(
        NOW(6),
        applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
      )
    )
  ) AS cluster_member_lag
FROM performance_schema.replication_connection_status AS conn_status
JOIN performance_schema.replication_applier_status_by_worker AS applier_status
  ON applier_status.channel_name = conn_status.channel_name
JOIN performance_schema.replication_applier_status_by_coordinator AS applier_coordinator_status
  ON applier_coordinator_status.channel_name = conn_status.channel_name
WHERE conn_status.channel_name = 'group_replication_applier'
ORDER BY
  /* As we have a row returned per worker thread, we need to obtain the worker
  that is executing the oldest transaction to find replication lag. Because
  workers can be idle, we have to put a lower priority on them, hence the
  ordering by 0-EXECUTING, 1-IDLE first */
  IF(GTID_SUBTRACT(conn_status.LAST_QUEUED_TRANSACTION,
    applier_status.LAST_APPLIED_TRANSACTION) = ''
    OR UNIX_TIMESTAMP(applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
    '1-IDLE', '0-EXECUTING') ASC,
  applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP ASC
LIMIT 1;
)*";

/**
 * Similar to collect_local_status(), but only includes basic/important
 * stats that should be displayed in the default output.
 */
void Status::collect_basic_local_status(shcore::Dictionary_t dict,
                                        const mysqlsh::dba::Instance &instance,
                                        bool is_primary) {
  using mysqlshdk::utils::Version;

  auto version = instance.get_version();

  // NOTE(review): applier_node/applier_workers/sql and the TSDIFF macro are
  // not referenced in this function body — possibly leftovers.
  shcore::Dictionary_t applier_node = shcore::make_dict();
  shcore::Array_t applier_workers = shcore::make_array();

#define TSDIFF(prefix, start, end) \
  "TIMEDIFF(" prefix "_" end ", " prefix "_" start ")"

  std::string sql;

  if (version >= Version(8, 0, 0)) {
    if (m_cluster.is_cluster_set_member()) {
      // PRIMARY of PC has no relevant replication lag info
      // PRIMARY of RC shows lag from clusterset_replication channel
      // SECONDARY members show replication from gr_applier channel
      std::string channel_name;
      if (is_primary) {
        if (!m_cluster.is_primary_cluster()) {
          channel_name = k_clusterset_async_channel_name;
        }
      } else {
        channel_name = mysqlshdk::gr::k_gr_applier_channel;
      }

      if (!channel_name.empty()) {
        mysqlshdk::mysql::Replication_channel channel;
        if (mysqlshdk::mysql::get_channel_status(instance, channel_name,
                                                 &channel)) {
          dict->set("replicationLagFromOriginalSource",
                    shcore::Value(channel.repl_lag_from_original));
          dict->set("replicationLagFromImmediateSource",
                    shcore::Value(channel.repl_lag_from_immediate));
        } else {
          dict->set("replicationLagFromOriginalSource", shcore::Value::Null());
          dict->set("replicationLagFromImmediateSource",
                    shcore::Value::Null());
        }
      }
    } else {
      auto result = instance.query(k_calculate_lag_query);
      auto row = result->fetch_one_named();

      if (row) {
        std::string lag = row.get_string("cluster_member_lag", "");

        if (lag == "null" || lag.empty()) {
          (*dict)["replicationLag"] = shcore::Value::Null();
        } else {
          (*dict)["replicationLag"] = shcore::Value(lag);
        }
      }
    }
  }
#undef TSDIFF
}

/**
 * Collect per instance replication stats collected from performance_schema
 * tables.
 */
void Status::collect_local_status(shcore::Dictionary_t dict,
                                  const mysqlsh::dba::Instance &instance,
                                  bool recovering) {
  using mysqlshdk::utils::Version;

  auto version = instance.get_version();

  shcore::Dictionary_t recovery_node = shcore::make_dict();
  shcore::Dictionary_t applier_node = shcore::make_dict();
  shcore::Array_t recovery_workers = shcore::make_array();
  shcore::Array_t applier_workers = shcore::make_array();

// Elapsed microseconds between two timestamp columns of the same row.
#define TSDIFF(prefix, start, end) \
  "TIMESTAMPDIFF(MICROSECOND, " prefix "_" start ", " prefix "_" end ")"
// Elapsed microseconds between a timestamp column and now.
#define TSDIFF_NOW(prefix, start) \
  "TIMESTAMPDIFF(MICROSECOND, " prefix "_" start ", NOW(6))"

  std::string sql;

  // Worker (applier) threads. The computed *_TIME aliases feed set_secs()
  // lookups done in collect_last()/collect_current(); only 8.0+ has the
  // needed timestamp columns.
  sql = "SELECT *";
  if (version >= Version(8, 0, 0)) {
    sql += ",";
    sql += TSDIFF("LAST_APPLIED_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP",
                  "END_APPLY_TIMESTAMP");
    sql += " AS LAST_ORIGINAL_COMMIT_TO_END_APPLY_TIME,";
    sql += TSDIFF("LAST_APPLIED_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP",
                  "END_APPLY_TIMESTAMP");
    sql += " AS LAST_IMMEDIATE_COMMIT_TO_END_APPLY_TIME,";
    sql += TSDIFF("LAST_APPLIED_TRANSACTION", "START_APPLY_TIMESTAMP",
                  "END_APPLY_TIMESTAMP");
    sql += " AS LAST_APPLY_TIME,";
    sql += TSDIFF_NOW("APPLYING_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_ORIGINAL_COMMIT_TO_NOW_TIME,";
    sql += TSDIFF_NOW("APPLYING_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_IMMEDIATE_COMMIT_TO_NOW_TIME";
  }
  sql += " FROM performance_schema.replication_applier_status_by_worker";

  // this can return multiple rows per channel for
  // multi-threaded applier, otherwise just one. If MT, we also
  // get stuff in the coordinator table
  auto result = instance.query(sql);
  auto row = result->fetch_one_named();
  while (row) {
    std::string channel_name = row.get_string("CHANNEL_NAME");
    if (channel_name == "group_replication_recovery") {
      recovery_workers->push_back(applier_status(row));
    }
    if (channel_name == "group_replication_applier" &&
        row.get_string("SERVICE_STATE") != "OFF") {
      applier_workers->push_back(applier_status(row));
    }
    row = result->fetch_one_named();
  }

  // Coordinator thread (present for multi-threaded appliers only).
  sql = "SELECT *";
  if (version >= Version(8, 0, 0)) {
    sql += ",";
    sql += TSDIFF("LAST_PROCESSED_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP",
                  "END_BUFFER_TIMESTAMP");
    sql += " AS LAST_ORIGINAL_COMMIT_TO_END_BUFFER_TIME,";
    sql += TSDIFF("LAST_PROCESSED_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP",
                  "END_BUFFER_TIMESTAMP");
    sql += " AS LAST_IMMEDIATE_COMMIT_TO_END_BUFFER_TIME,";
    sql += TSDIFF("LAST_PROCESSED_TRANSACTION", "START_BUFFER_TIMESTAMP",
                  "END_BUFFER_TIMESTAMP");
    sql += " AS LAST_BUFFER_TIME,";
    sql += TSDIFF_NOW("PROCESSING_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_ORIGINAL_COMMIT_TO_NOW_TIME,";
    sql += TSDIFF_NOW("PROCESSING_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_IMMEDIATE_COMMIT_TO_NOW_TIME";
  }
  sql += " FROM performance_schema.replication_applier_status_by_coordinator";

  result = instance.query(sql);
  row = result->fetch_one_named();
  while (row) {
    std::string channel_name = row.get_string("CHANNEL_NAME");
    if (channel_name == "group_replication_recovery") {
      (*recovery_node)["coordinator"] = coordinator_status(row);
    }
    if (channel_name == "group_replication_applier" &&
        row.get_string("SERVICE_STATE") != "OFF") {
      (*applier_node)["coordinator"] = coordinator_status(row);
    }
    row = result->fetch_one_named();
  }

  // Receiver (connection) thread.
  sql = "SELECT *";
  if (version >= Version(8, 0, 0)) {
    sql += ",";
    sql += TSDIFF("LAST_QUEUED_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP",
                  "END_QUEUE_TIMESTAMP");
    sql += " AS LAST_ORIGINAL_COMMIT_TO_END_QUEUE_TIME,";
    sql += TSDIFF("LAST_QUEUED_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP",
                  "END_QUEUE_TIMESTAMP");
    sql += " AS LAST_IMMEDIATE_COMMIT_TO_END_QUEUE_TIME,";
    sql += TSDIFF("LAST_QUEUED_TRANSACTION", "START_QUEUE_TIMESTAMP",
                  "END_QUEUE_TIMESTAMP");
    sql += " AS LAST_QUEUE_TIME,";
    sql += TSDIFF_NOW("QUEUEING_TRANSACTION", "ORIGINAL_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_ORIGINAL_COMMIT_TO_NOW_TIME,";
    sql += TSDIFF_NOW("QUEUEING_TRANSACTION", "IMMEDIATE_COMMIT_TIMESTAMP");
    sql += " AS CURRENT_IMMEDIATE_COMMIT_TO_NOW_TIME";
  }
  sql += " FROM performance_schema.replication_connection_status";

  result = instance.query(sql);
  row = result->fetch_one_named();
  while (row) {
    std::string channel_name = row.get_string("CHANNEL_NAME");
    if (channel_name == "group_replication_recovery") {
      (*recovery_node)["connection"] = connection_status(row);
    }
    if (channel_name == "group_replication_applier" &&
        row.get_string("SERVICE_STATE") != "OFF") {
      (*applier_node)["connection"] = connection_status(row);
    }
    row = result->fetch_one_named();
  }

  // Recovery channel info is only shown while the member is RECOVERING.
  if (!applier_workers->empty()) {
    (*applier_node)["workers"] = shcore::Value(applier_workers);
  }
  if (!recovery_workers->empty() && recovering) {
    (*recovery_node)["workers"] = shcore::Value(recovery_workers);
  }
  if (!applier_node->empty()) {
    (*dict)["transactions"] = shcore::Value(applier_node);
  }
  if (!recovery_node->empty() && recovering) {
    (*dict)["recovery"] = shcore::Value(recovery_node);
  }

  (*dict)["version"] = shcore::Value(version.get_full());
}

// Fills the fields of a member's status dict that come from the metadata.
void Status::feed_metadata_info(shcore::Dictionary_t dict,
                                const Instance_metadata &info) {
  (*dict)["address"] = shcore::Value(info.endpoint);
  (*dict)["role"] = shcore::Value("HA");

  if (info.hidden_from_router) {
    (*dict)["hiddenFromRouter"] = shcore::Value::True();
  }
}

// Fills the fields of a member's status dict that come from the live GR view
// ('member' = as seen from the quorum, 'self_state' = as seen by the member
// itself).
void Status::feed_member_info(shcore::Dictionary_t dict,
                              const mysqlshdk::gr::Member &member,
                              std::optional<bool> offline_mode,
                              std::optional<bool> super_read_only,
                              const std::vector<std::string> &fence_sysvars,
                              mysqlshdk::gr::Member_state self_state,
                              bool is_auto_rejoin_running) {
  (*dict)["readReplicas"] = shcore::Value(shcore::make_dict());

  if (m_extended.value_or(0) >= 1) {
    // Set fenceSysVars array.
    shcore::Array_t fence_array = shcore::make_array();
    for (const auto &sysvar : fence_sysvars) {
      fence_array->push_back(shcore::Value(sysvar));
    }

    (*dict)["fenceSysVars"] = shcore::Value(fence_array);

    (*dict)["memberId"] = shcore::Value(member.uuid);
  }

  // memberRole instance Role as reported by GR (Primary/Secondary)
  (*dict)["memberRole"] =
      shcore::Value(mysqlshdk::gr::to_string(member.role));

  if ((m_extended.value_or(0) >= 1) || member.state != self_state) {
    // memberState is from the point of view of the member itself
    (*dict)["memberState"] =
        shcore::Value(mysqlshdk::gr::to_string(self_state));
  }

  // status is the status from the point of view of the quorum
  (*dict)["status"] = shcore::Value(mysqlshdk::gr::to_string(member.state));

  // Set the instance mode (read-only or read-write).
  if (!offline_mode.has_value() || !super_read_only.has_value() ||
      member.state != mysqlshdk::gr::Member_state::ONLINE) {
    // offline_mode or super_read_only is null if it could not be obtained from
    // the instance.
    (*dict)["mode"] = shcore::Value("n/a");
  } else {
    // We deal with offline mode the same as n/a and set mode to read-only if
    // there is no quorum otherwise according to the instance super_read_only
    // value.
    if (*offline_mode)
      (*dict)["mode"] = shcore::Value("n/a");
    else
      (*dict)["mode"] = shcore::Value(
          (m_no_quorum || *super_read_only) ? "R/O" : "R/W");
  }

  // Display autoRejoinRunning attribute by default for each member, but only
  // if running (true).
  if (is_auto_rejoin_running) {
    (*dict)["autoRejoinRunning"] = shcore::Value(is_auto_rejoin_running);
  }

  if (!member.version.empty()) {
    (*dict)["version"] = shcore::Value(member.version);
  }
}

// Maps replication_group_member_stats columns to status-dict keys.
void Status::feed_member_stats(shcore::Dictionary_t dict,
                               const mysqlshdk::db::Row_by_name &stats) {
  set_uint(dict, "inQueueCount", stats, "COUNT_TRANSACTIONS_IN_QUEUE");
  set_uint(dict, "checkedCount", stats, "COUNT_TRANSACTIONS_CHECKED");
  set_uint(dict, "conflictsDetectedCount", stats, "COUNT_CONFLICTS_DETECTED");
  set_string(dict, "committedAllMembers", stats,
             "TRANSACTIONS_COMMITTED_ALL_MEMBERS");
  set_string(dict, "lastConflictFree", stats,
             "LAST_CONFLICT_FREE_TRANSACTION");
  set_uint(dict, "inApplierQueueCount", stats,
           "COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE");
  set_uint(dict, "appliedCount", stats, "COUNT_TRANSACTIONS_REMOTE_APPLIED");
  set_uint(dict, "proposedCount", stats, "COUNT_TRANSACTIONS_LOCAL_PROPOSED");
  set_uint(dict, "rollbackCount", stats, "COUNT_TRANSACTIONS_LOCAL_ROLLBACK");
}

namespace {

// Reports progress/errors of distributed recovery (the GR recovery channel).
shcore::Value distributed_progress(
    const mysqlshdk::mysql::IInstance &instance) {
  shcore::Dictionary_t dict = shcore::make_dict();
  mysqlshdk::mysql::Replication_channel channel;
  if (mysqlshdk::mysql::get_channel_status(
          instance, mysqlshdk::gr::k_gr_recovery_channel, &channel)) {
    dict->set("state",
              shcore::Value(mysqlshdk::mysql::to_string(channel.status())));

    if (channel.receiver.last_error.code != 0) {
      dict->set("receiverErrorNumber",
                shcore::Value(channel.receiver.last_error.code));
      dict->set("receiverError",
                shcore::Value(channel.receiver.last_error.message));
    }

    // Report only the first applier that stopped with an error.
    for (const auto &applier : channel.appliers) {
      if (applier.last_error.code != 0) {
        dict->set("applierErrorNumber",
                  shcore::Value(applier.last_error.code));
        dict->set("applierError",
                  shcore::Value(applier.last_error.message));
        break;
      }
    }
  }
  return shcore::Value(dict);
}

// Reports progress of a clone-based recovery started at 'begin_time'.
shcore::Value clone_progress(const mysqlshdk::mysql::IInstance &instance,
                             const std::string &begin_time) {
  shcore::Dictionary_t dict = shcore::make_dict();
  mysqlshdk::mysql::Clone_status status =
      mysqlshdk::mysql::check_clone_status(instance, begin_time);

  dict->set("cloneState", shcore::Value(status.state));
  dict->set("cloneStartTime", shcore::Value(status.begin_time));
  if (status.error_n) dict->set("errorNumber", shcore::Value(status.error_n));
  if (!status.error.empty()) dict->set("error", shcore::Value(status.error));

  if (!status.stages.empty()) {
    auto stage = status.stages[status.current_stage()];

    dict->set("currentStage", shcore::Value(stage.stage));
    dict->set("currentStageState", shcore::Value(stage.state));

    if (stage.work_estimated > 0) {
      dict->set(
          "currentStageProgress",
          shcore::Value(stage.work_completed * 100.0 / stage.work_estimated));
    }
  } else {
    // When the check for clone progress is executed right after clone has
    // started there's a tiny gap of time on which P_S.clone_progress hasn't
    // been populated yet. If we ran into that scenario, we must manually
    // populate the dictionary fields with undefined values to avoid a
    // segfault.
    dict->set("currentStage", shcore::Value());
    dict->set("currentStageState", shcore::Value());
    dict->set("currentStageProgress", shcore::Value());
  }

  return shcore::Value(dict);
}

// Detects which recovery method (distributed/clone) a RECOVERING member is
// using and returns {status text, progress dict}. Returns {"", null} when
// recovery already finished.
std::pair<std::string, shcore::Value> recovery_status(
    const mysqlshdk::mysql::IInstance &instance,
    const std::string &join_timestamp) {
  std::string status;
  shcore::Value info;

  mysqlshdk::gr::Group_member_recovery_status recovery =
      mysqlshdk::gr::detect_recovery_status(instance, join_timestamp);

  switch (recovery) {
    case mysqlshdk::gr::Group_member_recovery_status::DISTRIBUTED:
      status = "Distributed recovery in progress";
      info = distributed_progress(instance);
      break;

    case mysqlshdk::gr::Group_member_recovery_status::DISTRIBUTED_ERROR:
      status = "Distributed recovery error";
      info = distributed_progress(instance);
      break;

    case mysqlshdk::gr::Group_member_recovery_status::CLONE:
      status = "Cloning in progress";
      info = clone_progress(instance, join_timestamp);
      break;

    case mysqlshdk::gr::Group_member_recovery_status::CLONE_ERROR:
      status = "Clone error";
      info = clone_progress(instance, join_timestamp);
      break;

    case mysqlshdk::gr::Group_member_recovery_status::UNKNOWN:
      status = "Recovery in progress";
      break;

    case mysqlshdk::gr::Group_member_recovery_status::UNKNOWN_ERROR:
      status = "Recovery error";
      break;

    case mysqlshdk::gr::Group_member_recovery_status::DONE_OFFLINE:
    case mysqlshdk::gr::Group_member_recovery_status::DONE_ONLINE:
      return {"", shcore::Value()};
  }

  return {status, info};
}

// Adds a NOTE issue when any required parallel-applier sysvar deviates from
// its required value (a single note covers all deviations).
void check_parallel_appliers(
    shcore::Array_t issues, const mysqlshdk::utils::Version &instance_version,
    const Parallel_applier_options &parallel_applier_options) {
  auto current_values =
      parallel_applier_options.get_current_settings(instance_version);
  auto required_values =
      parallel_applier_options.get_required_values(instance_version);

  for (const auto &setting : required_values) {
    auto current_value = current_values[std::get<0>(setting)].value_or("");
    if (!current_value.empty() && current_value != std::get<1>(setting)) {
      issues->push_back(shcore::Value(
          "NOTE: The required parallel-appliers settings are not enabled "
          "on the instance. Use dba.configureInstance() to fix it."));
      break;
    }
  }
}

// Adds ERROR issues for a replication channel whose receiver or applier
// thread stopped with an error.
void check_channel_error(shcore::Array_t issues,
                         const std::string &channel_label,
                         const mysqlshdk::mysql::Replication_channel &channel) {
  using mysqlshdk::mysql::Replication_channel;

  switch (channel.status()) {
    case Replication_channel::OFF:
    case Replication_channel::ON:
    case Replication_channel::RECEIVER_OFF:
    case Replication_channel::APPLIER_OFF:
    case Replication_channel::CONNECTING:
      break;

    case Replication_channel::CONNECTION_ERROR:
      issues->push_back(shcore::Value(
          "ERROR: receiver thread of " + channel_label +
          " channel stopped with an error: " +
          mysqlshdk::mysql::to_string(channel.receiver.last_error)));
      break;

    case Replication_channel::APPLIER_ERROR:
      // Report only the first applier that stopped with an error.
      for (const auto &a : channel.appliers) {
        if (a.last_error.code != 0) {
          issues->push_back(
              shcore::Value("ERROR: applier thread of " + channel_label +
                            " channel stopped with an error: " +
                            mysqlshdk::mysql::to_string(a.last_error)));
          break;
        }
      }
      break;
  }
}

// Flags incoming replication channels that are not managed by the AdminAPI
// (anything other than the GR channels and, for ClusterSet members, the
// clusterset replication channel).
void check_unrecognized_channels(shcore::Array_t issues, Instance *instance,
                                 bool is_cluster_set_member) {
  if (instance) {
    auto channels =
        mysqlshdk::mysql::get_incoming_channel_names(*instance, false);
    for (const std::string &name : channels) {
      if (!(name == mysqlshdk::gr::k_gr_applier_channel ||
            name == mysqlshdk::gr::k_gr_recovery_channel ||
            (is_cluster_set_member &&
             name == k_clusterset_async_channel_name))) {
        issues->push_back(shcore::Value(
            "ERROR: Unrecognized replication channel '" + name +
            "' found. Unmanaged replication channels are not supported."));
      }
    }
  }
}

// Flags mismatches between the metadata and the hostname / X plugin port
// actually reported by the instance.
void check_host_metadata(shcore::Array_t issues, Instance *instance,
                         const Instance_metadata_info &instance_md) {
  auto address = instance->get_canonical_address();

  if (!mysqlshdk::utils::are_endpoints_equal(instance_md.md.endpoint,
                                             address)) {
    issues->push_back(
        shcore::Value("ERROR: Metadata for this instance does not match "
                      "hostname reported by instance (metadata=" +
                      instance_md.md.endpoint + ", actual=" + address +
                      "). Use rescan() to update the metadata."));
  }

  // Check if the instance's X plugin port matches the metadata
  std::optional<int> xport = instance->get_xport();
  std::string x_address;
  if (xport.has_value()) {
    x_address = mysqlshdk::utils::make_host_and_port(
        instance->get_canonical_hostname(), *xport);
  }

  if (!mysqlshdk::utils::are_endpoints_equal(instance_md.md.xendpoint,
                                             x_address)) {
    issues->push_back(shcore::Value(
        "ERROR: Metadata for this instance does not match "
        "X plugin port reported by instance (metadata=" +
        instance_md.md.xendpoint + ", actual=" +
        (x_address.empty() ? "<disabled>" : x_address) +
        "). Use rescan() to update the metadata."));
  }
}

// Warns when the instance's group_replication_transaction_size_limit differs
// from the Cluster-wide configured value (-1 means "not enforced").
void check_transaction_size_limit(shcore::Array_t issues, Instance *instance,
                                  int64_t cluster_transaction_size_limit) {
  // Check if the instance's value for group_replication_transaction_size_limit
  // matches the Cluster's one
  int64_t instance_transaction_size_limit =
      instance->get_sysvar_int(kGrTransactionSizeLimit).value_or(0);

  if (cluster_transaction_size_limit != -1 &&
      instance_transaction_size_limit != cluster_transaction_size_limit) {
    issues->push_back(shcore::Value(
        "WARNING: The value of 'group_replication_transaction_size_limit' "
        "does not match the Cluster's configured value. Use Cluster.rescan() "
        "to repair."));
  }
}

// Verifies that certificate-based memberAuthType settings ('certIssuer' /
// 'certSubject') are actually configured for this instance and its recovery
// account, adding WARNING issues otherwise.
void check_auth_type_instance_ssl(shcore::Array_t issues,
                                  const Instance &instance,
                                  const Cluster_impl &cluster) {
  auto auth_type = cluster.query_cluster_auth_type();
  if (auth_type == Replication_auth_type::PASSWORD) return;

  // if we can be more specific
  switch (auth_type) {
    case Replication_auth_type::CERT_SUBJECT:
    case Replication_auth_type::CERT_SUBJECT_PASSWORD:
      if (cluster.query_cluster_instance_auth_cert_subject(instance).empty()) {
        auto msg = shcore::str_format(
            "WARNING: memberAuthType is set to '%s' but there's no "
            "'certSubject' configured for this instance, which prevents all "
            "other instances from reaching it, compromising the Cluster. "
            "Please remove the instance from the Cluster and use the most "
            "recent version of the shell to re-add it back.",
            to_string(auth_type).c_str());
        issues->push_back(shcore::Value(std::move(msg)));
        return;
      }
    default:
      break;
  }

  // check if instance user was created correctly
  bool has_issuer{false}, has_subject{false};
  {
    auto instance_repl_account =
        cluster.get_metadata_storage()->get_instance_repl_account(
            instance.get_uuid(), Cluster_type::GROUP_REPLICATION);
    if (std::get<0>(instance_repl_account).empty()) {
      // if this happens, a warning is already shown to the user in
      // validate_instance_recovery_user(), so there's no point showing a second
      // error message related to the same thing, even though it almost surely
      // means that the user was created with an older version of the shell
      return;
    }

    if (auto server = cluster.get_cluster_server(); server) {
      auto res = server->queryf(
          "SELECT coalesce(length(x509_issuer)), "
          "coalesce(length(x509_subject)) FROM mysql.user WHERE (user = ?)",
          std::get<0>(instance_repl_account));
      if (auto row = res->fetch_one()) {
        has_issuer = row->get_int(0) > 0;
        has_subject = row->get_int(1) > 0;
      }
    }
  }

  DBUG_EXECUTE_IF("fail_recovery_user_status_check_issuer",
                  { has_issuer = false; });
  DBUG_EXECUTE_IF("fail_recovery_user_status_check_subject",
                  { has_subject = false; });

  switch (auth_type) {
    case Replication_auth_type::CERT_ISSUER:
    case Replication_auth_type::CERT_ISSUER_PASSWORD:
      if (!has_issuer) {
        auto msg = shcore::str_format(
            "WARNING: memberAuthType is set to '%s' but there's no "
            "'certIssuer' configured for this instance recovery user, which "
            "prevents all other instances from reaching it, compromising the "
            "Cluster. Please remove the instance from the Cluster and use the "
            "most recent version of the shell to re-add it back.",
            to_string(auth_type).c_str());
        issues->push_back(shcore::Value(std::move(msg)));
      }
      break;
    case Replication_auth_type::CERT_SUBJECT:
    case Replication_auth_type::CERT_SUBJECT_PASSWORD:
      if (!has_issuer || !has_subject) {
        auto msg = shcore::str_format(
            "WARNING: memberAuthType is set to '%s' but there's no "
            "'certIssuer' and/or 'certSubject' configured for this instance "
            "recovery user, which prevents all other instances from reaching "
            "it, compromising the Cluster. Please remove the instance from the "
            "Cluster and use the most recent version of the shell to re-add it "
            "back.",
            to_string(auth_type).c_str());
        issues->push_back(shcore::Value(std::move(msg)));
      }
      break;
    default:
      break;
  }
}

// NOTE(review): the definition below continues past the end of this excerpt
// and is reproduced only up to the point where the excerpt ends.
shcore::Array_t instance_diagnostics(
    Instance *instance, const Cluster_impl *cluster,
    const Instance_metadata_info &instance_md,
    const mysqlshdk::mysql::Replication_channel &recovery_channel,
    const mysqlshdk::mysql::Replication_channel &applier_channel,
    std::optional<bool> super_read_only, const mysqlshdk::gr::Member &minfo,
    mysqlshdk::gr::Member_state self_state,
    const Parallel_applier_options &parallel_applier_options,
    int64_t cluster_transaction_size_limit) {
  shcore::Array_t issues = shcore::make_array();

  // split-brain
  if ((minfo.state == mysqlshdk::gr::Member_state::UNREACHABLE ||
       minfo.state == mysqlshdk::gr::Member_state::MISSING) &&
      (self_state == mysqlshdk::gr::Member_state::ONLINE ||
       self_state == mysqlshdk::gr::Member_state::RECOVERING)) {
    issues->push_back(shcore::Value(
        "ERROR: split-brain!
Instance is not part of the majority group, " "but has state " + to_string(self_state))); } // SECONDARY (or PRIMARY of replica cluster) that's not SRO if (cluster->is_cluster_set_member() && !(minfo.role == mysqlshdk::gr::Member_role::PRIMARY && cluster->is_primary_cluster()) && self_state != mysqlshdk::gr::Member_state::RECOVERING && super_read_only.has_value() && !*super_read_only) { issues->push_back(shcore::Value( "WARNING: Instance is NOT the global PRIMARY but super_read_only " "option is OFF. Errant transactions and inconsistencies may be " "accidentally introduced.")); } else if (minfo.role == mysqlshdk::gr::Member_role::SECONDARY && self_state != mysqlshdk::gr::Member_state::RECOVERING && super_read_only.has_value() && !*super_read_only) { issues->push_back( shcore::Value("WARNING: Instance is NOT a PRIMARY but super_read_only " "option is OFF.")); } check_channel_error(issues, "Group Replication Recovery", recovery_channel); check_channel_error(issues, "Group Replication Applier", applier_channel); check_unrecognized_channels(issues, instance, cluster->is_cluster_set_member()); if (self_state == mysqlshdk::gr::Member_state::OFFLINE) { issues->push_back(shcore::Value("NOTE: group_replication is stopped.")); } else if (self_state == mysqlshdk::gr::Member_state::ERROR) { issues->push_back( shcore::Value("ERROR: group_replication has stopped with an error.")); } if (instance_md.actual_server_uuid != instance_md.md.uuid) { issues->push_back( shcore::Value("WARNING: server_uuid for instance has changed from " "its last known value. Use cluster.rescan() to update " "the metadata.")); } else if (instance_md.md.cluster_id.empty() && self_state != mysqlshdk::gr::Member_state::RECOVERING) { issues->push_back( shcore::Value("WARNING: Instance is not managed by InnoDB cluster. Use " "cluster.rescan() to repair.")); } else if (instance_md.md.server_id == 0) { issues->push_back(shcore::Value( "NOTE: instance server_id is not registered in the metadata. 
Use " "cluster.rescan() to update the metadata.")); } try { mysqlshdk::utils::split_host_and_port(instance_md.md.grendpoint); } catch (const std::invalid_argument &e) { issues->push_back(shcore::Value( "ERROR: Invalid or missing information of Group Replication's network " "address in metadata. Use Cluster.rescan() to update the metadata.")); } // check if value of report_host matches what's in the metadata if (instance) check_host_metadata(issues, instance, instance_md); // Check if parallel-appliers are not configured. The requirement was // introduced in 8.0.23 so only check if the version is equal or higher to // that. mysqlshdk::utils::Version instance_version; if (minfo.version.empty()) { if (instance) instance_version = instance->get_version(); } else { instance_version = mysqlshdk::utils::Version(minfo.version); } if (instance_version >= mysqlshdk::utils::Version(8, 0, 23)) { check_parallel_appliers(issues, instance_version, parallel_applier_options); } if (instance) { check_transaction_size_limit(issues, instance, cluster_transaction_size_limit); } // check if instance (if secondary) has a seemingly correct SSL settings if (instance) check_auth_type_instance_ssl(issues, *instance, *cluster); return issues; } void check_comm_protocol_upgrade_possible( shcore::Array_t issues, const std::vector<mysqlshdk::gr::Member> &member_info, const mysqlshdk::utils::Version &protocol_version) { std::vector<mysqlshdk::utils::Version> all_protocol_versions; // Check if protocol upgrade is possible // If there's no version field in member_info it's because it was queried on // an old version, but in that case it means there's at least 1 server // < 8.0.16 so an upgrade isn't possible anyway for (const auto &m : member_info) { if (!m.version.empty()) { auto version = mysqlshdk::gr::get_max_supported_group_protocol_version( mysqlshdk::utils::Version(m.version)); all_protocol_versions.emplace_back(version); } } if (!all_protocol_versions.empty()) { 
std::sort(all_protocol_versions.begin(), all_protocol_versions.end()); // get the lowest protocol version supported by the group members auto highest_possible_version = all_protocol_versions.front(); if (highest_possible_version > protocol_version) { std::string str = "Group communication protocol in use is version " + protocol_version.get_full() + " but it is possible to upgrade to " + highest_possible_version.get_full() + "."; if (protocol_version < mysqlshdk::utils::Version(8, 0, 16)) { str += " Message fragmentation for large transactions"; if (highest_possible_version == mysqlshdk::utils::Version(8, 0, 27)) { str += " and Single Consensus Leader"; } str += " can only be enabled after upgrade."; } else { str += " Single Consensus Leader can only be enabled after upgrade."; } str += " Use Cluster.rescan({upgradeCommProtocol:true}) to upgrade."; issues->emplace_back(shcore::Value(str)); } } } void check_view_change_uuid_md(shcore::Array_t issues, const Cluster_impl &cluster) { auto group_instance = cluster.get_cluster_server(); // If there's primary we can't check anything, exit if (!group_instance) return; // Check if group_replication_view_change_uuid is set on the cluster and if // so, if it matches the value stored in the Metadata auto view_change_uuid = cluster.get_cluster_server()->get_sysvar_string( "group_replication_view_change_uuid", ""); // Not in use, exit if (view_change_uuid.empty() || view_change_uuid == "AUTOMATIC") return; auto md_view_change_uuid = cluster.get_view_change_uuid(); if (md_view_change_uuid.empty()) { issues->push_back(shcore::Value( "WARNING: The Cluster's group_replication_view_change_uuid is not " "stored in the Metadata. Please use <Cluster>.rescan() to update the " "metadata.")); } else { if (md_view_change_uuid != view_change_uuid) { issues->push_back(shcore::Value( "WARNING: The Cluster's group_replication_view_change_uuid value " "in use does not match the value stored in the Metadata. 
Please " "use <Cluster>.rescan() to update the metadata.")); } } } shcore::Array_t cluster_diagnostics( const Cluster_impl &cluster, const std::vector<mysqlshdk::gr::Member> &member_info, const mysqlshdk::utils::Version &protocol_version) { shcore::Array_t issues = shcore::make_array(); if (protocol_version && !member_info.empty()) check_comm_protocol_upgrade_possible(issues, member_info, protocol_version); // Verify Metadata consistency regarding group_replication_view_change_uuid check_view_change_uuid_md(issues, cluster); switch (cluster.cluster_availability()) { case Cluster_availability::NO_QUORUM: issues->push_back( shcore::Value("ERROR: Could not find ONLINE members forming a " "quorum. Cluster will be unable to perform updates " "until it's restored.")); break; case Cluster_availability::OFFLINE: case Cluster_availability::SOME_UNREACHABLE: issues->push_back(shcore::Value( "ERROR: Cluster members are reachable but they're all OFFLINE.")); break; case Cluster_availability::UNREACHABLE: issues->push_back(shcore::Value( "ERROR: Could not connect to any ONLINE members but " "there are unreachable instances that could still be ONLINE.")); break; case Cluster_availability::ONLINE: case Cluster_availability::ONLINE_NO_PRIMARY: break; } if (cluster.is_fenced_from_writes()) { issues->push_back( shcore::Value("WARNING: Cluster is fenced from Write traffic. 
Use " "cluster.unfenceWrites() to unfence the Cluster.")); } return issues; } shcore::Array_t validate_instance_recovery_user( std::shared_ptr<Instance> instance, const std::map<std::string, std::string> &endpoints, const std::string &actual_server_uuid, mysqlshdk::gr::Member_state state) { auto issues = shcore::make_array(); if (state != mysqlshdk::gr::Member_state::ONLINE && state != mysqlshdk::gr::Member_state::RECOVERING) return issues; if (!instance || endpoints.empty()) return issues; auto it = endpoints.find(actual_server_uuid); if (it == endpoints.end()) return issues; std::string recovery_user = mysqlshdk::mysql::get_replication_user( *instance, mysqlshdk::gr::k_gr_recovery_channel); // it->second is recovery user from MD if (!recovery_user.empty()) { bool recovery_is_valid = shcore::str_beginswith( recovery_user, mysqlshdk::gr::k_group_recovery_old_user_prefix) || shcore::str_beginswith(recovery_user, mysqlshdk::gr::k_group_recovery_user_prefix); if (it->second.empty() && recovery_is_valid) { issues->push_back( shcore::Value("WARNING: The replication recovery account in use " "by the instance is not stored in the metadata. " "Use Cluster.rescan() to update the metadata.")); } else if (it->second != recovery_user) { if (recovery_is_valid) { issues->push_back( shcore::Value("WARNING: The replication recovery account in use " "by the instance is not stored in the metadata. " "Use Cluster.rescan() to update the metadata.")); } else { issues->push_back(shcore::Value( "WARNING: Unsupported recovery account '" + recovery_user + "' has been found for instance '" + instance->descr() + "'. Operations such as Cluster.resetRecoveryAccountsPassword() and " "Cluster.addInstance() may fail. 
Please remove and add " "the instance back to the Cluster to ensure a supported recovery " "account is used.")); } } } else { issues->push_back(shcore::Value( "WARNING: Recovery user account not found for server address: " + instance->descr() + " with UUID " + instance->get_uuid())); } return issues; } } // namespace shcore::Dictionary_t Status::get_topology( const std::vector<mysqlshdk::gr::Member> &member_info) { using mysqlshdk::gr::Member_role; using mysqlshdk::gr::Member_state; using mysqlshdk::mysql::Replication_channel; Member_stats_map member_stats = query_member_stats(); shcore::Dictionary_t dict = shcore::make_dict(); auto get_member = [&member_info](const std::string &uuid) { for (const auto &m : member_info) { if (m.uuid == uuid) return m; } return mysqlshdk::gr::Member(); }; std::vector<Instance_metadata_info> instances; // add placeholders for unmanaged members for (const auto &m : member_info) { bool found = false; for (const auto &i : m_instances) { if (i.uuid == m.uuid) { Instance_metadata_info mdi; mdi.md = i; mdi.actual_server_uuid = m.uuid; instances.emplace_back(std::move(mdi)); found = true; break; } } if (!found) { // if instance in MD was not found by uuid, then search by address for (const auto &i : m_instances) { if (!mysqlshdk::utils::are_endpoints_equal( i.address, mysqlshdk::utils::make_host_and_port(m.host, m.port))) continue; log_debug( "Instance with address=%s is in group, but UUID doesn't match " "group=%s MD=%s", i.endpoint.c_str(), m.uuid.c_str(), i.uuid.c_str()); Instance_metadata_info mdi; mdi.md = i; mdi.actual_server_uuid = m.uuid; instances.emplace_back(std::move(mdi)); found = true; break; } } if (!found) { Instance_metadata_info mdi; mdi.md.address = mysqlshdk::utils::make_host_and_port(m.host, m.port); mdi.md.label = mdi.md.address; mdi.md.uuid = m.uuid; mdi.md.endpoint = mdi.md.address; mdi.actual_server_uuid = mdi.md.uuid; log_debug("Instance %s with uuid=%s found in group but not in MD", mdi.md.address.c_str(), 
                m.uuid.c_str());

      // Try to open a session to the unmanaged member, reusing the login
      // options of the current cluster session; connection failures are
      // recorded and reported later as shellConnectError.
      auto group_instance = m_cluster.get_cluster_server();

      mysqlshdk::db::Connection_options opts(mdi.md.endpoint);
      mysqlshdk::db::Connection_options group_session_copts(
          group_instance->get_connection_options());
      opts.set_login_options_from(group_session_copts);
      try {
        m_member_sessions[mdi.md.endpoint] = Instance::connect(opts);
      } catch (const shcore::Error &e) {
        m_member_connect_errors[mdi.md.endpoint] = e.format();
      }

      instances.emplace_back(std::move(mdi));
    }
  }

  // look for instances in MD but not in group
  for (const auto &i : m_instances) {
    bool found = false;
    for (const auto &m : member_info) {
      if (m.uuid == i.uuid ||
          mysqlshdk::utils::are_endpoints_equal(
              i.endpoint,
              mysqlshdk::utils::make_host_and_port(m.host, m.port))) {
        found = true;
        break;
      }
    }
    if (!found) {
      log_debug("Instance with uuid=%s address=%s is in MD but not in group",
                i.uuid.c_str(), i.endpoint.c_str());

      auto &instance = m_member_sessions[i.endpoint];

      Instance_metadata_info mdi;
      mdi.md = i;
      if (instance) {
        mdi.actual_server_uuid = instance->get_uuid();
      } else {
        // fallback to assuming the server_uuid is OK if we can't connect to
        // the instance
        mdi.actual_server_uuid = i.uuid;
      }
      instances.emplace_back(std::move(mdi));
    }
  }

  // server_uuid -> recovery account registered in MD (only available with
  // metadata schema 2.x and later)
  std::map<std::string, std::string> endpoints;
  if (m_cluster.get_metadata_storage()->real_version().get_major() > 1) {
    endpoints =
        m_cluster.get_metadata_storage()->get_instances_with_recovery_accounts(
            m_cluster.get_id());
  }

  auto mismatched_recovery_accounts =
      m_cluster.get_mismatched_recovery_accounts();

  // Build the status entry for every instance (managed or placeholder)
  for (const auto &inst : instances) {
    shcore::Dictionary_t member = shcore::make_dict();
    mysqlshdk::gr::Member minfo(get_member(inst.actual_server_uuid));

    mysqlshdk::gr::Member_state self_state =
        mysqlshdk::gr::Member_state::MISSING;

    auto &instance = m_member_sessions[inst.md.endpoint];

    std::optional<bool> super_read_only;
    std::optional<bool> offline_mode;
    std::vector<std::string> fence_sysvars;
    bool auto_rejoin = false;

    // channel structs are also consumed by instance_diagnostics() below
    Replication_channel applier_channel;
    Replication_channel recovery_channel;
    Parallel_applier_options parallel_applier_options;
    if (instance) {
      // Get the current parallel-applier options
      parallel_applier_options = Parallel_applier_options(*instance);

      // Get super_read_only value of each instance to set the mode accurately.
      super_read_only = instance->get_sysvar_bool("super_read_only");

      // Get offline_mode value of each instance to set the mode accurately.
      offline_mode = instance->get_sysvar_bool("offline_mode");

      // Check if auto-rejoin is running.
      auto_rejoin = mysqlshdk::gr::is_running_gr_auto_rejoin(*instance);

      self_state = mysqlshdk::gr::get_member_state(*instance);

      minfo.version = instance->get_version().get_base();

      // extended output: fence sysvars, applier threads, local status and
      // channel details, gated by the requested verbosity level
      if (m_extended.has_value()) {
        if (*m_extended >= 1) {
          fence_sysvars = instance->get_fence_sysvars();

          auto workers = parallel_applier_options.replica_parallel_workers;
          if (parallel_applier_options.replica_parallel_workers.value_or(0) >
              0) {
            (*member)["applierWorkerThreads"] = shcore::Value(*workers);
          }
        }

        if (*m_extended >= 3) {
          collect_local_status(member, *instance,
                               minfo.state == Member_state::RECOVERING);
        }

        if (minfo.state == Member_state::ONLINE)
          collect_basic_local_status(member, *instance,
                                     minfo.role == Member_role::PRIMARY);

        shcore::Value recovery_info;
        if (minfo.state == Member_state::RECOVERING) {
          std::string status;

          // Get the join timestamp from the Metadata
          shcore::Value join_time;
          m_cluster.get_metadata_storage()->query_instance_attribute(
              instance->get_uuid(), k_instance_attribute_join_time, &join_time);

          std::tie(status, recovery_info) = recovery_status(
              *instance,
              join_time.type == shcore::String ? join_time.as_string() : "");
          if (!status.empty()) {
            (*member)["recoveryStatusText"] = shcore::Value(status);
          }
        }

        // Include recovery channel info if RECOVERING or if there's an error
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_recovery_channel,
                &recovery_channel) &&
            *m_extended > 0) {
          if (minfo.state == Member_state::RECOVERING ||
              recovery_channel.status() != Replication_channel::OFF) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_recovery_channel, &master_info,
                &relay_info);

            if (!recovery_info) recovery_info = shcore::Value::new_map();
            (*recovery_info.as_map())["recoveryChannel"] = shcore::Value(
                channel_status(&recovery_channel, &master_info, &relay_info,
                               "", *m_extended - 1, true, false));
          }
        }
        if (recovery_info) (*member)["recovery"] = recovery_info;

        // Include applier channel info ONLINE and channel not ON
        // or != RECOVERING and channel not OFF
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_applier_channel,
                &applier_channel) &&
            *m_extended > 0) {
          if ((self_state == Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::ON) ||
              (self_state != Member_state::RECOVERING &&
               self_state != Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::OFF)) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_applier_channel, &master_info,
                &relay_info);

            (*member)["applierChannel"] = shcore::Value(
                channel_status(&applier_channel, &master_info, &relay_info,
                               "", *m_extended - 1, false, false));
          }
        }
      }
    } else {
      // no session could be opened; surface the stored connect error
      (*member)["shellConnectError"] =
          shcore::Value(m_member_connect_errors[inst.md.endpoint]);
    }

    feed_metadata_info(member, inst.md);

    feed_member_info(member, minfo, offline_mode, super_read_only,
                     fence_sysvars, self_state, auto_rejoin);

    {
      shcore::Array_t issues = instance_diagnostics(
          instance.get(), &m_cluster, inst, recovery_channel, applier_channel,
          super_read_only, minfo, self_state, parallel_applier_options,
          *m_cluster_transaction_size_limit);

      if (offline_mode.value_or(false)) {
        issues->push_back(
            shcore::Value("WARNING: Instance has 'offline_mode' enabled."));
      } else if (instance && instance->is_set_persist_supported()) {
        auto value = instance->get_persisted_value("offline_mode");
        if (value.has_value() && shcore::str_caseeq(*value, "ON")) {
          issues->push_back(shcore::Value(
              "WARNING: Instance has 'offline_mode' enabled and persisted. In "
              "the event that this instance becomes a primary, Shell or other "
              "members will be prevented from connecting to it disrupting the "
              "Cluster's normal functioning."));
        }
      }

      if (instance) {
        auto ret_val = validate_instance_recovery_user(
            instance, endpoints, inst.actual_server_uuid, self_state);

        // if the last check returned an error, the next check will too, so
        // there's no need to show the user two different msgs that kind of
        // point to the same thing (although for different reasons)
        if (ret_val->empty()) {
          auto it = mismatched_recovery_accounts.find(instance->get_id());
          if (it != mismatched_recovery_accounts.end()) {
            auto msg = shcore::str_format(
                "WARNING: Incorrect recovery account (%s) being used. Use "
                "Cluster.rescan() to repair.",
                it->second.c_str());
            ret_val->push_back(shcore::Value(std::move(msg)));
          }
        }

        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      // check if primary has unused recovery accounts
      if (minfo.role == Member_role::PRIMARY) {
        auto ret_val =
            validate_recovery_accounts_unused(mismatched_recovery_accounts);
        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      if (issues && !issues->empty()) {
        (*member)["instanceErrors"] = shcore::Value(std::move(issues));
      }
    }

    // extended >= 2: attach recovery/transaction I/O statistics
    if ((m_extended.value_or(0) >= 2) &&
        member_stats.find(inst.md.uuid) != member_stats.end()) {
      shcore::Dictionary_t mdict = member;

      auto dict_for = [mdict](const std::string &key) {
        if (!mdict->has_key(key)) {
          (*mdict)[key] = shcore::Value(shcore::make_dict());
        }
        return mdict->get_map(key);
      };

      if (member_stats[inst.md.uuid].first) {
        feed_member_stats(dict_for("recovery"),
                          member_stats[inst.md.uuid].first);
      }
      if (member_stats[inst.md.uuid].second) {
        feed_member_stats(dict_for("transactions"),
                          member_stats[inst.md.uuid].second);
      }
    }

    (*dict)[inst.md.label] = shcore::Value(member);
  }

  return dict;
}

// Builds the "defaultReplicaSet" dictionary: cluster-level status, SSL mode,
// primary info, cluster diagnostics and (via get_topology) the per-instance
// topology map.
shcore::Dictionary_t Status::collect_replicaset_status() {
  shcore::Dictionary_t tmp = shcore::make_dict();
  shcore::Dictionary_t ret = shcore::make_dict();

  auto group_instance = m_cluster.get_cluster_server();

  // GR queries are only attempted when at least one member is reachable and
  // not everything is OFFLINE
  bool gr_running =
      m_cluster.cluster_availability() != Cluster_availability::OFFLINE &&
      m_cluster.cluster_availability() !=
          Cluster_availability::SOME_UNREACHABLE &&
      m_cluster.cluster_availability() != Cluster_availability::UNREACHABLE;

  std::string topology_mode =
      mysqlshdk::gr::to_string(m_cluster.get_cluster_topology_type());

  // Set Cluster name
  (*ret)["name"] = shcore::Value("default");
  (*ret)["topologyMode"] = shcore::Value(topology_mode);

  mysqlshdk::utils::Version protocol_version;
  if (group_instance && gr_running) {
    try {
      protocol_version =
          mysqlshdk::gr::get_group_protocol_version(*group_instance);
    } catch (const shcore::Exception &e) {
      // UDF not available (e.g. old server): degrade gracefully and report
      // the protocol version as unknown; anything else is re-thrown
      if (e.code() == ER_CANT_INITIALIZE_UDF)
        log_warning("Can't get protocol version from %s: %s",
                    group_instance->descr().c_str(), e.format().c_str());
      else
        throw;
    }
  }

  if (m_extended.value_or(0) >= 1) {
    (*ret)["groupName"] = shcore::Value(m_cluster.get_group_name());

    if (group_instance &&
        group_instance->get_version() >= mysqlshdk::utils::Version(8, 0, 26)) {
      (*ret)["groupViewChangeUuid"] =
          shcore::Value(group_instance->get_sysvar_string(
              "group_replication_view_change_uuid", ""));
    }

    (*ret)["GRProtocolVersion"] =
        protocol_version ? shcore::Value(protocol_version.get_full())
                         : shcore::Value::Null();

    // Add the communicationStack
    //
    // Always add the value of the communicationStack regardless of the version
    // in use, because:
    //
    //   - Even before the 'MySQL' communication stack was introduced, 'XCOM'
    //     was the name of the communication stack used by GR
    //   - It slightly educates users that there's a possibility of having a
    //     different communication stack.
    if (group_instance) {
      (*ret)[kCommunicationStack] =
          shcore::Value(get_communication_stack(*group_instance));
    }

    // Add the value of paxosSingleLeader, when supported
    if (group_instance &&
        supports_paxos_single_leader(group_instance->get_version())) {
      auto value = get_paxos_single_leader_enabled(*group_instance);

      if (value.has_value()) {
        std::string_view paxos_single_leader = *value == true ? "ON" : "OFF";
        (*ret)[kPaxosSingleLeader] = shcore::Value(paxos_single_leader);
      }
    }
  }

  {
    auto ssl_mode = group_instance
                        ? group_instance->get_sysvar_string(
                              "group_replication_ssl_mode", "")
                        : "";
    if (!ssl_mode.empty()) {
      (*ret)["ssl"] = shcore::Value(ssl_mode);
    }
  }

  bool single_primary = true;
  std::vector<mysqlshdk::gr::Member> member_info;

  if (group_instance && gr_running) {
    bool has_quorum = true;
    std::string view_id;

    member_info = mysqlshdk::gr::get_members(*group_instance, &single_primary,
                                             &has_quorum, &view_id);

    tmp = check_group_status(*group_instance, member_info, has_quorum);
    (*ret)["statusText"] = shcore::Value(tmp->get_string("statusText"));
    (*ret)["status"] = shcore::Value(tmp->get_string("status"));
    if (m_extended.value_or(0) >= 1)
      (*ret)["groupViewId"] = shcore::Value(view_id);

    std::shared_ptr<Instance> primary_instance;
    {
      if (single_primary) {
        // In single primary mode we need to add the "primary" field
        (*ret)["primary"] = shcore::Value("?");
        for (const auto &member : member_info) {
          if (member.role != mysqlshdk::gr::Member_role::PRIMARY) continue;
          const Instance_metadata *primary = instance_with_uuid(member.uuid);
          if (primary) {
            if (auto s = m_member_sessions.find(primary->endpoint);
                s != m_member_sessions.end()) {
              primary_instance = s->second;
            }
            (*ret)["primary"] = shcore::Value(primary->endpoint);
          }
          break;
        }
      }
    }
  } else {
    // no usable GR session: report availability-derived status only
    if (m_cluster.cluster_availability() == Cluster_availability::OFFLINE) {
      (*ret)["status"] = shcore::Value("OFFLINE");
      (*ret)["statusText"] =
          shcore::Value("All members of the group are OFFLINE");
    } else {
      (*ret)["status"] = shcore::Value("UNREACHABLE");
      (*ret)["statusText"] =
          shcore::Value("Could not connect to any ONLINE members");
    }
  }

  auto issues = cluster_diagnostics(m_cluster, member_info, protocol_version);

  // Get the Cluster's transaction size_limit stored in the Metadata if the
  // Cluster is standalone or a Primary. Otherwise, it's a Replica so it
  // should be the value of the primary member
  shcore::Value value;
  bool is_replica_cluster =
      m_cluster.is_cluster_set_member() && !m_cluster.is_primary_cluster();

  if (group_instance) {
    m_cluster_transaction_size_limit =
        m_cluster.get_cluster_server()
            ->get_sysvar_int(kGrTransactionSizeLimit)
            .value_or(0);
  }

  if (!is_replica_cluster) {
    if (!m_cluster.get_metadata_storage()->query_cluster_attribute(
            m_cluster.get_id(), k_cluster_attribute_transaction_size_limit,
            &value)) {
      issues->push_back(
          shcore::Value("WARNING: Cluster's transaction size limit is not "
                        "registered in the metadata. Use "
                        "cluster.rescan() to update the metadata."));
    } else {
      m_cluster_transaction_size_limit = value.as_int();
    }
  }

  // If the cluster is operating in multi-primary mode and paxosSingleLeader
  // is enabled, add a NOTE informing the Cluster won't use a single
  // consensus leader in that case
  bool is_active_multi_primary =
      group_instance && gr_running && !single_primary;

  if (is_active_multi_primary &&
      supports_paxos_single_leader(group_instance->get_version()) &&
      get_paxos_single_leader_enabled(*group_instance).value_or(false)) {
    issues->push_back(
        shcore::Value("NOTE: The Cluster is configured to use a Single "
                      "Consensus Leader, however, this setting is ineffective "
                      "since the Cluster is operating in multi-primary mode."));
  }

  if (issues && !issues->empty())
    (*ret)["clusterErrors"] = shcore::Value(issues);

  (*ret)["topology"] = shcore::Value(get_topology(member_info));

  return ret;
}

// Returns warnings for recovery accounts registered in the Metadata that are
// no longer used by any member (candidates for Cluster.rescan() cleanup).
shcore::Array_t Status::validate_recovery_accounts_unused(
    const std::unordered_map<uint32_t, std::string>
        &mismatched_recovery_accounts) const {
  auto issues = shcore::make_array();

  auto accounts =
      m_cluster.get_unused_recovery_accounts(mismatched_recovery_accounts);
  if (accounts.empty()) return issues;

  std::string msg{accounts.size() == 1
                      ? "WARNING: Detected an unused recovery account: "
                      : "WARNING: Detected unused recovery accounts: "};

  for (const auto &account : accounts)
    msg.append(std::get<0>(account)).append(", ");

  msg.erase(msg.size() - 2);  // remove last ", "
  msg.append(". Use Cluster.rescan() to clean up.");

  issues->push_back(shcore::Value(std::move(msg)));

  return issues;
}

// Pre-execution setup: sanity-checks the topology, loads the instance list
// from the Metadata and opens sessions to all members.
void Status::prepare() {
  // Sanity check: Verify if the topology type changed and issue an error if
  // needed.
  if (m_cluster.get_cluster_server()) m_cluster.sanity_check();

  m_instances = m_cluster.get_instances();

  // Always connect to members to be able to get an accurate mode, based on
  // their super_ready_only value.
  connect_to_members();
}

// Wraps collect_replicaset_status(), adding a "warning" entry when the
// status was generated from an instance that is not ONLINE R/W or R/O.
shcore::Value Status::get_default_replicaset_status() {
  shcore::Dictionary_t dict = shcore::make_dict();

  // Get the Cluster status
  shcore::Dictionary_t replicaset_dict;

  replicaset_dict = collect_replicaset_status();

  // TODO(alfredo) - this needs to be reviewed, probably not necessary
  // Check if the Cluster group session is established to an instance with
  // a state different than
  //   - Online R/W
  //   - Online R/O
  //
  // Possibly with the state:
  //
  //   - RECOVERING
  //   - OFFLINE
  //   - ERROR
  //
  // If that's the case, a warning must be added to the resulting JSON object
  auto group_instance = m_cluster.get_cluster_server();

  if (group_instance) {
    auto state = get_replication_group_state(
        *group_instance, get_gr_instance_type(*group_instance));

    bool show_warning = (state.source_state != ManagedInstance::OnlineRW &&
                         state.source_state != ManagedInstance::OnlineRO);

    if (show_warning) {
      std::string warning =
          "The cluster description may be inaccurate as it was generated from "
          "an instance in ";
      warning.append(ManagedInstance::describe(
          static_cast<ManagedInstance::State>(state.source_state)));
      warning.append(" state");
      (*replicaset_dict)["warning"] = shcore::Value(std::move(warning));
    }
  }

  return shcore::Value(replicaset_dict);
}

// Entry point: assembles the full status() dictionary (cluster name,
// ClusterSet info when applicable, defaultReplicaSet, metadata info).
shcore::Value Status::execute() {
  shcore::Dictionary_t dict = shcore::make_dict();

  (*dict)["clusterName"] = shcore::Value(m_cluster.get_name());

  // If the Cluster belongs to a ClusterSet, include relevant ClusterSet
  // information:
  //   - domainName
  //   - clusterRole
  //   - clusterSetReplicationStatus
  if (m_cluster.is_cluster_set_member()) {
    auto cs_md = m_cluster.get_clusterset_metadata();

    Cluster_set_metadata cset;

    m_cluster.get_metadata_storage()->get_cluster_set(cs_md.cluster_set_id,
                                                      true, &cset, nullptr);

    (*dict)["domainName"] = shcore::Value(cset.domain_name);

    (*dict)["clusterRole"] =
        shcore::Value(m_cluster.is_primary_cluster() ? "PRIMARY" : "REPLICA");

    // Get the ClusterSet replication channel status
    Cluster_channel_status ch_status =
        m_cluster.get_cluster_set_object()->get_replication_channel_status(
            m_cluster);

    // If the Cluster is a Replica, add the status right away
    if (!m_cluster.is_primary_cluster()) {
      (*dict)["clusterSetReplicationStatus"] =
          shcore::Value(to_string(ch_status));
    } else {
      // If it's a Primary, add the status in case is suspicious (not Missing
      // or Stopped)
      if (ch_status != Cluster_channel_status::MISSING &&
          ch_status != Cluster_channel_status::STOPPED) {
        (*dict)["clusterSetReplicationStatus"] =
            shcore::Value(to_string(ch_status));
      }
    }
  }

  // Get the default replicaSet options
  (*dict)["defaultReplicaSet"] =
      shcore::Value(get_default_replicaset_status());

  if (m_cluster.get_cluster_server()) {
    // Gets the metadata version
    if (m_extended.value_or(0) >= 1) {
      auto version = mysqlsh::dba::metadata::installed_version(
          m_cluster.get_cluster_server());
      (*dict)["metadataVersion"] = shcore::Value(version.get_base());
    }

    // Iterate all replicasets and get the status for each one
    std::string addr = m_cluster.get_cluster_server()->get_canonical_address();
    (*dict)["groupInformationSourceMember"] = shcore::Value(addr);
  }

  auto md_server = m_cluster.get_metadata_storage()->get_md_server();

  // metadata server, if it's a different one
  if (md_server &&
      (!m_cluster.get_cluster_server() ||
       md_server->get_uuid() != m_cluster.get_cluster_server()->get_uuid())) {
    (*dict)["metadataServer"] =
        shcore::Value(md_server->get_canonical_address());
  }

  return shcore::Value(dict);
}

void Status::rollback() {
  // Do nothing right now, but it might be used in the future when
  // transactional command execution feature will be available.
}

void Status::finish() {}

}  // namespace cluster
}  // namespace dba
}  // namespace mysqlsh