void server_state::update_configuration_locally()

in src/meta/server_state.cpp [1766:1917]


// Applies the configuration change described by `config_request` to the in-memory state:
//   1. updates the per-node partition bookkeeping in `_nodes` according to the request type,
//   2. overwrites the partition's entry in `app.pcs` with `config_request->config`,
//   3. logs the change, notifies `_config_change_subscriber` (if set) and bumps the metrics.
//
// Parameters:
//   app            - the app owning the partition; `app.pcs[<partition index>]` is replaced.
//   config_request - the update request. For stateless apps its `config` is rebuilt from the
//                    old configuration before being stored.
//
// The function CHECK-fails on a ballot mismatch, on a node that is not present in `_nodes`,
// and on request types that must never reach this point.
void server_state::update_configuration_locally(
    app_state &app, std::shared_ptr<configuration_update_request> &config_request)
{
    const dsn::gpid &gpid = config_request->config.pid;
    partition_configuration &old_pc = app.pcs[gpid.get_partition_index()];
    partition_configuration &new_pc = config_request->config;

    // Health is sampled before anything is modified, so that the writable/unwritable
    // transition metrics at the end compare the state before and after this update.
    int min_2pc_count =
        _meta_svc->get_options().app_mutation_2pc_min_replica_count(app.max_replica_count);
    health_status old_health_status = partition_health_status(old_pc, min_2pc_count);
    health_status new_health_status = partition_health_status(new_pc, min_2pc_count);

    // NOTE(review): GET_HOST_PORT presumably fills `node` from the request's `node` field
    // (host_port or address form) - confirm against the macro definition.
    host_port node;
    GET_HOST_PORT(*config_request, node, node);

    if (app.is_stateful) {
        CHECK(old_pc.ballot == invalid_ballot || old_pc.ballot + 1 == new_pc.ballot,
              "invalid configuration update request, old ballot {}, new ballot {}",
              old_pc.ballot,
              new_pc.ballot);

        // Every request type except CT_DROP_PARTITION operates on the single target node,
        // which must be known. CT_DROP_PARTITION looks up its own nodes below.
        node_state *ns = nullptr;
        if (config_request->type != config_type::CT_DROP_PARTITION) {
            ns = get_node_state(_nodes, node, false);
            CHECK_NOTNULL(ns, "invalid node: {}", node);
        }
#ifndef NDEBUG
        request_check(old_pc, *config_request);
#endif
        switch (config_request->type) {
        case config_type::CT_ASSIGN_PRIMARY:
        case config_type::CT_UPGRADE_TO_PRIMARY:
            ns->put_partition(gpid, true);
            break;

        case config_type::CT_UPGRADE_TO_SECONDARY:
            ns->put_partition(gpid, false);
            break;

        case config_type::CT_DOWNGRADE_TO_SECONDARY:
            ns->remove_partition(gpid, true);
            break;

        case config_type::CT_DOWNGRADE_TO_INACTIVE:
        case config_type::CT_REMOVE:
            ns->remove_partition(gpid, false);
            break;
        // Nothing to handle here, the ballot will be updated below.
        case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
            break;

        case config_type::CT_DROP_PARTITION: {
            // Unknown nodes are skipped on purpose here.
            for (const auto &last_drop : new_pc.hp_last_drops) {
                ns = get_node_state(_nodes, last_drop, false);
                if (ns != nullptr) {
                    ns->remove_partition(gpid, false);
                }
            }
            break;
        }
        case config_type::CT_ADD_SECONDARY:
        case config_type::CT_ADD_SECONDARY_FOR_LB:
            CHECK(false, "invalid execution work flow");
            break;
        case config_type::CT_REGISTER_CHILD: {
            ns->put_partition(gpid, true);
            // TODO(yingchun): optimize the duplicate loops.
            if (config_request->config.__isset.hp_secondaries) {
                for (const auto &secondary : config_request->config.hp_secondaries) {
                    auto *secondary_node = get_node_state(_nodes, secondary, false);
                    // The lookup does not create missing nodes, so it may return null;
                    // fail with a diagnostic instead of dereferencing a null pointer.
                    CHECK_NOTNULL(secondary_node, "invalid node: {}", secondary);
                    secondary_node->put_partition(gpid, false);
                }
            } else {
                for (const auto &secondary : config_request->config.secondaries) {
                    const auto hp = host_port::from_address(secondary);
                    if (!hp) {
                        LOG_ERROR("The registering secondary {} for pid {} can no be reverse "
                                  "resolved, skip registering it, please check the network "
                                  "configuration",
                                  secondary,
                                  config_request->config.pid);
                        continue;
                    }
                    auto *secondary_node = get_node_state(_nodes, hp, false);
                    // Same as above: never dereference an unchecked lookup result.
                    CHECK_NOTNULL(secondary_node, "invalid node: {}", hp);
                    secondary_node->put_partition(gpid, false);
                }
            }
            break;
        }
        default:
            CHECK(false, "");
            break;
        }
    } else {
        CHECK_EQ(old_pc.ballot, new_pc.ballot);
        const auto host_node = host_port::from_address(config_request->host_node);
        // The non-stateful app is just for testing, so just check the host_node is resolvable.
        CHECK(host_node, "'{}' can not be reverse resolved", config_request->host_node);
        // The new configuration is derived from the old one by adding/removing one entry.
        new_pc = old_pc;
        partition_configuration_stateless pcs(new_pc);
        if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
            pcs.hosts().emplace_back(host_node);
            pcs.workers().emplace_back(node);
        } else {
            // Erase-remove with the two-iterator erase(): the single-iterator form would
            // drop only one element and is undefined behaviour when nothing matches
            // (std::remove then returns end()).
            pcs.hosts().erase(std::remove(pcs.hosts().begin(), pcs.hosts().end(), host_node),
                              pcs.hosts().end());
            pcs.workers().erase(std::remove(pcs.workers().begin(), pcs.workers().end(), node),
                                pcs.workers().end());
        }

        auto it = _nodes.find(host_node);
        CHECK(it != _nodes.end(), "invalid node: {}", host_node);
        // NOTE(review): the list update above treats every type other than CT_ADD_SECONDARY
        // as a removal, while this branch treats every type other than CT_REMOVE as an
        // addition - presumably only those two types reach here; confirm with callers.
        if (config_type::CT_REMOVE == config_request->type) {
            it->second.remove_partition(gpid, false);
        } else {
            it->second.put_partition(gpid, false);
        }
    }

    // we assume config in config_request stores the proper new config
    // as we sync to remote storage according to it
    // The old configuration is stringified first because it is overwritten right after.
    std::string old_config_str = boost::lexical_cast<std::string>(old_pc);
    old_pc = config_request->config;
    // Log the symbolic name of the request type when known, the raw value otherwise.
    auto find_name = _config_type_VALUES_TO_NAMES.find(config_request->type);
    if (find_name != _config_type_VALUES_TO_NAMES.end()) {
        LOG_INFO("meta update config ok: type({}), old_config={}, {}",
                 find_name->second,
                 old_config_str,
                 boost::lexical_cast<std::string>(*config_request));
    } else {
        LOG_INFO("meta update config ok: type({}), old_config={}, {}",
                 config_request->type,
                 old_config_str,
                 boost::lexical_cast<std::string>(*config_request));
    }

#ifndef NDEBUG
    check_consistency(gpid);
#endif
    if (_config_change_subscriber) {
        _config_change_subscriber(_all_apps);
    }

    METRIC_INCREMENT(_table_metric_entities, partition_configuration_changes, gpid);
    // Crossing the HS_WRITABLE_ILL threshold downwards counts as "became unwritable",
    // crossing it upwards as "became writable".
    if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
        METRIC_INCREMENT(_table_metric_entities, unwritable_partition_changes, gpid);
    }
    if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
        METRIC_INCREMENT(_table_metric_entities, writable_partition_changes, gpid);
    }
}