pc_status partition_guardian::on_missing_primary()

in src/meta/partition_guardian.cpp [226:489]


// Decide how to cure partition `gpid` whose primary is missing and, when a candidate
// node can be chosen, register the resulting proposal in the partition's lb_actions.
//
// Three cases are handled, in this order:
//   1. The partition still has secondaries: the alive secondary with the fewest
//      primaries (according to newly_partitions::less_primaries) is proposed with
//      CT_UPGRADE_TO_PRIMARY and pc_status::ill is returned. If no secondary is alive,
//      nothing is proposed and pc_status::dead is returned.
//   2. No secondaries and `last_drops` is empty (treated as a newly created
//      partition): the alive node in `view.nodes` with the fewest primaries is proposed
//      with CT_ASSIGN_PRIMARY. pc_status::ill is returned whether or not an alive node
//      was found.
//   3. Otherwise (the "DDD" state): a primary is selected from the last dropped
//      replicas only if the collected replica info is consistent; if no node can be
//      selected, the failure counter is incremented and the partition is recorded via
//      set_ddd_partition() together with the reason. pc_status::dead is always returned.
//
// Parameters:
//   view - meta view providing the app configurations (`view.apps`) and the node
//          states (`view.nodes`).
//   gpid - the partition to examine.
//
// Returns: pc_status::ill or pc_status::dead as described above.
pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpid &gpid)
{
    const partition_configuration &pc = *get_config(*(view.apps), gpid);
    proposal_actions &acts = get_config_context(*view.apps, gpid)->lb_actions;

    // "<app_id>.<partition_index>", used as the prefix/identifier in log messages.
    char gpid_name[64];
    snprintf(gpid_name, 64, "%d.%d", gpid.get_app_id(), gpid.get_partition_index());

    configuration_proposal_action action;
    pc_status result = pc_status::invalid;

    // CT_INVALID means "no proposal"; it is checked at the end of the function.
    action.type = config_type::CT_INVALID;
    // try to upgrade a secondary to primary if the primary is missing
    if (pc.secondaries.size() > 0) {
        action.node.set_invalid();

        for (const auto &secondary : pc.secondaries) {
            node_state *ns = get_node_state(*(view.nodes), secondary, false);
            CHECK_NOTNULL(ns, "invalid secondary address, address = {}", secondary.to_string());
            if (!ns->alive())
                continue;

            // find a node with minimal primaries
            newly_partitions *np = newly_partitions_ext::get_inited(ns);
            if (action.node.is_invalid() ||
                np->less_primaries(*get_newly_partitions(*(view.nodes), action.node),
                                   gpid.get_app_id())) {
                action.node = ns->addr();
            }
        }

        if (action.node.is_invalid()) {
            LOG_ERROR(
                "all nodes for gpid({}) are dead, waiting for some secondary to come back....",
                gpid_name);
            result = pc_status::dead;
        } else {
            action.type = config_type::CT_UPGRADE_TO_PRIMARY;
            // account for the primary this proposal adds to the chosen node
            newly_partitions *np = get_newly_partitions(*(view.nodes), action.node);
            np->newly_add_primary(gpid.get_app_id(), true);

            action.target = action.node;
            result = pc_status::ill;
        }
    }
    // if nothing in the last_drops, it means that this is a newly created partition, so let's
    // just find a node and assign primary for it.
    else if (pc.last_drops.empty()) {
        dsn::rpc_address min_primary_server;
        newly_partitions *min_primary_server_np = nullptr;

        for (auto &pairs : *view.nodes) {
            node_state &ns = pairs.second;
            if (!ns.alive())
                continue;
            newly_partitions *np = newly_partitions_ext::get_inited(&ns);
            // find a node which has minimal primaries
            if (min_primary_server_np == nullptr ||
                np->less_primaries(*min_primary_server_np, gpid.get_app_id())) {
                min_primary_server = ns.addr();
                min_primary_server_np = np;
            }
        }

        // no proposal is generated when there is no alive node at all
        if (min_primary_server_np != nullptr) {
            action.node = min_primary_server;
            action.target = action.node;
            action.type = config_type::CT_ASSIGN_PRIMARY;
            min_primary_server_np->newly_add_primary(gpid.get_app_id(), false);
        }

        result = pc_status::ill;
    }
    // well, all replicas in this partition is dead
    else {
        LOG_WARNING("{} enters DDD state, we are waiting for all replicas to come back, "
                    "and select primary according to informations collected",
                    gpid_name);
        // when considering how to handle the DDD state, we must keep in mind that our
        // shared/private-log data only write to OS-cache.
        // so the last removed replica can't act as primary directly.

        // Explanation of why no primary was selected; recorded in the DDD diagnosis below.
        std::string reason;
        config_context &cc = *get_config_context(*view.apps, gpid);
        action.node.set_invalid();

        // dump the dropped replicas known to the config context
        for (int i = 0; i < cc.dropped.size(); ++i) {
            const dropped_replica &dr = cc.dropped[i];
            char time_buf[30] = {0};
            ::dsn::utils::time_ms_to_string(dr.time, time_buf);
            LOG_INFO("{}: config_context.dropped[{}]: "
                     "node({}), time({})[{}], ballot({}), "
                     "commit_decree({}), prepare_decree({})",
                     gpid_name,
                     i,
                     dr.node,
                     dr.time,
                     time_buf,
                     dr.ballot,
                     dr.last_committed_decree,
                     dr.last_prepared_decree);
        }

        // dump last_drops, with the position of each node in cc.dropped (-1 if absent)
        for (int i = 0; i < pc.last_drops.size(); ++i) {
            int dropped_index = -1;
            for (int k = 0; k < cc.dropped.size(); k++) {
                if (cc.dropped[k].node == pc.last_drops[i]) {
                    dropped_index = k;
                    break;
                }
            }
            LOG_INFO("{}: config_context.last_drops[{}]: node({}), dropped_index({})",
                     gpid_name,
                     i,
                     pc.last_drops[i],
                     dropped_index);
        }

        if (pc.last_drops.size() == 1) {
            LOG_WARNING("{}: the only node({}) is dead, waiting it to come back",
                        gpid_name,
                        pc.last_drops.back());
            action.node = pc.last_drops.back();
        } else {
            // only the two most recently dropped nodes are considered as candidates
            std::vector<dsn::rpc_address> nodes(pc.last_drops.end() - 2, pc.last_drops.end());
            std::vector<dropped_replica> collected_info(2);
            bool ready = true;

            LOG_INFO("{}: last two drops are {} and {} (the latest dropped)",
                     gpid_name,
                     nodes[0],
                     nodes[1]);

            for (unsigned int i = 0; i < nodes.size(); ++i) {
                node_state *ns = get_node_state(*view.nodes, nodes[i], false);
                if (ns == nullptr || !ns->alive()) {
                    ready = false;
                    reason = "the last dropped node(" + nodes[i].to_std_string() +
                             ") haven't come back yet";
                    LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
                } else {
                    std::vector<dropped_replica>::iterator it = cc.find_from_dropped(nodes[i]);
                    if (it == cc.dropped.end() || it->ballot == invalid_ballot) {
                        if (ns->has_collected()) {
                            LOG_INFO("{}: ignore {}'s replica info as it doesn't exist on "
                                     "replica server",
                                     gpid_name,
                                     nodes[i]);
                            // placeholder entry: ballot and decrees are all -1
                            collected_info[i] = {nodes[i], 0, -1, -1, -1};
                        } else {
                            ready = false;
                            reason = "the last dropped node(" + nodes[i].to_std_string() +
                                     ") is unavailable because ";
                            if (it == cc.dropped.end()) {
                                reason += "the node is not exist in dropped_nodes";
                            } else {
                                reason += "replica info has not been collected from the node";
                            }
                            LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
                        }
                    } else {
                        collected_info[i] = *it;
                    }
                }
            }

            // both candidates carry the placeholder ballot: nothing to base a decision on
            if (ready && collected_info[0].ballot == -1 && collected_info[1].ballot == -1) {
                ready = false;
                reason = "no replica info collected from the last two drops";
                LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
            }

            if (ready) {
                dropped_replica &previous_dead = collected_info[0];
                dropped_replica &recent_dead = collected_info[1];

                // 1. larger ballot should have larger committed decree
                // 2. max_prepared_decree should larger than meta's committed decree
                int64_t gap1 = previous_dead.ballot - recent_dead.ballot;
                int64_t gap2 =
                    previous_dead.last_committed_decree - recent_dead.last_committed_decree;
                // Equivalent to `gap1 * gap2 >= 0`, but compares signs instead of
                // multiplying so that large gaps cannot overflow int64_t.
                const bool gaps_agree =
                    (gap1 >= 0 && gap2 >= 0) || (gap1 <= 0 && gap2 <= 0);
                if (gaps_agree) {
                    int64_t larger_cd = std::max(previous_dead.last_committed_decree,
                                                 recent_dead.last_committed_decree);
                    int64_t larger_pd = std::max(previous_dead.last_prepared_decree,
                                                 recent_dead.last_prepared_decree);
                    if (larger_pd >= pc.last_committed_decree && larger_pd >= larger_cd) {
                        if (gap1 != 0) {
                            // 1. choose node with larger ballot
                            action.node = gap1 < 0 ? recent_dead.node : previous_dead.node;
                        } else if (gap2 != 0) {
                            // 2. choose node with larger last_committed_decree
                            action.node = gap2 < 0 ? recent_dead.node : previous_dead.node;
                        } else {
                            // 3. choose node with larger last_prepared_decree
                            action.node = previous_dead.last_prepared_decree >
                                                  recent_dead.last_prepared_decree
                                              ? previous_dead.node
                                              : recent_dead.node;
                        }
                        LOG_INFO("{}: select {} as a new primary", gpid_name, action.node);
                    } else {
                        char buf[1000];
                        snprintf(buf,
                                 sizeof(buf),
                                 "for the last two drops, larger_prepared_decree(%" PRId64 "), "
                                 "last committed decree on meta(%" PRId64 "), "
                                 "larger_committed_decree(%" PRId64 ")",
                                 larger_pd,
                                 pc.last_committed_decree,
                                 larger_cd);
                        // keep the formatted details: they are logged here and stored in
                        // the DDD diagnosis (pinfo.reason) below
                        reason = buf;
                        LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
                    }
                } else {
                    reason = "for the last two drops, the node with larger ballot has smaller last "
                             "committed decree";
                    LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
                }
            }
        }

        if (!action.node.is_invalid()) {
            action.target = action.node;
            action.type = config_type::CT_ASSIGN_PRIMARY;

            get_newly_partitions(*view.nodes, action.node)
                ->newly_add_primary(gpid.get_app_id(), false);
        } else {
            LOG_WARNING("{}: don't select any node for security reason, administrator can select "
                        "a proper one by shell",
                        gpid_name);
            _recent_choose_primary_fail_count->increment();

            // record the partition, its dropped replicas and the reason for diagnosis
            ddd_partition_info pinfo;
            pinfo.config = pc;
            for (const dropped_replica &dr : cc.dropped) {
                ddd_node_info ninfo;
                ninfo.node = dr.node;
                ninfo.drop_time_ms = dr.time;
                ninfo.ballot = invalid_ballot;
                ninfo.last_committed_decree = invalid_decree;
                ninfo.last_prepared_decree = invalid_decree;
                node_state *ns = get_node_state(*view.nodes, dr.node, false);
                if (ns != nullptr && ns->alive()) {
                    ninfo.is_alive = true;
                    // ballot/decrees are only reported once info was collected from the node
                    if (ns->has_collected()) {
                        ninfo.is_collected = true;
                        ninfo.ballot = dr.ballot;
                        ninfo.last_committed_decree = dr.last_committed_decree;
                        ninfo.last_prepared_decree = dr.last_prepared_decree;
                    }
                }
                pinfo.dropped.emplace_back(std::move(ninfo));
            }
            pinfo.reason = reason;
            set_ddd_partition(std::move(pinfo));
        }

        result = pc_status::dead;
    }

    // hand the proposal over only if one of the branches above produced one
    if (action.type != config_type::CT_INVALID) {
        acts.assign_cure_proposal(action);
    }
    return result;
}