in src/meta/partition_guardian.cpp [512:664]
pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::gpid &gpid)
{
partition_configuration &pc = *get_config(*(view.apps), gpid);
config_context &cc = *get_config_context(*(view.apps), gpid);
configuration_proposal_action action;
bool is_emergency = false;
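// The partition is handled as an emergency when any of the following holds: the number of
// replicas has fallen below the 2PC minimum (for partitions whose max_replica_count exceeds
// it), there is no dropped replica to wait for, the last drop happened longer ago than the
// configured delay, or the last dropped node is in the black list.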
if (cc.pc->max_replica_count >
_svc->get_options().app_mutation_2pc_min_replica_count(pc.max_replica_count) &&
replica_count(pc) <
_svc->get_options().app_mutation_2pc_min_replica_count(pc.max_replica_count)) {
// ATTENTION:
// when max_replica_count == 2, even if there is only 1 replica alive now, we will still
// wait for '_replica_assign_delay_ms_for_dropouts' before recovering the second replica.
is_emergency = true;
LOG_INFO("gpid({}): is emergency due to too few replicas", gpid);
} else if (cc.dropped.empty()) {
is_emergency = true;
LOG_INFO("gpid({}): is emergency due to no dropped candidate", gpid);
} else if (has_milliseconds_expired(cc.dropped.back().time +
_replica_assign_delay_ms_for_dropouts)) {
is_emergency = true;
char time_buf[30] = {0};
::dsn::utils::time_ms_to_string(cc.dropped.back().time, time_buf);
LOG_INFO("gpid({}): is emergency due to lose secondary for a long time, "
"last_dropped_node({}), drop_time({}), delay_ms({})",
gpid,
cc.dropped.back().node,
time_buf,
_replica_assign_delay_ms_for_dropouts);
} else if (in_black_list(cc.dropped.back().node)) {
LOG_INFO("gpid({}) is emergency due to recent dropped({}) is in black list",
gpid,
cc.dropped.back().node);
is_emergency = true;
}
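// Clear the proposal's node first; it is filled in below only when a usable candidate is found.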
RESET_IP_AND_HOST_PORT(action, node);
if (is_emergency) {
std::ostringstream oss;
for (int i = 0; i < cc.dropped.size(); ++i) {
if (i != 0)
oss << ",";
oss << cc.dropped[i].node;
}
LOG_INFO(
"gpid({}): try to choose node in dropped list, dropped_list({}), prefered_dropped({})",
gpid,
oss.str(),
cc.prefered_dropped);
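// prefered_dropped may be stale from an earlier round (e.g. already walked past 0, or the
// dropped list has changed since), so clamp it back into range before scanning.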
if (cc.prefered_dropped < 0 || cc.prefered_dropped >= (int)cc.dropped.size()) {
LOG_INFO("gpid({}): prefered_dropped({}) is invalid according to drop_list(size {}), "
"reset it to {} (drop_list.size - 1)",
gpid,
cc.prefered_dropped,
cc.dropped.size(),
cc.dropped.size() - 1);
cc.prefered_dropped = (int)cc.dropped.size() - 1;
}
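// Scan the dropped list from prefered_dropped toward the front, picking the first node
// that is alive again.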
while (cc.prefered_dropped >= 0) {
const dropped_replica &server = cc.dropped[cc.prefered_dropped];
if (is_node_alive(*view.nodes, server.node)) {
LOG_INFO("gpid({}): node({}) at cc.dropped[{}] is alive now, choose it, "
"and forward prefered_dropped from {} to {}",
gpid,
server.node,
cc.prefered_dropped,
cc.prefered_dropped,
cc.prefered_dropped - 1);
SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node);
cc.prefered_dropped--;
break;
} else {
LOG_INFO("gpid({}): node({}) at cc.dropped[{}] is not alive now, "
"changed prefered_dropped from {} to {}",
gpid,
server.node,
cc.prefered_dropped,
cc.prefered_dropped,
cc.prefered_dropped - 1);
cc.prefered_dropped--;
}
}
host_port node;
GET_HOST_PORT(action, node, node);
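// If no dropped replica is usable (none alive, or the selected one is black-listed),
// fall back to choosing a brand-new node below.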
if (!node || in_black_list(node)) {
if (node) {
LOG_INFO(
"gpid({}): refuse to use the selected node({}) as it is in the black list", gpid, node);
}
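// Among alive nodes that are neither members of this partition nor black-listed, pick the
// one serving the fewest partitions of this app.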
newly_partitions *min_server_np = nullptr;
for (auto &[_, ns] : *view.nodes) {
if (!ns.alive() || is_member(pc, ns.host_port()) || in_black_list(ns.host_port())) {
continue;
}
newly_partitions *np = newly_partitions_ext::get_inited(&ns);
if (min_server_np == nullptr ||
np->less_partitions(*min_server_np, gpid.get_app_id())) {
SET_IP_AND_HOST_PORT_BY_DNS(action, node, ns.host_port());
min_server_np = np;
}
}
// action.hp_node is only set if a candidate was selected above.
if (action.hp_node) {
LOG_INFO("gpid({}): can't find valid node in dropped list to add as secondary, "
"choose new node({}) with minimal partitions serving",
gpid,
action.hp_node);
} else {
LOG_INFO("gpid({}): can't find valid node in dropped list to add as secondary, "
"but also we can't find a new node to add as secondary",
gpid);
}
}
} else {
// if not an emergency, only try to recover the last dropped server
const dropped_replica &server = cc.dropped.back();
if (is_node_alive(*view.nodes, server.node)) {
CHECK(server.node, "invalid server address, address = {}", server.node);
SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node);
}
// action.hp_node is only set if a candidate was selected above.
if (action.hp_node) {
LOG_INFO("gpid({}): choose node({}) as secondary coz it is last_dropped_node and is "
"alive now",
gpid,
server.node);
} else {
LOG_INFO("gpid({}): can't add secondary coz last_dropped_node({}) is not alive now, "
"ignore this as not in emergency",
gpid,
server.node);
}
}
// action.hp_node is only set if a candidate was selected above.
if (action.hp_node) {
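// A candidate was selected: propose CT_ADD_SECONDARY to the current primary and record the
// pending partition on the chosen node so subsequent placement decisions account for it.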
action.type = config_type::CT_ADD_SECONDARY;
SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary);
newly_partitions *np = get_newly_partitions(*(view.nodes), action.hp_node);
CHECK_NOTNULL(np, "");
np->newly_add_partition(gpid.get_app_id());
cc.lb_actions.assign_cure_proposal(action);
}
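// Report the partition as ill; the missing secondary has only been proposed, not yet added.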
return pc_status::ill;
}