std::string ResourceManager::modify_nodes()

in cloud/src/resource-manager/resource_manager.cpp [854:1216]


std::string ResourceManager::modify_nodes(const std::string& instance_id,
                                          const std::vector<NodeInfo>& to_add,
                                          const std::vector<NodeInfo>& to_del) {
    std::string msg;
    std::stringstream ss;
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [&msg](int*) { LOG(INFO) << "modify_nodes err=" << msg; });

    if ((to_add.size() && to_del.size()) || (!to_add.size() && !to_del.size())) {
        msg = "to_add and to_del both empty or both not empty";
        LOG(WARNING) << msg;
        return msg;
    }

    std::unique_ptr<Transaction> txn0;
    TxnErrorCode err = txn_kv_->create_txn(&txn0);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << msg;
        return msg;
    }

    std::shared_ptr<Transaction> txn(txn0.release());
    InstanceInfoPB instance;
    auto [c0, m0] = get_instance(txn, instance_id, &instance);
    TEST_SYNC_POINT_CALLBACK("modify_nodes:get_instance", &c0, &instance);
    if (c0 != TxnErrorCode::TXN_OK) {
        msg = m0;
        return msg;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        msg = "instance status has been set delete, plz check it";
        LOG(WARNING) << msg;
        return msg;
    }

    LOG(INFO) << "instance json=" << proto_to_json(instance);
    if (!to_add.empty()) {
        // add nodes
        // Check duplicated nodes, one node cannot deploy on multiple clusters
        // diff instance's nodes and to_add nodes
        const auto& [fe_endpoints_registered, be_endpoints_registered] =
                get_nodes_endpoint_registered(instance);
        for (auto& add_node : to_add) {
            const ClusterPB::Type type =
                    add_node.role == Role::SQL_SERVER ? ClusterPB::SQL : ClusterPB::COMPUTE;
            const auto& [c1, m1] = check_node_has_been_registered(
                    type, add_node.node_info, fe_endpoints_registered, be_endpoints_registered);
            if (c1 != MetaServiceCode::OK) {
                return m1;
            }
        }
    }
    // a vector save (origin_cluster , changed_cluster), to update ms mem
    std::vector<std::pair<ClusterPB, ClusterPB>> change_from_to_clusters;
    using modify_impl_func = std::function<std::string(const ClusterPB& c, const NodeInfo& n)>;
    using check_func = std::function<std::string(const NodeInfo& n)>;
    auto modify_func = [&](const NodeInfo& node, check_func check,
                           modify_impl_func action) -> std::string {
        std::string cluster_id = node.cluster_id;
        std::string cluster_name = node.cluster_name;

        {
            std::shared_lock l(mtx_);
            msg = check(node);
            if (msg != "") {
                return msg;
            }
        }

        LOG(INFO) << "node to modify json=" << proto_to_json(node.node_info);

        for (auto& c : instance.clusters()) {
            if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
                (c.has_cluster_id() && c.cluster_id() == cluster_id)) {
                msg = action(c, node);
                if (msg != "") {
                    return msg;
                }
            }
        }
        return "";
    };

    // check in ms mem cache
    check_func check_to_add = [&](const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
        if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
            return "";
        }
        for (auto it = start; it != end; ++it) {
            if (it->second.instance_id != n.instance_id) {
                // different instance, but has same cloud_unique_id
                s << "cloud_unique_id is already occupied by an instance,"
                  << " instance_id=" << it->second.instance_id
                  << " cluster_name=" << it->second.cluster_name
                  << " cluster_id=" << it->second.cluster_id
                  << " cloud_unique_id=" << n.node_info.cloud_unique_id();
                err = s.str();
                LOG(INFO) << err;
                return err;
            }
        }
        return "";
    };

    // modify kv
    modify_impl_func modify_to_add = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        ClusterPB copied_original_cluster;
        ClusterPB copied_cluster;
        bool is_compute_node = n.node_info.has_heartbeat_port();
        for (auto it = c.nodes().begin(); it != c.nodes().end(); ++it) {
            if (it->has_ip() && n.node_info.has_ip()) {
                std::string c_endpoint = it->ip() + ":" +
                                         (is_compute_node ? std::to_string(it->heartbeat_port())
                                                          : std::to_string(it->edit_log_port()));
                std::string n_endpoint =
                        n.node_info.ip() + ":" +
                        (is_compute_node ? std::to_string(n.node_info.heartbeat_port())
                                         : std::to_string(n.node_info.edit_log_port()));
                if (c_endpoint == n_endpoint) {
                    // replicate request, do nothing
                    return "";
                }
            }

            if (it->has_host() && n.node_info.has_host()) {
                std::string c_endpoint_host =
                        it->host() + ":" +
                        (is_compute_node ? std::to_string(it->heartbeat_port())
                                         : std::to_string(it->edit_log_port()));
                std::string n_endpoint_host =
                        n.node_info.host() + ":" +
                        (is_compute_node ? std::to_string(n.node_info.heartbeat_port())
                                         : std::to_string(n.node_info.edit_log_port()));
                if (c_endpoint_host == n_endpoint_host) {
                    // replicate request, do nothing
                    return "";
                }
            }
        }

        // add ctime and mtime
        auto& node = const_cast<std::decay_t<decltype(n.node_info)>&>(n.node_info);
        auto now_time = std::chrono::system_clock::now();
        uint64_t time =
                std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch())
                        .count();
        if (!node.has_ctime()) {
            node.set_ctime(time);
        }
        node.set_mtime(time);
        copied_original_cluster.CopyFrom(c);
        auto& change_cluster = const_cast<std::decay_t<decltype(c)>&>(c);
        change_cluster.add_nodes()->CopyFrom(node);
        copied_cluster.CopyFrom(change_cluster);
        change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
                                             std::move(copied_cluster));
        return "";
    };

    for (auto& it : to_add) {
        msg = modify_func(it, check_to_add, modify_to_add);
        if (msg != "") {
            LOG(WARNING) << msg;
            return msg;
        }
    }

    std::string cache_err_help_msg =
            "Ms nodes memory cache may be inconsistent, pls check registry key may be contain "
            "127.0.0.1, and call get_instance api get instance info from fdb";

    // check in ms mem cache
    check_func check_to_del = [&](const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
        if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
            s << "can not find to drop nodes by cloud_unique_id=" << n.node_info.cloud_unique_id()
              << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
              << " cluster_id=" << n.cluster_id << " help Msg=" << cache_err_help_msg;
            err = s.str();
            LOG(WARNING) << err;
            return std::string("not found ,") + err;
        }

        bool found = false;
        for (auto it = start; it != end; ++it) {
            const auto& m_node = it->second.node_info;
            if (m_node.has_ip() && n.node_info.has_ip()) {
                std::string m_endpoint =
                        m_node.ip() + ":" +
                        (m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
                                                     : std::to_string(m_node.edit_log_port()));

                std::string n_endpoint = n.node_info.ip() + ":" +
                                         (n.node_info.has_heartbeat_port()
                                                  ? std::to_string(n.node_info.heartbeat_port())
                                                  : std::to_string(n.node_info.edit_log_port()));

                if (m_endpoint == n_endpoint) {
                    found = true;
                    break;
                }
            }

            if (m_node.has_host() && n.node_info.has_host()) {
                std::string m_endpoint_host =
                        m_node.host() + ":" +
                        (m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
                                                     : std::to_string(m_node.edit_log_port()));

                std::string n_endpoint_host =
                        n.node_info.host() + ":" +
                        (n.node_info.has_heartbeat_port()
                                 ? std::to_string(n.node_info.heartbeat_port())
                                 : std::to_string(n.node_info.edit_log_port()));

                if (m_endpoint_host == n_endpoint_host) {
                    found = true;
                    break;
                }
            }
        }
        if (!found) {
            s << "cloud_unique_id can not find to drop node,"
              << " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
              << " cluster_id=" << n.cluster_id << " node_info=" << n.node_info.DebugString()
              << " help Msg=" << cache_err_help_msg;
            err = s.str();
            LOG(WARNING) << err;
            return std::string("not found ,") + err;
        }
        return "";
    };

    // modify kv
    modify_impl_func modify_to_del = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
        std::string err;
        std::stringstream s;
        ClusterPB copied_original_cluster;
        ClusterPB copied_cluster;

        bool found = false;
        int idx = -1;
        // ni: to drop node
        const auto& ni = n.node_info;
        // c.nodes: cluster registered nodes
        NodeInfoPB copy_node;
        for (auto& cn : c.nodes()) {
            idx++;
            if (cn.has_ip() && ni.has_ip()) {
                std::string cn_endpoint =
                        cn.ip() + ":" +
                        (cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
                                                 : std::to_string(cn.edit_log_port()));

                std::string ni_endpoint =
                        ni.ip() + ":" +
                        (ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
                                                 : std::to_string(ni.edit_log_port()));

                if (ni.cloud_unique_id() == cn.cloud_unique_id() && cn_endpoint == ni_endpoint) {
                    copy_node.CopyFrom(cn);
                    found = true;
                    break;
                }
            }

            if (cn.has_host() && ni.has_host()) {
                std::string cn_endpoint_host =
                        cn.host() + ":" +
                        (cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
                                                 : std::to_string(cn.edit_log_port()));

                std::string ni_endpoint_host =
                        ni.host() + ":" +
                        (ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
                                                 : std::to_string(ni.edit_log_port()));

                if (ni.cloud_unique_id() == cn.cloud_unique_id() &&
                    cn_endpoint_host == ni_endpoint_host) {
                    copy_node.CopyFrom(cn);
                    found = true;
                    break;
                }
            }
        }

        if (!found) {
            s << "failed to find node to drop,"
              << " instance_id=" << instance.instance_id() << " cluster_id=" << c.cluster_id()
              << " cluster_name=" << c.cluster_name() << " cluster=" << proto_to_json(c)
              << " help Msg =" << cache_err_help_msg;
            err = s.str();
            LOG(WARNING) << err;
            return std::string("not found ,") + err;
        }

        // check drop fe node
        if (ClusterPB::SQL == c.type() && !is_sql_node_exceeded_safe_drop_time(copy_node)) {
            s << "drop fe node not in safe time, try later, node=" << copy_node.DebugString();
            err = s.str();
            LOG(WARNING) << err;
            return err;
        }
        copied_original_cluster.CopyFrom(c);
        auto& change_nodes = const_cast<std::decay_t<decltype(c.nodes())>&>(c.nodes());
        change_nodes.DeleteSubrange(idx, 1); // Remove it
        copied_cluster.CopyFrom(c);
        change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
                                             std::move(copied_cluster));
        return "";
    };

    for (auto& it : to_del) {
        msg = modify_func(it, check_to_del, modify_to_del);
        if (msg != "") {
            LOG(WARNING) << msg;
            return msg;
        }
    }

    LOG(INFO) << "instance " << instance_id << " info: " << instance.DebugString();
    // here, instance has been changed, not save in fdb
    if ((!to_add.empty() || !to_del.empty()) && !is_instance_valid(instance)) {
        msg = "instance invalid, cant modify, plz check";
        LOG(WARNING) << msg;
        return msg;
    }

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        return msg;
    }

    txn->put(key, val);
    LOG(INFO) << "put instance_key=" << hex(key);
    TxnErrorCode err_code = txn->commit();
    if (err_code != TxnErrorCode::TXN_OK) {
        msg = "failed to commit kv txn";
        LOG(WARNING) << msg << " err=" << err_code;
        return msg;
    }

    for (auto& it : change_from_to_clusters) {
        update_cluster_to_index(instance_id, it.first, it.second);
    }

    return "";
}