in src/meta/server_state.cpp [1766:1917]
void server_state::update_configuration_locally(
app_state &app, std::shared_ptr<configuration_update_request> &config_request)
{
dsn::gpid &gpid = config_request->config.pid;
partition_configuration &old_pc = app.pcs[gpid.get_partition_index()];
partition_configuration &new_pc = config_request->config;
int min_2pc_count =
_meta_svc->get_options().app_mutation_2pc_min_replica_count(app.max_replica_count);
health_status old_health_status = partition_health_status(old_pc, min_2pc_count);
health_status new_health_status = partition_health_status(new_pc, min_2pc_count);
host_port node;
GET_HOST_PORT(*config_request, node, node);
if (app.is_stateful) {
CHECK(old_pc.ballot == invalid_ballot || old_pc.ballot + 1 == new_pc.ballot,
"invalid configuration update request, old ballot {}, new ballot {}",
old_pc.ballot,
new_pc.ballot);
node_state *ns = nullptr;
if (config_request->type != config_type::CT_DROP_PARTITION) {
ns = get_node_state(_nodes, node, false);
CHECK_NOTNULL(ns, "invalid node: {}", node);
}
#ifndef NDEBUG
request_check(old_pc, *config_request);
#endif
switch (config_request->type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
ns->put_partition(gpid, true);
break;
case config_type::CT_UPGRADE_TO_SECONDARY:
ns->put_partition(gpid, false);
break;
case config_type::CT_DOWNGRADE_TO_SECONDARY:
ns->remove_partition(gpid, true);
break;
case config_type::CT_DOWNGRADE_TO_INACTIVE:
case config_type::CT_REMOVE:
ns->remove_partition(gpid, false);
break;
// nothing to handle, the ballot will updated in below
case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
break;
case config_type::CT_DROP_PARTITION: {
for (const auto &last_drop : new_pc.hp_last_drops) {
ns = get_node_state(_nodes, last_drop, false);
if (ns != nullptr) {
ns->remove_partition(gpid, false);
}
}
break;
}
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
CHECK(false, "invalid execution work flow");
break;
case config_type::CT_REGISTER_CHILD: {
ns->put_partition(gpid, true);
// TODO(yingchun): optimize the duplicate loops.
if (config_request->config.__isset.hp_secondaries) {
for (const auto &secondary : config_request->config.hp_secondaries) {
auto *secondary_node = get_node_state(_nodes, secondary, false);
secondary_node->put_partition(gpid, false);
}
} else {
for (const auto &secondary : config_request->config.secondaries) {
const auto hp = host_port::from_address(secondary);
if (!hp) {
LOG_ERROR("The registering secondary {} for pid {} can no be reverse "
"resolved, skip registering it, please check the network "
"configuration",
secondary,
config_request->config.pid);
continue;
}
auto secondary_node = get_node_state(_nodes, hp, false);
secondary_node->put_partition(gpid, false);
}
}
break;
}
default:
CHECK(false, "");
break;
}
} else {
CHECK_EQ(old_pc.ballot, new_pc.ballot);
const auto host_node = host_port::from_address(config_request->host_node);
// The non-stateful app is just for testing, so just check the host_node is resolvable.
CHECK(host_node, "'{}' can not be reverse resolved", config_request->host_node);
new_pc = old_pc;
partition_configuration_stateless pcs(new_pc);
if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
pcs.hosts().emplace_back(host_node);
pcs.workers().emplace_back(node);
} else {
auto it = std::remove(pcs.hosts().begin(), pcs.hosts().end(), host_node);
pcs.hosts().erase(it);
it = std::remove(pcs.workers().begin(), pcs.workers().end(), node);
pcs.workers().erase(it);
}
auto it = _nodes.find(host_node);
CHECK(it != _nodes.end(), "invalid node: {}", host_node);
if (config_type::CT_REMOVE == config_request->type) {
it->second.remove_partition(gpid, false);
} else {
it->second.put_partition(gpid, false);
}
}
// we assume config in config_request stores the proper new config
// as we sync to remote storage according to it
std::string old_config_str = boost::lexical_cast<std::string>(old_pc);
old_pc = config_request->config;
auto find_name = _config_type_VALUES_TO_NAMES.find(config_request->type);
if (find_name != _config_type_VALUES_TO_NAMES.end()) {
LOG_INFO("meta update config ok: type({}), old_config={}, {}",
find_name->second,
old_config_str,
boost::lexical_cast<std::string>(*config_request));
} else {
LOG_INFO("meta update config ok: type({}), old_config={}, {}",
config_request->type,
old_config_str,
boost::lexical_cast<std::string>(*config_request));
}
#ifndef NDEBUG
check_consistency(gpid);
#endif
if (_config_change_subscriber) {
_config_change_subscriber(_all_apps);
}
METRIC_INCREMENT(_table_metric_entities, partition_configuration_changes, gpid);
if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
METRIC_INCREMENT(_table_metric_entities, unwritable_partition_changes, gpid);
}
if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
METRIC_INCREMENT(_table_metric_entities, writable_partition_changes, gpid);
}
}