in src/meta/server_state.cpp [1530:1659]
void server_state::update_configuration_locally(
app_state &app, std::shared_ptr<configuration_update_request> &config_request)
{
dsn::gpid &gpid = config_request->config.pid;
partition_configuration &old_cfg = app.partitions[gpid.get_partition_index()];
partition_configuration &new_cfg = config_request->config;
int min_2pc_count =
_meta_svc->get_options().app_mutation_2pc_min_replica_count(app.max_replica_count);
health_status old_health_status = partition_health_status(old_cfg, min_2pc_count);
health_status new_health_status = partition_health_status(new_cfg, min_2pc_count);
if (app.is_stateful) {
CHECK(old_cfg.ballot == invalid_ballot || old_cfg.ballot + 1 == new_cfg.ballot,
"invalid configuration update request, old ballot {}, new ballot {}",
old_cfg.ballot,
new_cfg.ballot);
node_state *ns = nullptr;
if (config_request->type != config_type::CT_DROP_PARTITION) {
ns = get_node_state(_nodes, config_request->node, false);
CHECK_NOTNULL(ns, "invalid node address, address = {}", config_request->node);
}
#ifndef NDEBUG
request_check(old_cfg, *config_request);
#endif
switch (config_request->type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
ns->put_partition(gpid, true);
break;
case config_type::CT_UPGRADE_TO_SECONDARY:
ns->put_partition(gpid, false);
break;
case config_type::CT_DOWNGRADE_TO_SECONDARY:
ns->remove_partition(gpid, true);
break;
case config_type::CT_DOWNGRADE_TO_INACTIVE:
case config_type::CT_REMOVE:
ns->remove_partition(gpid, false);
break;
// nothing to handle, the ballot will updated in below
case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
break;
case config_type::CT_DROP_PARTITION:
for (const rpc_address &node : new_cfg.last_drops) {
ns = get_node_state(_nodes, node, false);
if (ns != nullptr)
ns->remove_partition(gpid, false);
}
break;
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
CHECK(false, "invalid execution work flow");
break;
case config_type::CT_REGISTER_CHILD: {
ns->put_partition(gpid, true);
for (auto &secondary : config_request->config.secondaries) {
auto secondary_node = get_node_state(_nodes, secondary, false);
secondary_node->put_partition(gpid, false);
}
break;
}
default:
CHECK(false, "");
break;
}
} else {
CHECK_EQ(old_cfg.ballot, new_cfg.ballot);
new_cfg = old_cfg;
partition_configuration_stateless pcs(new_cfg);
if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
pcs.hosts().emplace_back(config_request->host_node);
pcs.workers().emplace_back(config_request->node);
} else {
auto it =
std::remove(pcs.hosts().begin(), pcs.hosts().end(), config_request->host_node);
pcs.hosts().erase(it);
it = std::remove(pcs.workers().begin(), pcs.workers().end(), config_request->node);
pcs.workers().erase(it);
}
auto it = _nodes.find(config_request->host_node);
CHECK(it != _nodes.end(), "invalid node address, address = {}", config_request->host_node);
if (config_type::CT_REMOVE == config_request->type) {
it->second.remove_partition(gpid, false);
} else {
it->second.put_partition(gpid, false);
}
}
// we assume config in config_request stores the proper new config
// as we sync to remote storage according to it
std::string old_config_str = boost::lexical_cast<std::string>(old_cfg);
old_cfg = config_request->config;
auto find_name = _config_type_VALUES_TO_NAMES.find(config_request->type);
if (find_name != _config_type_VALUES_TO_NAMES.end()) {
LOG_INFO("meta update config ok: type({}), old_config={}, {}",
find_name->second,
old_config_str,
boost::lexical_cast<std::string>(*config_request));
} else {
LOG_INFO("meta update config ok: type({}), old_config={}, {}",
config_request->type,
old_config_str,
boost::lexical_cast<std::string>(*config_request));
}
#ifndef NDEBUG
check_consistency(gpid);
#endif
if (_config_change_subscriber) {
_config_change_subscriber(_all_apps);
}
_recent_update_config_count->increment();
if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
_recent_partition_change_unwritable_count->increment();
}
if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
_recent_partition_change_writable_count->increment();
}
}