in cloud/src/meta-service/meta_service_resource.cpp [2025:2387]
// AlterCluster RPC handler.
//
// Resolves the target instance: request->instance_id() if given, otherwise derived from
// request->cloud_unique_id(). It then validates that a cluster and an op are present and
// dispatches on request->op() to the ResourceManager. Results are reported through
// `code`/`msg` (set up by RPC_PREPROCESS); a non-empty msg whose code is still OK is reported
// as UNDEFINED_ERR. On success a refresh notification for the resolved instance is sent from a
// background bthread, or inline when the bthread cannot be started.
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
                                    const AlterClusterRequest* request,
                                    AlterClusterResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_cluster);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = request->has_instance_id() ? request->instance_id() : "";
    if (!cloud_unique_id.empty() && instance_id.empty()) {
        auto [is_degraded_format, id] =
                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
        if (config::enable_check_instance_id && is_degraded_format &&
            !resource_mgr_->is_instance_id_registered(id)) {
            msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
                  cloud_unique_id;
            LOG(WARNING) << msg;
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
        if (instance_id.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }
    if (instance_id.empty() || !request->has_cluster()) {
        msg = "invalid request instance_id or cluster not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    if (!request->has_op()) {
        msg = "op not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }
    LOG(INFO) << "alter cluster instance_id=" << instance_id << " op=" << request->op();
    ClusterInfo cluster;
    cluster.cluster.CopyFrom(request->cluster());

    // ---- helpers shared by the node-level ops (ADD/DROP/DECOMMISSION/NOTIFY) ----

    // Runs check_cluster_params_valid on the request cluster; returns true (after logging
    // msg) when it reported a problem through msg.
    auto cluster_params_invalid = [&]() {
        resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false, false);
        if (msg.empty()) {
            return false;
        }
        LOG(WARNING) << msg;
        return true;
    };

    // Builds one NodeInfo per node of the request cluster. The resolved `instance_id` is used
    // rather than request->instance_id(), which is empty when the caller only supplied
    // cloud_unique_id. When `mark_running` is set, each node's status is set to RUNNING.
    auto nodes_from_request = [&](bool mark_running) {
        const auto type = request->cluster().type();
        const auto role = type == ClusterPB::SQL       ? Role::SQL_SERVER
                          : type == ClusterPB::COMPUTE ? Role::COMPUTE_NODE
                                                       : Role::UNDEFINED;
        std::vector<NodeInfo> result;
        for (const auto& n : request->cluster().nodes()) {
            NodeInfo node;
            node.instance_id = instance_id;
            node.node_info = n;
            node.cluster_id = request->cluster().cluster_id();
            node.cluster_name = request->cluster().cluster_name();
            node.role = role;
            if (mark_running) {
                node.node_info.set_status(NodeStatusPB::NODE_STATUS_RUNNING);
            }
            result.emplace_back(std::move(node));
        }
        return result;
    };

    // True when a registered node and a requested node denote the same endpoint: ip and
    // heartbeat_port are compared when both sides carry an ip; otherwise host and
    // heartbeat_port are compared when both sides carry a host.
    auto same_endpoint = [](const auto& stored, const auto& wanted) {
        if (stored.has_ip() && wanted.has_ip()) {
            return stored.ip() == wanted.ip() && stored.heartbeat_port() == wanted.heartbeat_port();
        }
        if (stored.has_host() && wanted.has_host()) {
            return stored.host() == wanted.host() &&
                   stored.heartbeat_port() == wanted.heartbeat_port();
        }
        return false;
    };

    // Fetches the nodes registered under the first request node's cloud_unique_id and appends
    // to `matched` those whose endpoint matches some node in the request (each at most once).
    // Returns false with msg (and possibly code) set on failure.
    auto collect_requested_nodes = [&](std::vector<NodeInfo>* matched) {
        const auto& req_nodes = request->cluster().nodes();
        if (req_nodes.empty()) {
            // Guards the index below; indexing an empty repeated field is undefined.
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "no node given in cluster";
            return false;
        }
        std::string be_unique_id = req_nodes[0].cloud_unique_id();
        std::vector<NodeInfo> nodes;
        std::string err = resource_mgr_->get_node(be_unique_id, &nodes);
        if (!err.empty()) {
            LOG(INFO) << "failed to check instance info, err=" << err;
            msg = err;
            return false;
        }
        for (auto& node : nodes) {
            for (const auto& req_node : req_nodes) {
                if (same_endpoint(node.node_info, req_node)) {
                    matched->push_back(node);
                    break; // one entry per registered node, even if the request repeats it
                }
            }
        }
        return true;
    };

    // Changes the status of `targets` by removing them from the instance and adding them back
    // with `new_status`. If the removal fails, its error is kept in msg and the nodes are NOT
    // re-added (previously the removal error was overwritten by the re-add result).
    auto reregister_with_status = [&](std::vector<NodeInfo>& targets, auto new_status,
                                      const char* log_prefix) {
        std::vector<NodeInfo> none;
        msg = resource_mgr_->modify_nodes(instance_id, none, targets);
        if (!msg.empty()) {
            LOG(WARNING) << log_prefix << "failed to remove nodes before changing status, err="
                         << msg;
            return;
        }
        for (auto& node : targets) {
            node.node_info.set_status(new_status);
            LOG(INFO) << log_prefix << "size: " << targets.size() << " "
                      << node.node_info.DebugString() << " " << node.cluster_id << " "
                      << node.cluster_name;
        }
        msg = resource_mgr_->modify_nodes(instance_id, targets, none);
    };

    switch (request->op()) {
    case AlterClusterRequest::ADD_CLUSTER: {
        auto r = resource_mgr_->add_cluster(instance_id, cluster);
        code = r.first;
        msg = r.second;
    } break;
    case AlterClusterRequest::DROP_CLUSTER: {
        auto r = resource_mgr_->drop_cluster(instance_id, cluster);
        code = r.first;
        msg = r.second;
    } break;
    case AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    auto& mysql_user_names = cluster.cluster.mysql_user_name();
                    c.mutable_mysql_user_name()->CopyFrom(mysql_user_names);
                    return "";
                });
    } break;
    case AlterClusterRequest::ADD_NODE: {
        if (cluster_params_invalid()) {
            break;
        }
        std::vector<NodeInfo> to_add = nodes_from_request(true);
        std::vector<NodeInfo> to_del;
        msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
    } break;
    case AlterClusterRequest::DROP_NODE: {
        if (cluster_params_invalid()) {
            break;
        }
        std::vector<NodeInfo> to_add;
        std::vector<NodeInfo> to_del = nodes_from_request(false);
        msg = resource_mgr_->modify_nodes(instance_id, to_add, to_del);
    } break;
    case AlterClusterRequest::DECOMMISSION_NODE: {
        if (cluster_params_invalid()) {
            break;
        }
        std::vector<NodeInfo> targets;
        if (!collect_requested_nodes(&targets)) {
            break;
        }
        reregister_with_status(targets, NodeStatusPB::NODE_STATUS_DECOMMISSIONING,
                               "decomission node, ");
    } break;
    case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
        if (cluster_params_invalid()) {
            break;
        }
        std::vector<NodeInfo> targets;
        if (!collect_requested_nodes(&targets)) {
            break;
        }
        reregister_with_status(targets, NodeStatusPB::NODE_STATUS_DECOMMISSIONED,
                               "notify node decomissioned, ");
    } break;
    case AlterClusterRequest::RENAME_CLUSTER: {
        // SQL mode, cluster cluster name eq empty cluster name, need drop empty cluster first.
        // but in http api, cloud control will drop empty cluster
        bool replace_if_existing_empty_target_cluster =
                request->has_replace_if_existing_empty_target_cluster()
                        ? request->replace_if_existing_empty_target_cluster()
                        : false;
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>& cluster_names) {
                    std::string err;
                    auto it = cluster_names.find(cluster.cluster.cluster_name());
                    LOG(INFO) << "cluster.cluster.cluster_name(): "
                              << cluster.cluster.cluster_name();
                    for (const auto& name : cluster_names) {
                        LOG(INFO) << "instance's cluster name : " << name;
                    }
                    if (it != cluster_names.end()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to rename cluster, a cluster with the same name already "
                              "exists in this instance "
                           << proto_to_json(c);
                        err = ss.str();
                        return err;
                    }
                    if (c.cluster_name() == cluster.cluster.cluster_name()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to rename cluster, name eq original name, original cluster "
                              "is "
                           << proto_to_json(c);
                        err = ss.str();
                        return err;
                    }
                    c.set_cluster_name(cluster.cluster.cluster_name());
                    return err;
                },
                replace_if_existing_empty_target_cluster);
    } break;
    case AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    std::string err;
                    if (!cluster.cluster.has_private_endpoint() ||
                        cluster.cluster.private_endpoint().empty()) {
                        code = MetaServiceCode::CLUSTER_ENDPOINT_MISSING;
                        ss << "missing private endpoint";
                        err = ss.str();
                        return err;
                    }
                    c.set_public_endpoint(cluster.cluster.public_endpoint());
                    c.set_private_endpoint(cluster.cluster.private_endpoint());
                    return err;
                });
    } break;
    case AlterClusterRequest::SET_CLUSTER_STATUS: {
        msg = resource_mgr_->update_cluster(
                instance_id, cluster,
                [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
                [&](ClusterPB& c, std::set<std::string>&) {
                    std::string err;
                    if (c.cluster_status() == request->cluster().cluster_status()) {
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to set cluster status, status eq original status, original "
                              "cluster is "
                           << print_cluster_status(c.cluster_status());
                        err = ss.str();
                        return err;
                    }
                    // Allowed status transitions, as (from, to) pairs.
                    std::set<std::pair<cloud::ClusterStatus, cloud::ClusterStatus>>
                            can_work_directed_edges {
                                    {ClusterStatus::UNKNOWN, ClusterStatus::NORMAL},
                                    {ClusterStatus::NORMAL, ClusterStatus::SUSPENDED},
                                    {ClusterStatus::SUSPENDED, ClusterStatus::TO_RESUME},
                                    {ClusterStatus::TO_RESUME, ClusterStatus::NORMAL},
                                    {ClusterStatus::SUSPENDED, ClusterStatus::NORMAL},
                                    {ClusterStatus::NORMAL, ClusterStatus::MANUAL_SHUTDOWN},
                                    {ClusterStatus::MANUAL_SHUTDOWN, ClusterStatus::NORMAL},
                            };
                    auto from = c.cluster_status();
                    auto to = request->cluster().cluster_status();
                    if (can_work_directed_edges.count({from, to}) == 0) {
                        // can't find a directed edge in set, so refuse it
                        code = MetaServiceCode::INVALID_ARGUMENT;
                        ss << "failed to set cluster status, original cluster is "
                           << print_cluster_status(from) << " and want set "
                           << print_cluster_status(to);
                        err = ss.str();
                        return err;
                    }
                    c.set_cluster_status(request->cluster().cluster_status());
                    return err;
                });
    } break;
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }
    if (!msg.empty() && code == MetaServiceCode::OK) {
        code = MetaServiceCode::UNDEFINED_ERR;
    }
    // ugly but easy to repair
    // not change cloud.proto add err_code
    if (request->op() == AlterClusterRequest::DROP_NODE &&
        msg.find("not found") != std::string::npos) {
        // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }
    if (code != MetaServiceCode::OK) return;
    // Capture the resolved instance_id: request->instance_id() is empty when the caller only
    // supplied cloud_unique_id.
    // NOTE(review): f is handed to run_bthread_work, presumably taking ownership — confirm.
    auto f = new std::function<void()>([instance_id = instance_id, txn_kv = txn_kv_] {
        notify_refresh_instance(txn_kv, instance_id);
    });
    bthread_t bid;
    if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) {
        LOG(WARNING) << "notify refresh instance inplace, instance_id=" << instance_id;
        run_bthread_work(f);
    }
} // alter cluster