void server_state::on_config_sync()

in src/meta/server_state.cpp [838:991]
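
This handler serves a replica server's periodic config-sync request. Under the
state read lock it snapshots the configurations of all partitions hosted on the
requesting node and, if the request also carries the node's stored replicas,
classifies each reported replica as either still useful or eligible for garbage
collection (returned in gc_replicas). When any relevant partition is still
pending a remote sync, the whole request is rejected with ERR_BUSY.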


void server_state::on_config_sync(configuration_query_by_node_rpc rpc)
{
    configuration_query_by_node_response &response = rpc.response();
    const configuration_query_by_node_request &request = rpc.request();

    bool reject_this_request = false;
    response.__isset.gc_replicas = false;
    LOG_INFO("got config sync request from {}, stored_replicas_count({})",
             request.node,
             request.stored_replicas.size());

    {
        zauto_read_lock l(_lock);

        // sync the partitions to the replica server
        node_state *ns = get_node_state(_nodes, request.node, false);
        if (ns == nullptr) {
            LOG_INFO("node({}) not found in meta server", request.node);
            response.err = ERR_OBJECT_NOT_FOUND;
        } else {
            response.err = ERR_OK;
            unsigned int i = 0;
            response.partitions.resize(ns->partition_count());
            ns->for_each_partition([&, this](const gpid &pid) {
                std::shared_ptr<app_state> app = get_app(pid.get_app_id());
                CHECK(app, "invalid app_id, app_id = {}", pid.get_app_id());
                config_context &cc = app->helpers->contexts[pid.get_partition_index()];

                // config sync needs the newest data to keep the failure
                // detection (FD) perfect, so if the pending config change
                // involves this node we may need to reject the request
                if (cc.stage == config_status::pending_remote_sync) {
                    configuration_update_request *req = cc.pending_sync_request.get();
                    // when a child partition is being registered, the stage is
                    // config_status::pending_remote_sync but cc.pending_sync_request
                    // is not set; see 'register_child_on_meta' for details
                    if (req == nullptr || req->node == request.node) {
                        return false;
                    }
                }

                response.partitions[i].info = *app;
                response.partitions[i].config = app->partitions[pid.get_partition_index()];
                response.partitions[i].host_node = request.node;
                // set meta_split_status
                const split_state &app_split_states = app->helpers->split_states;
                if (app->splitting()) {
                    auto iter = app_split_states.status.find(pid.get_partition_index());
                    if (iter != app_split_states.status.end()) {
                        response.partitions[i].__set_meta_split_status(iter->second);
                    }
                }
                ++i;
                return true;
            });
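            // for_each_partition stops early (the callback returns false) when
            // some partition is pending a remote sync that involves this node;
            // in that case i falls short of the expected partition count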
            if (i < response.partitions.size()) {
                reject_this_request = true;
            }
        }

        // handle the stored replicas & the gc replicas
        if (!reject_this_request && request.__isset.stored_replicas) {
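            // mark that stored replicas have been collected from this node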
            if (ns != nullptr) {
                ns->set_replicas_collect_flag(true);
            }
            const std::vector<replica_info> &replicas = request.stored_replicas;
            meta_function_level::type level = _meta_svc->get_function_level();
            // For each reported replica:
            // - if the node still serves the replica according to the meta
            //   server, ignore it;
            // - if the partition already records enough dropped servers, gc
            //   the replica; otherwise add this node to the dropped list;
            // - if the app is dropped but not yet expired, ignore it;
            // - if the app is dropped and expired, gc the replica.
            for (const replica_info &rep : replicas) {
                LOG_DEBUG("receive stored replica from {}, pid({})", request.node, rep.pid);
                std::shared_ptr<app_state> app = get_app(rep.pid.get_app_id());
                if (app == nullptr || rep.pid.get_partition_index() >= app->partition_count) {
                    // after a partition split is cancelled, the app may hold
                    // garbage partitions; the cancelled child partitions should be gc-ed
                    if (app != nullptr &&
                        rep.pid.get_partition_index() < app->partition_count * 2 &&
                        rep.status == partition_status::PS_ERROR) {
                        response.gc_replicas.push_back(rep);
                        LOG_WARNING(
                            "notify node({}) to gc replica({}) because it is a useless partition "
                            "left over from a cancelled split",
                            request.node,
                            rep.pid);
                    } else {
                        // neither the app nor the partition is recognized
                        CHECK(false,
                              "gpid({}) on node({}) does not exist on the meta server, the "
                              "administrator should check the consistency of the meta data",
                              rep.pid,
                              request.node);
                    }
                } else if (app->status == app_status::AS_DROPPED) {
                    if (app->expire_second == 0) {
                        LOG_INFO("gpid({}) on node({}) is of dropped table, but expire second is "
                                 "not specified, do not delete it for safety reason",
                                 rep.pid,
                                 request.node);
                    } else if (has_seconds_expired(app->expire_second)) {
                        // a replica can be deleted only when its expire second is
                        // explicitly specified and has expired.
                        if (level <= meta_function_level::fl_steady) {
                            LOG_INFO("gpid({}) on node({}) is of dropped and expired table, but "
                                     "current function level is {}, do not delete it for safety "
                                     "reason",
                                     rep.pid,
                                     request.node,
                                     _meta_function_level_VALUES_TO_NAMES.find(level)->second);
                        } else {
                            response.gc_replicas.push_back(rep);
                            LOG_WARNING("notify node({}) to gc replica({}) coz the app is "
                                        "dropped and expired",
                                        request.node,
                                        rep.pid);
                        }
                    }
                } else if (app->status == app_status::AS_AVAILABLE) {
                    bool is_useful_replica =
                        collect_replica({&_all_apps, &_nodes}, request.node, rep);
                    if (!is_useful_replica) {
                        if (level <= meta_function_level::fl_steady) {
                            LOG_INFO("gpid({}) on node({}) is useless, but current function "
                                     "level is {}, do not delete it for safety reason",
                                     rep.pid,
                                     request.node,
                                     _meta_function_level_VALUES_TO_NAMES.find(level)->second);
                        } else {
                            response.gc_replicas.push_back(rep);
                            LOG_WARNING("notify node({}) to gc replica({}) coz it is useless",
                                        request.node,
                                        rep.pid);
                        }
                    }
                }
            }

            if (!response.gc_replicas.empty()) {
                response.__isset.gc_replicas = true;
            }
        }
    }

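    // a rejected sync returns ERR_BUSY with no partitions; the replica server
    // is expected to retry after the pending remote sync completes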
    if (reject_this_request) {
        response.err = ERR_BUSY;
        response.partitions.clear();
    }
    LOG_INFO("send config sync response to {}, err({}), partitions_count({}), "
             "gc_replicas_count({})",
             request.node,
             response.err,
             response.partitions.size(),
             response.gc_replicas.size());
}
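
The branch for dropped tables above hinges on has_seconds_expired(). Below is a
minimal sketch of that check, assuming expire_second stores an absolute UNIX
timestamp; the real helper lives in dsn's utility headers and may differ in
detail:

#include <cstdint>
#include <ctime>

// Hypothetical stand-in for has_seconds_expired(): the table is considered
// expired once the wall clock passes its expire_second mark. The caller
// above guards the "not specified" case (expire_second == 0) separately.
inline bool has_seconds_expired(int64_t expire_second)
{
    return expire_second <= static_cast<int64_t>(::time(nullptr));
}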