void server_state::update_configuration_locally()

in src/meta/server_state.cpp [1530:1659]


// Applies a configuration update request to the in-memory meta state.
//
// Steps, in order:
//   1. Record the partition's health status before and after the update.
//   2. Update the per-node partition bookkeeping according to the request type
//      (stateful apps), or rebuild the host/worker lists (stateless apps).
//   3. Overwrite the stored partition configuration with the one carried by the
//      request, log the change, notify the config-change subscriber (if any) and
//      bump the related counters.
//
// Parameters:
//   app            - the app owning the partition; its `partitions` entry is overwritten.
//   config_request - the update request; for stateless apps `config_request->config`
//                    is rewritten in place before being stored.
void server_state::update_configuration_locally(
    app_state &app, std::shared_ptr<configuration_update_request> &config_request)
{
    dsn::gpid &gpid = config_request->config.pid;
    partition_configuration &old_cfg = app.partitions[gpid.get_partition_index()];
    partition_configuration &new_cfg = config_request->config;

    // Health statuses are captured up front because old_cfg is overwritten below.
    int min_2pc_count =
        _meta_svc->get_options().app_mutation_2pc_min_replica_count(app.max_replica_count);
    health_status old_health_status = partition_health_status(old_cfg, min_2pc_count);
    health_status new_health_status = partition_health_status(new_cfg, min_2pc_count);

    if (app.is_stateful) {
        // A stateful update must advance the ballot by exactly one, unless the
        // partition has no valid ballot yet.
        CHECK(old_cfg.ballot == invalid_ballot || old_cfg.ballot + 1 == new_cfg.ballot,
              "invalid configuration update request, old ballot {}, new ballot {}",
              old_cfg.ballot,
              new_cfg.ballot);

        // Every request type except CT_DROP_PARTITION targets a single node that
        // must already be known; CT_DROP_PARTITION walks last_drops instead.
        node_state *ns = nullptr;
        if (config_request->type != config_type::CT_DROP_PARTITION) {
            ns = get_node_state(_nodes, config_request->node, false);
            CHECK_NOTNULL(ns, "invalid node address, address = {}", config_request->node);
        }
#ifndef NDEBUG
        request_check(old_cfg, *config_request);
#endif
        switch (config_request->type) {
        case config_type::CT_ASSIGN_PRIMARY:
        case config_type::CT_UPGRADE_TO_PRIMARY:
            ns->put_partition(gpid, true);
            break;

        case config_type::CT_UPGRADE_TO_SECONDARY:
            ns->put_partition(gpid, false);
            break;

        case config_type::CT_DOWNGRADE_TO_SECONDARY:
            ns->remove_partition(gpid, true);
            break;

        case config_type::CT_DOWNGRADE_TO_INACTIVE:
        case config_type::CT_REMOVE:
            ns->remove_partition(gpid, false);
            break;

        // Nothing to handle here: the ballot is updated below when the stored
        // configuration is overwritten.
        case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
            break;

        case config_type::CT_DROP_PARTITION:
            // Nodes listed in last_drops may be unknown; those are skipped.
            for (const rpc_address &node : new_cfg.last_drops) {
                ns = get_node_state(_nodes, node, false);
                if (ns != nullptr)
                    ns->remove_partition(gpid, false);
            }
            break;

        case config_type::CT_ADD_SECONDARY:
        case config_type::CT_ADD_SECONDARY_FOR_LB:
            CHECK(false, "invalid execution work flow");
            break;
        case config_type::CT_REGISTER_CHILD: {
            ns->put_partition(gpid, true);
            for (auto &secondary : config_request->config.secondaries) {
                auto secondary_node = get_node_state(_nodes, secondary, false);
                // get_node_state() may return nullptr when the node is unknown
                // (see the CT_DROP_PARTITION case); fail loudly instead of
                // dereferencing a null pointer.
                CHECK_NOTNULL(
                    secondary_node, "invalid node address, address = {}", secondary);
                secondary_node->put_partition(gpid, false);
            }
            break;
        }
        default:
            CHECK(false, "");
            break;
        }
    } else {
        CHECK_EQ(old_cfg.ballot, new_cfg.ballot);

        // Start from the current configuration and edit its host/worker lists.
        new_cfg = old_cfg;
        partition_configuration_stateless pcs(new_cfg);
        if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
            pcs.hosts().emplace_back(config_request->host_node);
            pcs.workers().emplace_back(config_request->node);
        } else {
            // Erase-remove with the range form of erase(): it is a no-op when the
            // address is absent (single-iterator erase(end()) is undefined
            // behaviour) and drops every occurrence when it appears more than once.
            auto &hosts = pcs.hosts();
            hosts.erase(std::remove(hosts.begin(), hosts.end(), config_request->host_node),
                        hosts.end());

            auto &workers = pcs.workers();
            workers.erase(std::remove(workers.begin(), workers.end(), config_request->node),
                          workers.end());
        }

        auto it = _nodes.find(config_request->host_node);
        CHECK(it != _nodes.end(), "invalid node address, address = {}", config_request->host_node);
        if (config_type::CT_REMOVE == config_request->type) {
            it->second.remove_partition(gpid, false);
        } else {
            it->second.put_partition(gpid, false);
        }
    }

    // we assume config in config_request stores the proper new config
    // as we sync to remote storage according to it
    // The old configuration is stringified before it is overwritten so that the
    // log line can still show it.
    std::string old_config_str = boost::lexical_cast<std::string>(old_cfg);
    old_cfg = config_request->config;
    auto find_name = _config_type_VALUES_TO_NAMES.find(config_request->type);
    if (find_name != _config_type_VALUES_TO_NAMES.end()) {
        LOG_INFO("meta update config ok: type({}), old_config={}, {}",
                 find_name->second,
                 old_config_str,
                 boost::lexical_cast<std::string>(*config_request));
    } else {
        // No name is registered for this type value; log the raw value instead.
        LOG_INFO("meta update config ok: type({}), old_config={}, {}",
                 config_request->type,
                 old_config_str,
                 boost::lexical_cast<std::string>(*config_request));
    }

#ifndef NDEBUG
    check_consistency(gpid);
#endif
    if (_config_change_subscriber) {
        _config_change_subscriber(_all_apps);
    }

    _recent_update_config_count->increment();
    // Status moved from at least HS_WRITABLE_ILL to below it.
    if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
        _recent_partition_change_unwritable_count->increment();
    }
    // Status moved from below HS_WRITABLE_ILL to at least HS_WRITABLE_ILL.
    if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
        _recent_partition_change_writable_count->increment();
    }
}