void MetaServiceImpl::alter_cluster()

in cloud/src/meta-service/meta_service_resource.cpp [2025:2387]


// Handles the AlterCluster RPC.
//
// Flow:
//   1. Resolve `instance_id` from the request, falling back to a lookup by
//      `cloud_unique_id` when only the latter is supplied.
//   2. Reject requests that carry no instance id, no cluster, or no op.
//   3. Dispatch on `request->op()` to the matching ResourceManager call.
//   4. On success, request an instance refresh in a background bthread (run
//      inline when the bthread cannot be started).
//
// Results are reported through `code` / `msg`. These, together with `ss` and
// `instance_id`, are not declared in this function; presumably they are
// introduced by RPC_PREPROCESS, which also consumes `controller`, `response`
// and `done` (TODO confirm against the macro definition).
//
// A non-empty `msg` with `code == OK` after the dispatch is converted to
// UNDEFINED_ERR, so every branch may signal failure by message alone.
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
                                    const AlterClusterRequest* request,
                                    AlterClusterResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_cluster);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = request->has_instance_id() ? request->instance_id() : "";
    if (!cloud_unique_id.empty() && instance_id.empty()) {
        // Only a cloud_unique_id was given: derive the instance id from it.
        auto [is_degraded_format, id] =
                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
        if (config::enable_check_instance_id && is_degraded_format &&
            !resource_mgr_->is_instance_id_registered(id)) {
            msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
                  cloud_unique_id;
            LOG(WARNING) << msg;
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
        if (instance_id.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }

    if (instance_id.empty() || !request->has_cluster()) {
        msg = "invalid request instance_id or cluster not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    if (!request->has_op()) {
        msg = "op not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    LOG(INFO) << "alter cluster instance_id=" << instance_id << " op=" << request->op();
    ClusterInfo cluster;
    cluster.cluster.CopyFrom(request->cluster());

    // Maps the cluster type of the request to the role stored on its nodes.
    auto role_of_request_cluster = [&]() {
        if (request->cluster().type() == ClusterPB::SQL) return Role::SQL_SERVER;
        if (request->cluster().type() == ClusterPB::COMPUTE) return Role::COMPUTE_NODE;
        return Role::UNDEFINED;
    };

    // Builds the NodeInfo for one node of the request. The resolved
    // `instance_id` is used rather than `request->instance_id()`, because the
    // latter is empty when the caller identified itself by cloud_unique_id only.
    auto make_node = [&](const auto& node_pb) {
        NodeInfo node;
        node.instance_id = instance_id;
        node.node_info = node_pb;
        node.cluster_id = request->cluster().cluster_id();
        node.cluster_name = request->cluster().cluster_name();
        node.role = role_of_request_cluster();
        return node;
    };

    // True when a registered node and a requested node denote the same
    // heartbeat endpoint. When both sides carry an ip, only ip:port is
    // compared; host:port is consulted only otherwise.
    auto same_endpoint = [](const auto& registered, const auto& requested) {
        if (registered.has_ip() && requested.has_ip()) {
            return registered.ip() == requested.ip() &&
                   registered.heartbeat_port() == requested.heartbeat_port();
        }
        if (registered.has_host() && requested.has_host()) {
            return registered.host() == requested.host() &&
                   registered.heartbeat_port() == requested.heartbeat_port();
        }
        return false;
    };

    switch (request->op()) {
    case AlterClusterRequest::ADD_CLUSTER: {
        auto r = resource_mgr_->add_cluster(instance_id, cluster);
        code = r.first;
        msg = r.second;
    } break;
    case AlterClusterRequest::DROP_CLUSTER: {
        auto r = resource_mgr_->drop_cluster(instance_id, cluster);
        code = r.first;
        msg = r.second;
    } break;
    case AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    auto& mysql_user_names = cluster.cluster.mysql_user_name();
                    c.mutable_mysql_user_name()->CopyFrom(mysql_user_names);
                    return "";
                });
    } break;
    case AlterClusterRequest::ADD_NODE: {
        resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false, false);
        if (msg != "") {
            LOG(WARNING) << msg;
            break;
        }
        std::vector<NodeInfo> to_add;
        std::vector<NodeInfo> to_del;
        for (const auto& n : request->cluster().nodes()) {
            NodeInfo node = make_node(n);
            // Newly added nodes start out as running.
            node.node_info.set_status(NodeStatusPB::NODE_STATUS_RUNNING);
            to_add.emplace_back(std::move(node));
        }
        msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
    } break;
    case AlterClusterRequest::DROP_NODE: {
        resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false, false);
        if (msg != "") {
            LOG(WARNING) << msg;
            break;
        }
        std::vector<NodeInfo> to_add;
        std::vector<NodeInfo> to_del;
        for (const auto& n : request->cluster().nodes()) {
            to_del.emplace_back(make_node(n));
        }
        msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
    } break;
    case AlterClusterRequest::DECOMMISSION_NODE: {
        resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false, false);
        if (msg != "") {
            LOG(WARNING) << msg;
            break;
        }
        // The first node's cloud_unique_id drives the lookup below, so an
        // empty node list must be rejected before indexing into it.
        if (request->cluster().nodes_size() == 0) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "no node given to decommission";
            LOG(WARNING) << msg;
            break;
        }

        std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id();
        std::vector<NodeInfo> nodes;
        std::string err = resource_mgr_->get_node(be_unique_id, &nodes);
        if (!err.empty()) {
            LOG(INFO) << "failed to check instance info, err=" << err;
            msg = err;
            break;
        }

        // Collect every registered node whose endpoint is named in the request.
        std::vector<NodeInfo> decomission_nodes;
        for (auto& node : nodes) {
            for (const auto& req_node : request->cluster().nodes()) {
                if (!same_endpoint(node.node_info, req_node)) continue;
                decomission_nodes.push_back(node);
                // Kept from the original: the registered copy is marked after
                // being collected; the collected copies get their status in
                // the re-add step below.
                node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING);
            }
        }

        // The status change is applied as "delete, then re-add with new status".
        {
            std::vector<NodeInfo> to_add;
            std::vector<NodeInfo>& to_del = decomission_nodes;
            msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
            if (!msg.empty()) {
                // Do not let the re-add step overwrite the failure of the delete step.
                LOG(WARNING) << "failed to remove nodes before marking them decommissioning, err="
                             << msg;
                break;
            }
        }
        {
            std::vector<NodeInfo>& to_add = decomission_nodes;
            std::vector<NodeInfo> to_del;
            for (auto& node : to_add) {
                node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING);
                LOG(INFO) << "decomission node, "
                          << "size: " << to_add.size() << " " << node.node_info.DebugString() << " "
                          << node.cluster_id << " " << node.cluster_name;
            }
            msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
        }
    } break;
    case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
        resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false, false);
        if (msg != "") {
            LOG(WARNING) << msg;
            break;
        }
        // The first node's cloud_unique_id drives the lookup below, so an
        // empty node list must be rejected before indexing into it.
        if (request->cluster().nodes_size() == 0) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "no node given to notify decommissioned";
            LOG(WARNING) << msg;
            break;
        }

        std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id();
        std::vector<NodeInfo> nodes;
        std::string err = resource_mgr_->get_node(be_unique_id, &nodes);
        if (!err.empty()) {
            LOG(INFO) << "failed to check instance info, err=" << err;
            msg = err;
            break;
        }

        // Collect every registered node whose endpoint is named in the request.
        std::vector<NodeInfo> decomission_nodes;
        for (const auto& node : nodes) {
            for (const auto& req_node : request->cluster().nodes()) {
                if (same_endpoint(node.node_info, req_node)) {
                    decomission_nodes.push_back(node);
                }
            }
        }

        // The status change is applied as "delete, then re-add with new status".
        {
            std::vector<NodeInfo> to_add;
            std::vector<NodeInfo>& to_del = decomission_nodes;
            msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
            if (!msg.empty()) {
                // Do not let the re-add step overwrite the failure of the delete step.
                LOG(WARNING) << "failed to remove nodes before marking them decommissioned, err="
                             << msg;
                break;
            }
        }
        {
            std::vector<NodeInfo>& to_add = decomission_nodes;
            std::vector<NodeInfo> to_del;
            for (auto& node : to_add) {
                node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONED);
                LOG(INFO) << "notify node decomissioned, "
                          << " size: " << to_add.size() << " " << node.node_info.DebugString()
                          << " " << node.cluster_id << " " << node.cluster_name;
            }
            msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
        }
    } break;
    case AlterClusterRequest::RENAME_CLUSTER: {
        // SQL mode, cluster cluster name eq empty cluster name, need drop empty cluster first.
        // but in http api, cloud control will drop empty cluster
        bool replace_if_existing_empty_target_cluster =
                request->has_replace_if_existing_empty_target_cluster()
                        ? request->replace_if_existing_empty_target_cluster()
                        : false;

        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>& cluster_names) {
                    std::string msg;
                    auto it = cluster_names.find(cluster.cluster.cluster_name());
                    LOG(INFO) << "cluster.cluster.cluster_name(): "
                              << cluster.cluster.cluster_name();
                    for (const auto& existing_name : cluster_names) {
                        LOG(INFO) << "instance's cluster name : " << existing_name;
                    }
                    // The requested name must not collide with a name already
                    // present in `cluster_names`.
                    if (it != cluster_names.end()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to rename cluster, a cluster with the same name already "
                              "exists in this instance "
                           << proto_to_json(c);
                        msg = ss.str();
                        return msg;
                    }
                    // Renaming to the current name is rejected as well.
                    if (c.cluster_name() == cluster.cluster.cluster_name()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to rename cluster, name eq original name, original cluster "
                              "is "
                           << proto_to_json(c);
                        msg = ss.str();
                        return msg;
                    }
                    c.set_cluster_name(cluster.cluster.cluster_name());
                    return msg;
                },
                replace_if_existing_empty_target_cluster);
    } break;
    case AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    std::string msg;
                    // A private endpoint is mandatory; the public one is copied as given.
                    if (!cluster.cluster.has_private_endpoint() ||
                        cluster.cluster.private_endpoint().empty()) {
                        code = MetaServiceCode::CLUSTER_ENDPOINT_MISSING;
                        ss << "missing private endpoint";
                        msg = ss.str();
                        return msg;
                    }

                    c.set_public_endpoint(cluster.cluster.public_endpoint());
                    c.set_private_endpoint(cluster.cluster.private_endpoint());

                    return msg;
                });
    } break;
    case AlterClusterRequest::SET_CLUSTER_STATUS: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    std::string msg;
                    if (c.cluster_status() == request->cluster().cluster_status()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to set cluster status, status eq original status, original "
                              "cluster is "
                           << print_cluster_status(c.cluster_status());
                        msg = ss.str();
                        return msg;
                    }
                    // status from -> to
                    std::set<std::pair<cloud::ClusterStatus, cloud::ClusterStatus>>
                            can_work_directed_edges {
                                    {ClusterStatus::UNKNOWN, ClusterStatus::NORMAL},
                                    {ClusterStatus::NORMAL, ClusterStatus::SUSPENDED},
                                    {ClusterStatus::SUSPENDED, ClusterStatus::TO_RESUME},
                                    {ClusterStatus::TO_RESUME, ClusterStatus::NORMAL},
                                    {ClusterStatus::SUSPENDED, ClusterStatus::NORMAL},
                                    {ClusterStatus::NORMAL, ClusterStatus::MANUAL_SHUTDOWN},
                                    {ClusterStatus::MANUAL_SHUTDOWN, ClusterStatus::NORMAL},
                            };
                    auto from = c.cluster_status();
                    auto to = request->cluster().cluster_status();
                    if (can_work_directed_edges.count({from, to}) == 0) {
                        // can't find a directed edge in set, so refuse it
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to set cluster status, original cluster is "
                           << print_cluster_status(from) << " and want set "
                           << print_cluster_status(to);
                        msg = ss.str();
                        return msg;
                    }
                    c.set_cluster_status(request->cluster().cluster_status());
                    return msg;
                });
    } break;
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }
    // Branches that only produced a message still have to surface as an error.
    if (!msg.empty() && code == MetaServiceCode::OK) {
        code = MetaServiceCode::UNDEFINED_ERR;
    }

    // ugly but easy to repair
    // not change cloud.proto add err_code
    if (request->op() == AlterClusterRequest::DROP_NODE &&
        msg.find("not found") != std::string::npos) {
        // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }

    if (code != MetaServiceCode::OK) return;

    // Refresh the instance that was actually altered: the resolved
    // `instance_id`, which may differ from the (possibly empty)
    // `request->instance_id()`. The id is copied into the closure because the
    // closure may run after this function has returned.
    // NOTE(review): `f` is presumably released by run_bthread_work — confirm.
    auto f = new std::function<void()>([refresh_instance_id = instance_id, txn_kv = txn_kv_] {
        notify_refresh_instance(txn_kv, refresh_instance_id);
    });
    bthread_t bid;
    if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) {
        LOG(WARNING) << "notify refresh instance inplace, instance_id=" << instance_id;
        run_bthread_work(f);
    }
} // alter cluster