bool server_state::check_all_partitions()

in src/meta/server_state.cpp [2794:2970]
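
Periodic health pass over the whole cluster: the cure stage proposes
configuration changes for unhealthy partitions (batching add-secondary proposals
for per-node flow control), and the balancer stage then migrates replicas, but
only when every partition is healthy and the meta function level permits it.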


bool server_state::check_all_partitions()
{
    int healthy_partitions = 0;
    int total_partitions = 0;
    meta_function_level::type level = _meta_svc->get_function_level();

    zauto_write_lock l(_lock);

    update_partition_metrics();

    // Stage 1: the cure stage. If the meta function level is freezed (or lower),
    // skip both the cure and the balancer stages entirely.
    if (level <= meta_function_level::fl_freezed) {
        LOG_INFO("service is in level({}), don't do any cure or balancer actions",
                 _meta_function_level_VALUES_TO_NAMES.find(level)->second);
        return false;
    }
    LOG_INFO("start to check all partitions, add_secondary_enable_flow_control = {}, "
             "add_secondary_max_count_for_one_node = {}",
             _add_secondary_enable_flow_control ? "true" : "false",
             _add_secondary_max_count_for_one_node);
    _meta_svc->get_partition_guardian()->clear_ddd_partitions();
    int send_proposal_count = 0;
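    // CT_ADD_SECONDARY(_FOR_LB) proposals are not sent immediately: they are
    // collected below so that per-node flow control can be applied across all
    // of them in the two passes that follow the cure loop.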
    std::vector<configuration_proposal_action> add_secondary_actions;
    std::vector<gpid> add_secondary_gpids;
    std::vector<bool> add_secondary_proposed;
    std::map<host_port, int> add_secondary_running_nodes; // node --> running_count
    for (auto &app_pair : _exist_apps) {
        std::shared_ptr<app_state> &app = app_pair.second;
        if (app->status == app_status::AS_CREATING || app->status == app_status::AS_DROPPING) {
            LOG_INFO("ignore app({})({}) because it's status is {}",
                     app->app_name,
                     app->app_id,
                     ::dsn::enum_to_string(app->status));
            continue;
        }
        for (unsigned int i = 0; i != app->partition_count; ++i) {
            const auto &pc = app->pcs[i];
            const auto &cc = app->helpers->contexts[i];
            // Skip partitions that are under re-configuration (stage is
            // pending_remote_sync) or are child partitions (ballot is invalid).
            if (cc.stage != config_status::pending_remote_sync && pc.ballot != invalid_ballot) {
                configuration_proposal_action action;
                pc_status s = _meta_svc->get_partition_guardian()->cure(
                    {&_all_apps, &_nodes}, pc.pid, action);
                LOG_DEBUG("gpid({}) is in status({})", pc.pid, enum_to_string(s));
                if (pc_status::healthy != s) {
                    if (action.type != config_type::CT_INVALID) {
                        if (action.type == config_type::CT_ADD_SECONDARY ||
                            action.type == config_type::CT_ADD_SECONDARY_FOR_LB) {
                            add_secondary_actions.push_back(std::move(action));
                            add_secondary_gpids.push_back(pc.pid);
                            add_secondary_proposed.push_back(false);
                        } else {
                            send_proposal(action, pc, *app);
                            send_proposal_count++;
                        }
                    }
                } else {
                    healthy_partitions++;
                }
            } else {
                LOG_INFO("ignore gpid({}) as it's stage is pending_remote_sync", pc.pid);
            }
        }
        total_partitions += app->partition_count;
    }

    // Pass 1 (urgent): propose add-secondary first for partitions that currently
    // have no secondary at all, subject to per-node flow control.
    for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
        gpid &pid = add_secondary_gpids[i];
        const auto *pc = get_config(_all_apps, pid);
        if (!add_secondary_proposed[i] && pc->hp_secondaries.empty()) {
            const auto &action = add_secondary_actions[i];
            CHECK(action.hp_node, "");
            if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node] >=
                                                          _add_secondary_max_count_for_one_node) {
                // Flow control: this node already has the maximum number of
                // in-flight add-secondary proposals; pass 2 below will log the skip.
                continue;
            }
            std::shared_ptr<app_state> app = get_app(pid.get_app_id());
            send_proposal(action, *pc, *app);
            send_proposal_count++;
            add_secondary_proposed[i] = true;
            add_secondary_running_nodes[action.hp_node]++;
        }
    }

    // Pass 2: propose the remaining add-secondary actions, still subject to
    // per-node flow control.
    for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
        if (!add_secondary_proposed[i]) {
            const auto &action = add_secondary_actions[i];
            CHECK(action.hp_node, "");
            gpid pid = add_secondary_gpids[i];
            const auto *pc = get_config(_all_apps, pid);
            if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node] >=
                                                          _add_secondary_max_count_for_one_node) {
                LOG_INFO("do not send {} proposal for gpid({}) for flow control reason, target = "
                         "{}, node = {}",
                         ::dsn::enum_to_string(action.type),
                         pc->pid,
                         FMT_HOST_PORT_AND_IP(action, target),
                         FMT_HOST_PORT_AND_IP(action, node));
                continue;
            }
            std::shared_ptr<app_state> app = get_app(pid.get_app_id());
            send_proposal(action, *pc, *app);
            send_proposal_count++;
            add_secondary_proposed[i] = true;
            add_secondary_running_nodes[action.hp_node]++;
        }
    }

    int ignored_add_secondary_count = 0;
    int add_secondary_count = 0;
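    // Anything still not proposed at this point was skipped by flow control.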
    for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
        if (!add_secondary_proposed[i]) {
            ignored_add_secondary_count++;
        } else {
            add_secondary_count++;
        }
    }

    LOG_INFO("check all partitions done, send_proposal_count = {}, add_secondary_count = {}, "
             "ignored_add_secondary_count = {}",
             send_proposal_count,
             add_secondary_count,
             ignored_add_secondary_count);

    // then the balancer stage
    if (level < meta_function_level::fl_steady) {
        LOG_INFO("don't do replica migration coz meta server is in level({})",
                 _meta_function_level_VALUES_TO_NAMES.find(level)->second);
        return false;
    }

    if (healthy_partitions != total_partitions) {
        LOG_INFO("don't do replica migration coz {}/{} partitions aren't healthy",
                 total_partitions - healthy_partitions,
                 total_partitions);
        return false;
    }

    if (!can_run_balancer()) {
        LOG_INFO("don't do replica migration coz can_run_balancer() returns false");
        return false;
    }

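    // In fl_steady the balancer runs in check-only mode: report how many
    // operations it would perform, but apply none of them.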
    if (level == meta_function_level::fl_steady) {
        LOG_INFO("check if any replica migration can be done when meta server is in level({})",
                 _meta_function_level_VALUES_TO_NAMES.find(level)->second);
        _meta_svc->get_balancer()->check({&_all_apps, &_nodes}, _temporary_list);
        LOG_INFO("balance checker operation count = {}", _temporary_list.size());
        // update balance checker operation count
        _meta_svc->get_balancer()->report(_temporary_list, true);
        return false;
    }

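    // The level is fl_lively here: generate migration proposals, apply them,
    // and schedule another balancer round.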
    if (_meta_svc->get_balancer()->balance({&_all_apps, &_nodes}, _temporary_list)) {
        LOG_INFO("try to do replica migration");
        _meta_svc->get_balancer()->apply_balancer({&_all_apps, &_nodes}, _temporary_list);
        // update balancer action details
        _meta_svc->get_balancer()->report(_temporary_list, false);
        if (_replica_migration_subscriber) {
            _replica_migration_subscriber(_temporary_list);
        }
        tasking::enqueue(LPC_META_STATE_NORMAL,
                         _meta_svc->tracker(),
                         std::bind(&meta_service::balancer_run, _meta_svc));
        return false;
    }

    LOG_INFO("check if any replica migration left");
    _meta_svc->get_balancer()->check({&_all_apps, &_nodes}, _temporary_list);
    LOG_INFO("balance checker operation count = {}", _temporary_list.size());
    // update balance checker operation count
    _meta_svc->get_balancer()->report(_temporary_list, true);

    return true;
}
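
The per-node flow control used in both passes reduces to a counting map with a
per-node cap. Below is a minimal standalone sketch of that pattern; node_id,
action, and the literal values are hypothetical stand-ins for illustration, not
the actual Pegasus types or configuration:

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for host_port and configuration_proposal_action.
using node_id = std::string;
struct action
{
    node_id node;
};

int main()
{
    const bool enable_flow_control = true; // cf. _add_secondary_enable_flow_control
    const int max_count_for_one_node = 2;  // cf. _add_secondary_max_count_for_one_node

    const std::vector<action> actions = {{"n1"}, {"n1"}, {"n1"}, {"n2"}, {"n2"}};
    std::map<node_id, int> running; // node --> running_count, as in the function

    int sent = 0, ignored = 0;
    for (const auto &a : actions) {
        if (enable_flow_control && running[a.node] >= max_count_for_one_node) {
            ignored++; // skipped this round; a later check_all_partitions() retries
            continue;
        }
        running[a.node]++;
        sent++; // the real code calls send_proposal(...) here
    }
    // Prints: sent = 4, ignored = 1 (the third "n1" action is flow-controlled).
    std::cout << "sent = " << sent << ", ignored = " << ignored << std::endl;
    return 0;
}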