in src/meta/server_state.cpp [838:991]
void server_state::on_config_sync(configuration_query_by_node_rpc rpc)
{
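    // The sync runs in two phases under a single read lock: first build the list
    // of partition configurations this node should serve, then (if the request
    // carries stored_replicas) decide which of the node's on-disk replicas can
    // be garbage-collected.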
configuration_query_by_node_response &response = rpc.response();
const configuration_query_by_node_request &request = rpc.request();
bool reject_this_request = false;
response.__isset.gc_replicas = false;
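    // gc_replicas is an optional thrift field; leave it unset until entries are added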
LOG_INFO("got config sync request from {}, stored_replicas_count({})",
request.node,
request.stored_replicas.size());
{
zauto_read_lock l(_lock);
// sync the partitions to the replica server
node_state *ns = get_node_state(_nodes, request.node, false);
if (ns == nullptr) {
LOG_INFO("node({}) not found in meta server", request.node);
response.err = ERR_OBJECT_NOT_FOUND;
} else {
response.err = ERR_OK;
unsigned int i = 0;
response.partitions.resize(ns->partition_count());
ns->for_each_partition([&, this](const gpid &pid) {
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
CHECK(app, "invalid app_id, app_id = {}", pid.get_app_id());
config_context &cc = app->helpers->contexts[pid.get_partition_index()];
                // config sync needs the newest data to keep the perfect failure
                // detector (FD) correct, so if the config being synced involves
                // this node, we must reject the whole request
if (cc.stage == config_status::pending_remote_sync) {
configuration_update_request *req = cc.pending_sync_request.get();
                    // when registering a child partition, the stage is
                    // config_status::pending_remote_sync but cc.pending_sync_request
                    // is not set; see register_child_on_meta() for details
if (req == nullptr || req->node == request.node)
return false;
}
response.partitions[i].info = *app;
response.partitions[i].config = app->partitions[pid.get_partition_index()];
response.partitions[i].host_node = request.node;
// set meta_split_status
const split_state &app_split_states = app->helpers->split_states;
if (app->splitting()) {
auto iter = app_split_states.status.find(pid.get_partition_index());
if (iter != app_split_states.status.end()) {
response.partitions[i].__set_meta_split_status(iter->second);
}
}
++i;
return true;
});
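            // for_each_partition() stops early (the lambda returns false) when a
            // partition involving this node is pending remote sync; in that case
            // fewer than partition_count() slots were filled, so reject the request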
if (i < response.partitions.size()) {
reject_this_request = true;
}
}
// handle the stored replicas & the gc replicas
if (!reject_this_request && request.__isset.stored_replicas) {
if (ns != nullptr)
ns->set_replicas_collect_flag(true);
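            // record that this node has started reporting its on-disk replicas
            // (presumably consumed by the replica-collection bookkeeping)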
const std::vector<replica_info> &replicas = request.stored_replicas;
meta_function_level::type level = _meta_svc->get_function_level();
            // For each replica reported by the node:
            //   - if the meta server believes the node still serves the replica, keep it;
            //   - if the partition's dropped-replica history is already full, GC the
            //     replica, otherwise record it in that history;
            //   - if the table is dropped but not yet expired, ignore the replica;
            //   - if the table is dropped and expired, GC the replica.
for (const replica_info &rep : replicas) {
LOG_DEBUG("receive stored replica from {}, pid({})", request.node, rep.pid);
std::shared_ptr<app_state> app = get_app(rep.pid.get_app_id());
if (app == nullptr || rep.pid.get_partition_index() >= app->partition_count) {
                    // This app has a garbage partition left over from a canceled
                    // split; the canceled child partition should be GCed
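                    // (during a split the partition count doubles and each child
                    // takes index = parent_index + old partition_count, so such
                    // leftovers fall in [partition_count, 2 * partition_count))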
if (app != nullptr &&
rep.pid.get_partition_index() < app->partition_count * 2 &&
rep.status == partition_status::PS_ERROR) {
response.gc_replicas.push_back(rep);
                        LOG_WARNING("notify node({}) to gc replica({}) because it is a "
                                    "useless partition left by a canceled split",
                                    request.node,
                                    rep.pid);
} else {
// app is not recognized or partition is not recognized
                        CHECK(false,
                              "gpid({}) on node({}) does not exist on the meta server, "
                              "administrator should check the consistency of meta data",
                              rep.pid,
                              request.node);
}
} else if (app->status == app_status::AS_DROPPED) {
if (app->expire_second == 0) {
LOG_INFO("gpid({}) on node({}) is of dropped table, but expire second is "
"not specified, do not delete it for safety reason",
rep.pid,
request.node);
} else if (has_seconds_expired(app->expire_second)) {
                        // a replica can be deleted only when the expire second is
                        // explicitly specified and has passed
if (level <= meta_function_level::fl_steady) {
LOG_INFO("gpid({}) on node({}) is of dropped and expired table, but "
"current function level is {}, do not delete it for safety "
"reason",
rep.pid,
request.node,
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
} else {
response.gc_replicas.push_back(rep);
LOG_WARNING("notify node({}) to gc replica({}) coz the app is "
"dropped and expired",
request.node,
rep.pid);
}
}
} else if (app->status == app_status::AS_AVAILABLE) {
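                    // collect_replica() records the replica in the partition's dropped
                    // history when appropriate and reports whether it is still useful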
bool is_useful_replica =
collect_replica({&_all_apps, &_nodes}, request.node, rep);
if (!is_useful_replica) {
if (level <= meta_function_level::fl_steady) {
LOG_INFO("gpid({}) on node({}) is useless, but current function "
"level is {}, do not delete it for safety reason",
rep.pid,
request.node,
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
} else {
response.gc_replicas.push_back(rep);
LOG_WARNING("notify node({}) to gc replica({}) coz it is useless",
request.node,
rep.pid);
}
}
}
}
if (!response.gc_replicas.empty()) {
response.__isset.gc_replicas = true;
}
}
}
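    // ERR_BUSY asks the node to try again on a later periodic config sync,
    // by which time the pending remote sync should have completed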
if (reject_this_request) {
response.err = ERR_BUSY;
response.partitions.clear();
}
LOG_INFO("send config sync response to {}, err({}), partitions_count({}), "
"gc_replicas_count({})",
request.node.to_string(),
response.err,
response.partitions.size(),
response.gc_replicas.size());
}
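
A minimal standalone sketch of the per-replica GC decision implemented above, using
hypothetical stand-in types (gc_decision, replica_report) rather than the real
Pegasus structures; illustrative only, not part of server_state.cpp:

// Hypothetical condensation of the decision tree in on_config_sync();
// every name below is an illustrative stand-in, not a real Pegasus type.
enum class gc_decision { keep, gc, ignore };

struct replica_report
{
    bool app_known;            // the meta server recognizes the app id
    bool index_in_range;       // partition index < the app's partition_count
    bool canceled_split_child; // index in [count, 2 * count) and status PS_ERROR
    bool app_dropped;          // app status is AS_DROPPED
    bool expire_set;           // expire_second != 0
    bool expired;              // has_seconds_expired(expire_second) is true
    bool useful;               // collect_replica() returned true
    bool steady_level;         // meta function level <= fl_steady
};

inline gc_decision decide(const replica_report &r)
{
    if (!r.app_known || !r.index_in_range) {
        // only canceled-split leftovers may be GCed; any other unknown
        // replica is an inconsistency (the real code CHECK-fails there)
        return r.canceled_split_child ? gc_decision::gc : gc_decision::ignore;
    }
    if (r.app_dropped) {
        // safety: keep the replica until expiration is explicit, reached,
        // and the function level allows deletion
        if (!r.expire_set || !r.expired || r.steady_level)
            return gc_decision::ignore;
        return gc_decision::gc;
    }
    // available app: GC only useless replicas, and only above fl_steady
    return (!r.useful && !r.steady_level) ? gc_decision::gc : gc_decision::keep;
}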