in src/meta/partition_guardian.cpp [226:489]
pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpid &gpid)
{
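// Cure strategy when a partition's primary is missing, in three cases:
// 1. some secondary is still alive: upgrade the least-loaded one to primary;
// 2. last_drops is empty (newly created partition): assign a primary on the
// alive node with the fewest primaries;
// 3. all replicas are dead (DDD): conservatively pick a primary only from the
// last two dropped replicas, and only when their reported states agree.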
const partition_configuration &pc = *get_config(*(view.apps), gpid);
proposal_actions &acts = get_config_context(*view.apps, gpid)->lb_actions;
char gpid_name[64];
snprintf(gpid_name, sizeof(gpid_name), "%d.%d", gpid.get_app_id(), gpid.get_partition_index());
configuration_proposal_action action;
pc_status result = pc_status::invalid;
action.type = config_type::CT_INVALID;
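// action stays CT_INVALID unless one of the branches below produces a cure
// proposal; only a valid action is queued at the end of this function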
// try to upgrade a secondary to primary if the primary is missing
if (pc.secondaries.size() > 0) {
action.node.set_invalid();
for (int i = 0; i < pc.secondaries.size(); ++i) {
node_state *ns = get_node_state(*(view.nodes), pc.secondaries[i], false);
CHECK_NOTNULL(
ns, "invalid secondary address, address = {}", pc.secondaries[i].to_string());
if (!ns->alive())
continue;
// prefer the alive secondary whose node hosts the fewest primaries
newly_partitions *np = newly_partitions_ext::get_inited(ns);
if (action.node.is_invalid() ||
np->less_primaries(*get_newly_partitions(*(view.nodes), action.node),
gpid.get_app_id())) {
action.node = ns->addr();
}
}
if (action.node.is_invalid()) {
LOG_ERROR(
"all nodes for gpid({}) are dead, waiting for some secondary to come back...",
gpid_name);
result = pc_status::dead;
} else {
action.type = config_type::CT_UPGRADE_TO_PRIMARY;
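// record the tentative new primary in the node's newly_partitions stats so
// that later cure decisions in this round observe the updated primary load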
newly_partitions *np = get_newly_partitions(*(view.nodes), action.node);
np->newly_add_primary(gpid.get_app_id(), true);
action.target = action.node;
result = pc_status::ill;
}
}
// if last_drops is empty, this partition was newly created, so just find a
// node and assign the primary to it
else if (pc.last_drops.empty()) {
dsn::rpc_address min_primary_server;
newly_partitions *min_primary_server_np = nullptr;
for (auto &pairs : *view.nodes) {
node_state &ns = pairs.second;
if (!ns.alive())
continue;
newly_partitions *np = newly_partitions_ext::get_inited(&ns);
// find the alive node with the fewest primaries
if (min_primary_server_np == nullptr ||
np->less_primaries(*min_primary_server_np, gpid.get_app_id())) {
min_primary_server = ns.addr();
min_primary_server_np = np;
}
}
if (min_primary_server_np != nullptr) {
action.node = min_primary_server;
action.target = action.node;
action.type = config_type::CT_ASSIGN_PRIMARY;
min_primary_server_np->newly_add_primary(gpid.get_app_id(), false);
}
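// report the partition as ill either way; if no alive node was found, no
// proposal is generated and the cure will be retried in a later round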
result = pc_status::ill;
}
// otherwise, all replicas of this partition are dead (the DDD state)
else {
LOG_WARNING("{} enters DDD state, we are waiting for all replicas to come back, "
"and select primary according to informations collected",
gpid_name);
// when considering how to handle the DDD state, we must keep in mind that our
// shared/private-log data only write to OS-cache.
// so the last removed replica can't act as primary directly.
std::string reason;
config_context &cc = *get_config_context(*view.apps, gpid);
action.node.set_invalid();
for (int i = 0; i < cc.dropped.size(); ++i) {
const dropped_replica &dr = cc.dropped[i];
char time_buf[30] = {0};
::dsn::utils::time_ms_to_string(dr.time, time_buf);
LOG_INFO("{}: config_context.dropped[{}]: "
"node({}), time({})[{}], ballot({}), "
"commit_decree({}), prepare_decree({})",
gpid_name,
i,
dr.node,
dr.time,
time_buf,
dr.ballot,
dr.last_committed_decree,
dr.last_prepared_decree);
}
for (int i = 0; i < pc.last_drops.size(); ++i) {
int dropped_index = -1;
for (int k = 0; k < cc.dropped.size(); k++) {
if (cc.dropped[k].node == pc.last_drops[i]) {
dropped_index = k;
break;
}
}
LOG_INFO("{}: config_context.last_drops[{}]: node({}), dropped_index({})",
gpid_name,
i,
pc.last_drops[i],
dropped_index);
}
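// Only the most recently dropped replicas can hold the latest committed data.
// With a single historical replica we simply wait for it; with two or more we
// examine the last two drops and promote one only when their reported ballots
// and decrees are mutually consistent.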
if (pc.last_drops.size() == 1) {
LOG_WARNING("{}: the only node({}) is dead, waiting it to come back",
gpid_name,
pc.last_drops.back());
action.node = pc.last_drops.back();
} else {
std::vector<dsn::rpc_address> nodes(pc.last_drops.end() - 2, pc.last_drops.end());
std::vector<dropped_replica> collected_info(2);
bool ready = true;
LOG_INFO("{}: last two drops are {} and {} (the latest dropped)",
gpid_name,
nodes[0],
nodes[1]);
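// collect the reported replica info of both candidates: each must be alive and
// must either have reported its replica state or be known to hold no replica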
for (unsigned int i = 0; i < nodes.size(); ++i) {
node_state *ns = get_node_state(*view.nodes, nodes[i], false);
if (ns == nullptr || !ns->alive()) {
ready = false;
reason = "the last dropped node(" + nodes[i].to_std_string() +
") haven't come back yet";
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
} else {
std::vector<dropped_replica>::iterator it = cc.find_from_dropped(nodes[i]);
if (it == cc.dropped.end() || it->ballot == invalid_ballot) {
if (ns->has_collected()) {
LOG_INFO("{}: ignore {}'s replica info as it doesn't exist on "
"replica server",
gpid_name,
nodes[i]);
collected_info[i] = {nodes[i], 0, -1, -1, -1};
} else {
ready = false;
reason = "the last dropped node(" + nodes[i].to_std_string() +
") is unavailable because ";
if (it == cc.dropped.end()) {
reason += "the node is not exist in dropped_nodes";
} else {
reason += "replica info has not been collected from the node";
}
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
}
} else {
collected_info[i] = *it;
}
}
}
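// ballot == -1 marks a candidate that is alive but holds no replica of this
// partition; if both candidates are in that state there is nothing to recover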
if (ready && collected_info[0].ballot == -1 && collected_info[1].ballot == -1) {
ready = false;
reason = "no replica info collected from the last two drops";
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
}
if (ready) {
dropped_replica &previous_dead = collected_info[0];
dropped_replica &recent_dead = collected_info[1];
// 1. the replica with the larger ballot should also have the larger committed decree
// 2. the max prepared decree should be no smaller than meta's committed decree
int64_t gap1 = previous_dead.ballot - recent_dead.ballot;
int64_t gap2 =
previous_dead.last_committed_decree - recent_dead.last_committed_decree;
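// gap1 * gap2 >= 0 means the ballot order and the committed-decree order
// agree. For example, previous(ballot=3, lcd=100) vs recent(ballot=5, lcd=120)
// gives gap1=-2, gap2=-20, product > 0: consistent, and recent wins by ballot.
// A negative product would mean the replica with the larger ballot committed
// less, which hints at data loss.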
if (gap1 * gap2 >= 0) {
int64_t larger_cd = std::max(previous_dead.last_committed_decree,
recent_dead.last_committed_decree);
int64_t larger_pd = std::max(previous_dead.last_prepared_decree,
recent_dead.last_prepared_decree);
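// the candidates' prepared window must cover both what meta believes committed
// and the larger committed decree, otherwise acknowledged writes may be lost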
if (larger_pd >= pc.last_committed_decree && larger_pd >= larger_cd) {
if (gap1 != 0) {
// 1. choose node with larger ballot
action.node = gap1 < 0 ? recent_dead.node : previous_dead.node;
} else if (gap2 != 0) {
// 2. choose node with larger last_committed_decree
action.node = gap2 < 0 ? recent_dead.node : previous_dead.node;
} else {
// 3. choose node with larger last_prepared_decree
action.node = previous_dead.last_prepared_decree >
recent_dead.last_prepared_decree
? previous_dead.node
: recent_dead.node;
}
LOG_INFO("{}: select {} as a new primary", gpid_name, action.node);
} else {
char buf[1000];
snprintf(buf,
sizeof(buf),
"for the last two drops, larger_prepared_decree(%" PRId64 "), "
"last committed decree on meta(%" PRId64 "), "
"larger_committed_decree(%" PRId64 ")",
larger_pd,
pc.last_committed_decree,
larger_cd);
// record the reason before logging it; previously `reason` was logged
// while still empty
reason = buf;
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
}
} else {
reason = "for the last two drops, the node with larger ballot has smaller last "
"committed decree";
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
}
}
}
if (!action.node.is_invalid()) {
action.target = action.node;
action.type = config_type::CT_ASSIGN_PRIMARY;
get_newly_partitions(*view.nodes, action.node)
->newly_add_primary(gpid.get_app_id(), false);
} else {
LOG_WARNING("{}: don't select any node for security reason, administrator can select "
"a proper one by shell",
gpid_name);
_recent_choose_primary_fail_count->increment();
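// publish the collected per-node state of this DDD partition so that an
// administrator can inspect it and choose a primary manually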
ddd_partition_info pinfo;
pinfo.config = pc;
for (int i = 0; i < cc.dropped.size(); ++i) {
const dropped_replica &dr = cc.dropped[i];
ddd_node_info ninfo;
ninfo.node = dr.node;
ninfo.drop_time_ms = dr.time;
ninfo.ballot = invalid_ballot;
ninfo.last_committed_decree = invalid_decree;
ninfo.last_prepared_decree = invalid_decree;
node_state *ns = get_node_state(*view.nodes, dr.node, false);
if (ns != nullptr && ns->alive()) {
ninfo.is_alive = true;
if (ns->has_collected()) {
ninfo.is_collected = true;
ninfo.ballot = dr.ballot;
ninfo.last_committed_decree = dr.last_committed_decree;
ninfo.last_prepared_decree = dr.last_prepared_decree;
}
}
pinfo.dropped.emplace_back(std::move(ninfo));
}
pinfo.reason = reason;
set_ddd_partition(std::move(pinfo));
}
result = pc_status::dead;
}
if (action.type != config_type::CT_INVALID) {
acts.assign_cure_proposal(action);
}
return result;
}
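// ---------------------------------------------------------------------------
// Illustrative sketch only (not part of the original file): the DDD selection
// rule above, extracted as a pure helper so the comparison can be reasoned
// about (or unit-tested) in isolation. The struct and function names below are
// hypothetical; the real code operates on dropped_replica entries taken from
// config_context.
namespace {

struct ddd_candidate
{
    dsn::rpc_address node;
    int64_t ballot;
    int64_t last_committed_decree;
    int64_t last_prepared_decree;
};

// Returns the candidate to promote, or nullptr when the two reported states
// are inconsistent (possible data loss) and manual intervention is required.
const ddd_candidate *choose_ddd_primary(const ddd_candidate &prev,
                                        const ddd_candidate &recent,
                                        int64_t meta_committed_decree)
{
    const int64_t gap1 = prev.ballot - recent.ballot;
    const int64_t gap2 = prev.last_committed_decree - recent.last_committed_decree;
    // ballot order and committed-decree order must agree
    if (gap1 * gap2 < 0)
        return nullptr;
    const int64_t larger_cd =
        std::max(prev.last_committed_decree, recent.last_committed_decree);
    const int64_t larger_pd =
        std::max(prev.last_prepared_decree, recent.last_prepared_decree);
    // together the candidates must cover everything meta believes committed
    if (larger_pd < meta_committed_decree || larger_pd < larger_cd)
        return nullptr;
    if (gap1 != 0)
        return gap1 > 0 ? &prev : &recent; // larger ballot wins
    if (gap2 != 0)
        return gap2 > 0 ? &prev : &recent; // then larger committed decree
    // finally, larger prepared decree (ties go to the more recent drop)
    return prev.last_prepared_decree > recent.last_prepared_decree ? &prev : &recent;
}

} // anonymous namespace
// ---------------------------------------------------------------------------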