shcore::Dictionary_t Status::get_topology()

in modules/adminapi/cluster/status.cc [1352:1679]


/**
 * Builds the per-instance "topology" dictionary of the cluster status.
 *
 * The set of reported instances is the union of:
 *  - group members that match a metadata instance by UUID;
 *  - group members that match a metadata instance only by address (the
 *    metadata entry is kept, the group's UUID is recorded as the actual one);
 *  - group members missing from the metadata (a placeholder entry is built
 *    from the member's host:port and a connection attempt is made reusing the
 *    login options of the cluster server session);
 *  - metadata instances that are not part of the group.
 *
 * For each of those instances a dictionary is filled with metadata info,
 * member info, diagnostics ("instanceErrors") and, depending on m_extended,
 * additional details:
 *  - >= 1: fence sysvars, "applierWorkerThreads", recovery/applier channel
 *          details;
 *  - >= 2: member statistics ("recovery" / "transactions");
 *  - >= 3: the output of collect_local_status().
 * None of the extended data is collected when m_extended has no value.
 *
 * @param member_info group members used to resolve state/role of each
 *                    instance.
 * @return dictionary mapping each instance label to its status dictionary.
 */
shcore::Dictionary_t Status::get_topology(
    const std::vector<mysqlshdk::gr::Member> &member_info) {
  using mysqlshdk::gr::Member_role;
  using mysqlshdk::gr::Member_state;
  using mysqlshdk::mysql::Replication_channel;

  Member_stats_map member_stats = query_member_stats();

  shcore::Dictionary_t dict = shcore::make_dict();

  // Returns a copy of the group member with the given uuid, or a
  // default-constructed Member when the uuid is not in member_info.
  auto get_member = [&member_info](const std::string &uuid) {
    for (const auto &m : member_info) {
      if (m.uuid == uuid) return m;
    }
    return mysqlshdk::gr::Member();
  };

  std::vector<Instance_metadata_info> instances;

  // Pass 1: walk the group members and pair each one with its metadata entry,
  // adding placeholders for unmanaged members.
  for (const auto &m : member_info) {
    bool found = false;

    // preferred match: same server UUID in group and MD
    for (const auto &i : m_instances) {
      if (i.uuid == m.uuid) {
        Instance_metadata_info mdi;
        mdi.md = i;
        mdi.actual_server_uuid = m.uuid;
        instances.emplace_back(std::move(mdi));
        found = true;
        break;
      }
    }
    if (found) continue;

    // Only needed when the UUID lookup failed; built once per member instead
    // of once per metadata instance.
    const auto member_endpoint =
        mysqlshdk::utils::make_host_and_port(m.host, m.port);

    // if instance in MD was not found by uuid, then search by address
    for (const auto &i : m_instances) {
      if (!mysqlshdk::utils::are_endpoints_equal(i.address, member_endpoint))
        continue;

      log_debug(
          "Instance with address=%s is in group, but UUID doesn't match "
          "group=%s MD=%s",
          i.endpoint.c_str(), m.uuid.c_str(), i.uuid.c_str());

      Instance_metadata_info mdi;
      mdi.md = i;
      // keep the MD entry but remember the UUID the group actually reports
      mdi.actual_server_uuid = m.uuid;
      instances.emplace_back(std::move(mdi));
      found = true;
      break;
    }
    if (found) continue;

    // Member is in the group but unknown to the MD: synthesize an entry whose
    // address, label and endpoint are all the member's host:port.
    Instance_metadata_info mdi;
    mdi.md.address = member_endpoint;
    mdi.md.label = mdi.md.address;
    mdi.md.uuid = m.uuid;
    mdi.md.endpoint = mdi.md.address;
    mdi.actual_server_uuid = mdi.md.uuid;

    log_debug("Instance %s with uuid=%s found in group but not in MD",
              mdi.md.address.c_str(), m.uuid.c_str());

    auto group_instance = m_cluster.get_cluster_server();

    // Try to open a session to the unmanaged member with the same login
    // options as the cluster server session; a failure is recorded and
    // reported later as "shellConnectError".
    mysqlshdk::db::Connection_options opts(mdi.md.endpoint);
    mysqlshdk::db::Connection_options group_session_copts(
        group_instance->get_connection_options());
    opts.set_login_options_from(group_session_copts);
    try {
      m_member_sessions[mdi.md.endpoint] = Instance::connect(opts);
    } catch (const shcore::Error &e) {
      m_member_connect_errors[mdi.md.endpoint] = e.format();
    }

    instances.emplace_back(std::move(mdi));
  }

  // Pass 2: look for instances in MD but not in group (neither UUID nor
  // endpoint matches any group member).
  for (const auto &i : m_instances) {
    bool found = false;
    for (const auto &m : member_info) {
      if (m.uuid == i.uuid ||
          mysqlshdk::utils::are_endpoints_equal(
              i.endpoint,
              mysqlshdk::utils::make_host_and_port(m.host, m.port))) {
        found = true;
        break;
      }
    }
    if (!found) {
      log_debug("Instance with uuid=%s address=%s is in MD but not in group",
                i.uuid.c_str(), i.endpoint.c_str());

      auto &instance = m_member_sessions[i.endpoint];

      Instance_metadata_info mdi;
      mdi.md = i;
      if (instance) {
        mdi.actual_server_uuid = instance->get_uuid();
      } else {
        // fallback to assuming the server_uuid is OK if we can't connect to
        // the instance
        mdi.actual_server_uuid = i.uuid;
      }
      instances.emplace_back(std::move(mdi));
    }
  }

  // Recovery-account information is only fetched when the metadata major
  // version is greater than 1; otherwise the map stays empty.
  std::map<std::string, std::string> endpoints;
  if (m_cluster.get_metadata_storage()->real_version().get_major() > 1) {
    endpoints =
        m_cluster.get_metadata_storage()->get_instances_with_recovery_accounts(
            m_cluster.get_id());
  }

  auto mismatched_recovery_accounts =
      m_cluster.get_mismatched_recovery_accounts();

  // Pass 3: build the status dictionary of every collected instance.
  for (const auto &inst : instances) {
    shcore::Dictionary_t member = shcore::make_dict();
    // default-constructed when the instance is not a group member
    mysqlshdk::gr::Member minfo(get_member(inst.actual_server_uuid));
    // state as seen by the instance itself; stays MISSING without a session
    mysqlshdk::gr::Member_state self_state =
        mysqlshdk::gr::Member_state::MISSING;

    auto &instance = m_member_sessions[inst.md.endpoint];

    std::optional<bool> super_read_only;
    std::optional<bool> offline_mode;
    std::vector<std::string> fence_sysvars;
    bool auto_rejoin = false;

    Replication_channel applier_channel;
    Replication_channel recovery_channel;

    Parallel_applier_options parallel_applier_options;

    if (instance) {
      // Get the current parallel-applier options
      parallel_applier_options = Parallel_applier_options(*instance);

      // Get super_read_only value of each instance to set the mode accurately.
      super_read_only = instance->get_sysvar_bool("super_read_only");

      // Get offline_mode value of each instance to set the mode accurately.
      offline_mode = instance->get_sysvar_bool("offline_mode");

      // Check if auto-rejoin is running.
      auto_rejoin = mysqlshdk::gr::is_running_gr_auto_rejoin(*instance);

      self_state = mysqlshdk::gr::get_member_state(*instance);

      minfo.version = instance->get_version().get_base();

      if (m_extended.has_value()) {
        if (*m_extended >= 1) {
          fence_sysvars = instance->get_fence_sysvars();

          auto workers = parallel_applier_options.replica_parallel_workers;

          // only reported when set to a positive number
          if (workers.value_or(0) > 0) {
            (*member)["applierWorkerThreads"] = shcore::Value(*workers);
          }
        }

        if (*m_extended >= 3) {
          collect_local_status(member, *instance,
                               minfo.state == Member_state::RECOVERING);
        }
        if (minfo.state == Member_state::ONLINE)
          collect_basic_local_status(member, *instance,
                                     minfo.role == Member_role::PRIMARY);

        shcore::Value recovery_info;
        if (minfo.state == Member_state::RECOVERING) {
          std::string status;
          // Get the join timestamp from the Metadata
          shcore::Value join_time;
          m_cluster.get_metadata_storage()->query_instance_attribute(
              instance->get_uuid(), k_instance_attribute_join_time, &join_time);

          // an empty join time is passed when the attribute is not a string
          std::tie(status, recovery_info) = recovery_status(
              *instance,
              join_time.type == shcore::String ? join_time.as_string() : "");
          if (!status.empty()) {
            (*member)["recoveryStatusText"] = shcore::Value(status);
          }
        }

        // Include recovery channel info if RECOVERING or if there's an error.
        // The channel status is queried before the verbosity check because
        // recovery_channel is also handed to instance_diagnostics() below.
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_recovery_channel,
                &recovery_channel) &&
            *m_extended > 0) {
          if (minfo.state == Member_state::RECOVERING ||
              recovery_channel.status() != Replication_channel::OFF) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_recovery_channel, &master_info,
                &relay_info);

            if (!recovery_info) recovery_info = shcore::Value::new_map();

            (*recovery_info.as_map())["recoveryChannel"] = shcore::Value(
                channel_status(&recovery_channel, &master_info, &relay_info, "",
                               *m_extended - 1, true, false));
          }
        }
        if (recovery_info) (*member)["recovery"] = recovery_info;

        // Include applier channel info ONLINE and channel not ON
        // or != RECOVERING and channel not OFF.
        // As above, applier_channel is filled in regardless of the verbosity
        // level because instance_diagnostics() consumes it.
        if (mysqlshdk::mysql::get_channel_status(
                *instance, mysqlshdk::gr::k_gr_applier_channel,
                &applier_channel) &&
            *m_extended > 0) {
          if ((self_state == Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::ON) ||
              (self_state != Member_state::RECOVERING &&
               self_state != Member_state::ONLINE &&
               applier_channel.status() != Replication_channel::OFF)) {
            mysqlshdk::mysql::Replication_channel_master_info master_info;
            mysqlshdk::mysql::Replication_channel_relay_log_info relay_info;

            mysqlshdk::mysql::get_channel_info(
                *instance, mysqlshdk::gr::k_gr_applier_channel, &master_info,
                &relay_info);

            (*member)["applierChannel"] = shcore::Value(
                channel_status(&applier_channel, &master_info, &relay_info, "",
                               *m_extended - 1, false, false));
          }
        }
      }

    } else {
      // no session: report whatever connect error was recorded for it
      (*member)["shellConnectError"] =
          shcore::Value(m_member_connect_errors[inst.md.endpoint]);
    }
    feed_metadata_info(member, inst.md);
    feed_member_info(member, minfo, offline_mode, super_read_only,
                     fence_sysvars, self_state, auto_rejoin);

    {
      shcore::Array_t issues = instance_diagnostics(
          instance.get(), &m_cluster, inst, recovery_channel, applier_channel,
          super_read_only, minfo, self_state, parallel_applier_options,
          *m_cluster_transaction_size_limit);

      // Warn about offline_mode being active or, if it isn't active right
      // now, about it being persisted as ON.
      if (offline_mode.value_or(false)) {
        issues->push_back(
            shcore::Value("WARNING: Instance has 'offline_mode' enabled."));
      } else if (instance && instance->is_set_persist_supported()) {
        auto value = instance->get_persisted_value("offline_mode");
        if (value.has_value() && shcore::str_caseeq(*value, "ON")) {
          issues->push_back(shcore::Value(
              "WARNING: Instance has 'offline_mode' enabled and persisted. In "
              "the event that this instance becomes a primary, Shell or other "
              "members will be prevented from connecting to it disrupting the "
              "Cluster's normal functioning."));
        }
      }

      if (instance) {
        auto ret_val = validate_instance_recovery_user(
            instance, endpoints, inst.actual_server_uuid, self_state);

        // if the last check returned an error, the next check will too, so
        // there's no need to show the user two different msgs that kind of
        // point to the same thing (although for different reasons)
        if (ret_val->empty()) {
          auto it = mismatched_recovery_accounts.find(instance->get_id());
          if (it != mismatched_recovery_accounts.end()) {
            auto msg = shcore::str_format(
                "WARNING: Incorrect recovery account (%s) being used. Use "
                "Cluster.rescan() to repair.",
                it->second.c_str());
            ret_val->push_back(shcore::Value(std::move(msg)));
          }
        }

        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      // check if primary has unused recovery accounts
      if (minfo.role == Member_role::PRIMARY) {
        auto ret_val =
            validate_recovery_accounts_unused(mismatched_recovery_accounts);
        issues->insert(issues->end(), std::make_move_iterator(ret_val->begin()),
                       std::make_move_iterator(ret_val->end()));
      }

      // "instanceErrors" is only emitted when there is something to report
      if (issues && !issues->empty()) {
        (*member)["instanceErrors"] = shcore::Value(std::move(issues));
      }
    }

    if (m_extended.value_or(0) >= 2) {
      // single lookup; the stats are keyed by the UUID stored in the MD entry
      auto stats_it = member_stats.find(inst.md.uuid);
      if (stats_it != member_stats.end()) {
        auto &stats = stats_it->second;

        shcore::Dictionary_t mdict = member;

        // Returns the sub-dictionary stored under key, creating it first if
        // the member dictionary doesn't have it yet.
        auto dict_for = [mdict](const std::string &key) {
          if (!mdict->has_key(key)) {
            (*mdict)[key] = shcore::Value(shcore::make_dict());
          }
          return mdict->get_map(key);
        };

        if (stats.first) {
          feed_member_stats(dict_for("recovery"), stats.first);
        }
        if (stats.second) {
          feed_member_stats(dict_for("transactions"), stats.second);
        }
      }
    }

    (*dict)[inst.md.label] = shcore::Value(member);
  }

  return dict;
}