in cloud/src/resource-manager/resource_manager.cpp [854:1216]
std::string ResourceManager::modify_nodes(const std::string& instance_id,
const std::vector<NodeInfo>& to_add,
const std::vector<NodeInfo>& to_del) {
std::string msg;
std::stringstream ss;
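// Scope guard: the deleter of this dummy unique_ptr runs on every return path and logs the final msg.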
std::unique_ptr<int, std::function<void(int*)>> defer(
(int*)0x01, [&msg](int*) { LOG(INFO) << "modify_nodes err=" << msg; });
if ((to_add.size() && to_del.size()) || (!to_add.size() && !to_del.size())) {
msg = "to_add and to_del both empty or both not empty";
LOG(WARNING) << msg;
return msg;
}
std::unique_ptr<Transaction> txn0;
TxnErrorCode err = txn_kv_->create_txn(&txn0);
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to create txn";
LOG(WARNING) << msg << " err=" << msg;
return msg;
}
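// Transfer ownership into a shared_ptr so the transaction can be shared with get_instance below.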
std::shared_ptr<Transaction> txn(txn0.release());
InstanceInfoPB instance;
auto [c0, m0] = get_instance(txn, instance_id, &instance);
TEST_SYNC_POINT_CALLBACK("modify_nodes:get_instance", &c0, &instance);
if (c0 != TxnErrorCode::TXN_OK) {
msg = m0;
return msg;
}
if (instance.status() == InstanceInfoPB::DELETED) {
msg = "instance status has been set delete, plz check it";
LOG(WARNING) << msg;
return msg;
}
LOG(INFO) << "instance json=" << proto_to_json(instance);
if (!to_add.empty()) {
// add nodes
// Check for duplicated nodes: one node cannot be deployed on multiple clusters,
// so diff the instance's registered nodes against the to_add nodes.
const auto& [fe_endpoints_registered, be_endpoints_registered] =
get_nodes_endpoint_registered(instance);
for (auto& add_node : to_add) {
const ClusterPB::Type type =
add_node.role == Role::SQL_SERVER ? ClusterPB::SQL : ClusterPB::COMPUTE;
const auto& [c1, m1] = check_node_has_been_registered(
type, add_node.node_info, fe_endpoints_registered, be_endpoints_registered);
if (c1 != MetaServiceCode::OK) {
return m1;
}
}
}
// Pairs of (original_cluster, changed_cluster), used to update the MS in-memory index after the kv commit succeeds.
std::vector<std::pair<ClusterPB, ClusterPB>> change_from_to_clusters;
using modify_impl_func = std::function<std::string(const ClusterPB& c, const NodeInfo& n)>;
using check_func = std::function<std::string(const NodeInfo& n)>;
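// Generic driver: run `check` against the in-memory node cache under the shared lock,
// then apply `action` to every cluster in the instance whose name or id matches the node.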
auto modify_func = [&](const NodeInfo& node, check_func check,
modify_impl_func action) -> std::string {
std::string cluster_id = node.cluster_id;
std::string cluster_name = node.cluster_name;
{
std::shared_lock l(mtx_);
msg = check(node);
if (msg != "") {
return msg;
}
}
LOG(INFO) << "node to modify json=" << proto_to_json(node.node_info);
for (auto& c : instance.clusters()) {
if ((c.has_cluster_name() && c.cluster_name() == cluster_name) ||
(c.has_cluster_id() && c.cluster_id() == cluster_id)) {
msg = action(c, node);
if (msg != "") {
return msg;
}
}
}
return "";
};
// Check the MS in-memory cache: reject if the node's cloud_unique_id is already occupied by a different instance.
check_func check_to_add = [&](const NodeInfo& n) -> std::string {
std::string err;
std::stringstream s;
auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
return "";
}
for (auto it = start; it != end; ++it) {
if (it->second.instance_id != n.instance_id) {
// different instance, but has same cloud_unique_id
s << "cloud_unique_id is already occupied by an instance,"
<< " instance_id=" << it->second.instance_id
<< " cluster_name=" << it->second.cluster_name
<< " cluster_id=" << it->second.cluster_id
<< " cloud_unique_id=" << n.node_info.cloud_unique_id();
err = s.str();
LOG(INFO) << err;
return err;
}
}
return "";
};
// Mutate the cluster in memory: treat a matching endpoint as a duplicate request, otherwise stamp ctime/mtime and append the node.
modify_impl_func modify_to_add = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
std::string err;
std::stringstream s;
ClusterPB copied_original_cluster;
ClusterPB copied_cluster;
bool is_compute_node = n.node_info.has_heartbeat_port();
for (auto it = c.nodes().begin(); it != c.nodes().end(); ++it) {
if (it->has_ip() && n.node_info.has_ip()) {
std::string c_endpoint = it->ip() + ":" +
(is_compute_node ? std::to_string(it->heartbeat_port())
: std::to_string(it->edit_log_port()));
std::string n_endpoint =
n.node_info.ip() + ":" +
(is_compute_node ? std::to_string(n.node_info.heartbeat_port())
: std::to_string(n.node_info.edit_log_port()));
if (c_endpoint == n_endpoint) {
// duplicate request, do nothing
return "";
}
}
if (it->has_host() && n.node_info.has_host()) {
std::string c_endpoint_host =
it->host() + ":" +
(is_compute_node ? std::to_string(it->heartbeat_port())
: std::to_string(it->edit_log_port()));
std::string n_endpoint_host =
n.node_info.host() + ":" +
(is_compute_node ? std::to_string(n.node_info.heartbeat_port())
: std::to_string(n.node_info.edit_log_port()));
if (c_endpoint_host == n_endpoint_host) {
// duplicate request, do nothing
return "";
}
}
}
// add ctime and mtime
auto& node = const_cast<std::decay_t<decltype(n.node_info)>&>(n.node_info);
auto now_time = std::chrono::system_clock::now();
uint64_t time =
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch())
.count();
if (!node.has_ctime()) {
node.set_ctime(time);
}
node.set_mtime(time);
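// The cluster is only exposed as a const reference here, so cast away constness to append the
// node in place, keeping before/after copies for the post-commit in-memory index update.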
copied_original_cluster.CopyFrom(c);
auto& change_cluster = const_cast<std::decay_t<decltype(c)>&>(c);
change_cluster.add_nodes()->CopyFrom(node);
copied_cluster.CopyFrom(change_cluster);
change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
std::move(copied_cluster));
return "";
};
for (auto& it : to_add) {
msg = modify_func(it, check_to_add, modify_to_add);
if (msg != "") {
LOG(WARNING) << msg;
return msg;
}
}
std::string cache_err_help_msg =
"MS nodes memory cache may be inconsistent, please check whether the registry key contains "
"127.0.0.1, and call the get_instance API to get the instance info from FDB";
// Check the MS in-memory cache: the node to drop must be registered under its cloud_unique_id with a matching endpoint.
check_func check_to_del = [&](const NodeInfo& n) -> std::string {
std::string err;
std::stringstream s;
auto [start, end] = node_info_.equal_range(n.node_info.cloud_unique_id());
if (start == node_info_.end() || start->first != n.node_info.cloud_unique_id()) {
s << "can not find to drop nodes by cloud_unique_id=" << n.node_info.cloud_unique_id()
<< " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
<< " cluster_id=" << n.cluster_id << " help Msg=" << cache_err_help_msg;
err = s.str();
LOG(WARNING) << err;
return std::string("not found ,") + err;
}
bool found = false;
for (auto it = start; it != end; ++it) {
const auto& m_node = it->second.node_info;
if (m_node.has_ip() && n.node_info.has_ip()) {
std::string m_endpoint =
m_node.ip() + ":" +
(m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
: std::to_string(m_node.edit_log_port()));
std::string n_endpoint = n.node_info.ip() + ":" +
(n.node_info.has_heartbeat_port()
? std::to_string(n.node_info.heartbeat_port())
: std::to_string(n.node_info.edit_log_port()));
if (m_endpoint == n_endpoint) {
found = true;
break;
}
}
if (m_node.has_host() && n.node_info.has_host()) {
std::string m_endpoint_host =
m_node.host() + ":" +
(m_node.has_heartbeat_port() ? std::to_string(m_node.heartbeat_port())
: std::to_string(m_node.edit_log_port()));
std::string n_endpoint_host =
n.node_info.host() + ":" +
(n.node_info.has_heartbeat_port()
? std::to_string(n.node_info.heartbeat_port())
: std::to_string(n.node_info.edit_log_port()));
if (m_endpoint_host == n_endpoint_host) {
found = true;
break;
}
}
}
if (!found) {
s << "cloud_unique_id can not find to drop node,"
<< " instance_id=" << n.instance_id << " cluster_name=" << n.cluster_name
<< " cluster_id=" << n.cluster_id << " node_info=" << n.node_info.DebugString()
<< " help Msg=" << cache_err_help_msg;
err = s.str();
LOG(WARNING) << err;
return std::string("not found ,") + err;
}
return "";
};
// Mutate the cluster in memory: locate the node by cloud_unique_id and endpoint, enforce the FE safe-drop window, then remove it.
modify_impl_func modify_to_del = [&](const ClusterPB& c, const NodeInfo& n) -> std::string {
std::string err;
std::stringstream s;
ClusterPB copied_original_cluster;
ClusterPB copied_cluster;
bool found = false;
int idx = -1;
// ni: the node to drop
const auto& ni = n.node_info;
// c.nodes(): nodes registered in the cluster
NodeInfoPB copy_node;
for (auto& cn : c.nodes()) {
idx++;
if (cn.has_ip() && ni.has_ip()) {
std::string cn_endpoint =
cn.ip() + ":" +
(cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
: std::to_string(cn.edit_log_port()));
std::string ni_endpoint =
ni.ip() + ":" +
(ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
: std::to_string(ni.edit_log_port()));
if (ni.cloud_unique_id() == cn.cloud_unique_id() && cn_endpoint == ni_endpoint) {
copy_node.CopyFrom(cn);
found = true;
break;
}
}
if (cn.has_host() && ni.has_host()) {
std::string cn_endpoint_host =
cn.host() + ":" +
(cn.has_heartbeat_port() ? std::to_string(cn.heartbeat_port())
: std::to_string(cn.edit_log_port()));
std::string ni_endpoint_host =
ni.host() + ":" +
(ni.has_heartbeat_port() ? std::to_string(ni.heartbeat_port())
: std::to_string(ni.edit_log_port()));
if (ni.cloud_unique_id() == cn.cloud_unique_id() &&
cn_endpoint_host == ni_endpoint_host) {
copy_node.CopyFrom(cn);
found = true;
break;
}
}
}
if (!found) {
s << "failed to find node to drop,"
<< " instance_id=" << instance.instance_id() << " cluster_id=" << c.cluster_id()
<< " cluster_name=" << c.cluster_name() << " cluster=" << proto_to_json(c)
<< " help Msg =" << cache_err_help_msg;
err = s.str();
LOG(WARNING) << err;
return std::string("not found ,") + err;
}
// Check whether it is safe to drop an FE node yet.
if (ClusterPB::SQL == c.type() && !is_sql_node_exceeded_safe_drop_time(copy_node)) {
s << "FE node is not yet past the safe drop time, try again later, node=" << copy_node.DebugString();
err = s.str();
LOG(WARNING) << err;
return err;
}
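// Same const_cast pattern as in modify_to_add: remove the matched node from the cluster in place
// and record the (original, changed) pair for the post-commit index update.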
copied_original_cluster.CopyFrom(c);
auto& change_nodes = const_cast<std::decay_t<decltype(c.nodes())>&>(c.nodes());
change_nodes.DeleteSubrange(idx, 1); // remove the matched node at idx
copied_cluster.CopyFrom(c);
change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
std::move(copied_cluster));
return "";
};
for (auto& it : to_del) {
msg = modify_func(it, check_to_del, modify_to_del);
if (msg != "") {
LOG(WARNING) << msg;
return msg;
}
}
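// All in-memory modifications are done; validate the mutated instance and persist it back to
// the kv store in a single transaction.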
LOG(INFO) << "instance " << instance_id << " info: " << instance.DebugString();
// Here the instance has been modified in memory but not yet saved to FDB.
if ((!to_add.empty() || !to_del.empty()) && !is_instance_valid(instance)) {
msg = "instance invalid, cant modify, plz check";
LOG(WARNING) << msg;
return msg;
}
InstanceKeyInfo key_info {instance_id};
std::string key;
std::string val;
instance_key(key_info, &key);
val = instance.SerializeAsString();
if (val.empty()) {
msg = "failed to serialize";
return msg;
}
txn->put(key, val);
LOG(INFO) << "put instance_key=" << hex(key);
TxnErrorCode err_code = txn->commit();
if (err_code != TxnErrorCode::TXN_OK) {
msg = "failed to commit kv txn";
LOG(WARNING) << msg << " err=" << err_code;
return msg;
}
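// Commit succeeded: bring the in-memory cluster index in line with the persisted changes.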
for (auto& it : change_from_to_clusters) {
update_cluster_to_index(instance_id, it.first, it.second);
}
return "";
}