in src/meta/server_state.cpp [2470:2644]
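// Runs one round of the partition health check: first the cure stage, which sends
// reconfiguration proposals for unhealthy partitions, then the balancer stage.
// Returns true only when the cluster is fully healthy and the balancer had nothing
// left to apply in this round.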
bool server_state::check_all_partitions()
{
int healthy_partitions = 0;
int total_partitions = 0;
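// the meta function level gates what this routine does: fl_freezed or lower skips
// everything; levels below fl_steady run only the cure stage; fl_steady checks and
// reports balance operations without applying them; higher levels apply them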
meta_function_level::type level = _meta_svc->get_function_level();
zauto_write_lock l(_lock);
update_partition_perf_counter();
// first the cure stage
if (level <= meta_function_level::fl_freezed) {
LOG_INFO("service is in level({}), don't do any cure or balancer actions",
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
return false;
}
LOG_INFO("start to check all partitions, add_secondary_enable_flow_control = {}, "
"add_secondary_max_count_for_one_node = {}",
_add_secondary_enable_flow_control ? "true" : "false",
_add_secondary_max_count_for_one_node);
_meta_svc->get_partition_guardian()->clear_ddd_partitions();
int send_proposal_count = 0;
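// add-secondary proposals are queued here instead of being sent immediately, so the
// flow-control passes below can cap how many a single node receives at once; all
// other proposal types are sent right away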
std::vector<configuration_proposal_action> add_secondary_actions;
std::vector<gpid> add_secondary_gpids;
std::vector<bool> add_secondary_proposed;
std::map<rpc_address, int> add_secondary_running_nodes; // node --> running_count
for (auto &app_pair : _exist_apps) {
std::shared_ptr<app_state> &app = app_pair.second;
if (app->status == app_status::AS_CREATING || app->status == app_status::AS_DROPPING) {
LOG_INFO("ignore app({})({}) because it's status is {}",
app->app_name,
app->app_id,
::dsn::enum_to_string(app->status));
continue;
}
for (int i = 0; i < app->partition_count; ++i) {
partition_configuration &pc = app->partitions[i];
config_context &cc = app->helpers->contexts[i];
// skip the partition if it is under re-configuration (stage is
// pending_remote_sync) or if it is a child partition (ballot is invalid)
if (cc.stage != config_status::pending_remote_sync && pc.ballot != invalid_ballot) {
configuration_proposal_action action;
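// ask the partition guardian to diagnose this partition; if it is unhealthy,
// `action` is filled with the next reconfiguration step to propose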
pc_status s = _meta_svc->get_partition_guardian()->cure(
{&_all_apps, &_nodes}, pc.pid, action);
LOG_DEBUG("gpid({}) is in status({})", pc.pid, enum_to_string(s));
if (pc_status::healthy != s) {
if (action.type != config_type::CT_INVALID) {
if (action.type == config_type::CT_ADD_SECONDARY ||
action.type == config_type::CT_ADD_SECONDARY_FOR_LB) {
add_secondary_actions.push_back(std::move(action));
add_secondary_gpids.push_back(pc.pid);
add_secondary_proposed.push_back(false);
} else {
send_proposal(action, pc, *app);
send_proposal_count++;
}
}
} else {
healthy_partitions++;
}
} else {
LOG_INFO("ignore gpid({}) as it's stage is pending_remote_sync", pc.pid);
}
}
total_partitions += app->partition_count;
}
// assign secondary for urgent
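// first pass: partitions that have lost all their secondaries are served first, so
// the per-node flow-control budget is spent on the most urgent replicas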
for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
gpid &pid = add_secondary_gpids[i];
partition_configuration &pc = *get_config(_all_apps, pid);
if (!add_secondary_proposed[i] && pc.secondaries.empty()) {
configuration_proposal_action &action = add_secondary_actions[i];
if (_add_secondary_enable_flow_control &&
add_secondary_running_nodes[action.node] >= _add_secondary_max_count_for_one_node) {
// flow control: this node already has the maximum number of in-flight
// add-secondary proposals, so skip it in this pass
continue;
}
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
send_proposal(action, pc, *app);
send_proposal_count++;
add_secondary_proposed[i] = true;
add_secondary_running_nodes[action.node]++;
}
}
// assign secondary for all
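// second pass: propose the remaining add-secondary actions, still honoring the
// per-node flow-control limit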
for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
if (!add_secondary_proposed[i]) {
configuration_proposal_action &action = add_secondary_actions[i];
gpid pid = add_secondary_gpids[i];
partition_configuration &pc = *get_config(_all_apps, pid);
if (_add_secondary_enable_flow_control &&
add_secondary_running_nodes[action.node] >= _add_secondary_max_count_for_one_node) {
LOG_INFO("do not send {} proposal for gpid({}) for flow control reason, target = "
"{}, node = {}",
::dsn::enum_to_string(action.type),
pc.pid,
action.target,
action.node);
continue;
}
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
send_proposal(action, pc, *app);
send_proposal_count++;
add_secondary_proposed[i] = true;
add_secondary_running_nodes[action.node]++;
}
}
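// whatever is still unproposed at this point was skipped by flow control in both passes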
int ignored_add_secondary_count = 0;
int add_secondary_count = 0;
for (size_t i = 0; i < add_secondary_actions.size(); ++i) {
if (!add_secondary_proposed[i]) {
ignored_add_secondary_count++;
} else {
add_secondary_count++;
}
}
LOG_INFO("check all partitions done, send_proposal_count = {}, add_secondary_count = {}, "
"ignored_add_secondary_count = {}",
send_proposal_count,
add_secondary_count,
ignored_add_secondary_count);
// then the balancer stage
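// replica migration requires the function level to be at least fl_steady, every
// partition to be healthy, and can_run_balancer() to agree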
if (level < meta_function_level::fl_steady) {
LOG_INFO("don't do replica migration coz meta server is in level({})",
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
return false;
}
if (healthy_partitions != total_partitions) {
LOG_INFO("don't do replica migration coz {}/{} partitions aren't healthy",
total_partitions - healthy_partitions,
total_partitions);
return false;
}
if (!can_run_balancer()) {
LOG_INFO("don't do replica migration coz can_run_balancer() returns false");
return false;
}
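// at fl_steady the balancer runs in check-only mode: report the operations it
// would perform without applying any of them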
if (level == meta_function_level::fl_steady) {
LOG_INFO("check if any replica migration can be done when meta server is in level({})",
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
_meta_svc->get_balancer()->check({&_all_apps, &_nodes}, _temporary_list);
LOG_INFO("balance checker operation count = {}", _temporary_list.size());
// update balance checker operation count
_meta_svc->get_balancer()->report(_temporary_list, true);
return false;
}
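// above fl_steady the balancer may actually move replicas: if it produced any
// operations, apply them and re-enqueue balancer_run() so the next round
// continues the migration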
if (_meta_svc->get_balancer()->balance({&_all_apps, &_nodes}, _temporary_list)) {
LOG_INFO("try to do replica migration");
_meta_svc->get_balancer()->apply_balancer({&_all_apps, &_nodes}, _temporary_list);
// update balancer action details
_meta_svc->get_balancer()->report(_temporary_list, false);
if (_replica_migration_subscriber) {
_replica_migration_subscriber(_temporary_list);
}
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&meta_service::balancer_run, _meta_svc));
return false;
}
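// balance() produced no operations; run the checker once more to report any
// residual imbalance and declare this round complete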
LOG_INFO("check if any replica migration left");
_meta_svc->get_balancer()->check({&_all_apps, &_nodes}, _temporary_list);
LOG_INFO("balance checker operation count = {}", _temporary_list.size());
// update balance checker operation count
_meta_svc->get_balancer()->report(_temporary_list, true);
return true;
}