void partition_resolver_simple::query_config_reply()

in src/client/partition_resolver_simple.cpp [271:389]


// Handles the reply to a query-config request.
//
// When both the call and the response succeed, the cached app id, partition count,
// stateful flag and per-partition configurations are refreshed under `_config_lock`.
// Afterwards the requests that were queued while waiting for this reply are removed
// from the pending containers and passed to `handle_pending_requests()`.
//
// Parameters:
//   err             - result of the query call itself; `response` is only decoded
//                     when this is ERR_OK.
//   request         - the query request message (not used in this function).
//   response        - the reply message, unmarshalled into a `query_cfg_response`.
//   partition_index - the partition this reply is for, or -1 when the reply is for
//                     all partitions.
void partition_resolver_simple::query_config_reply(error_code err,
                                                   dsn::message_ex *request,
                                                   dsn::message_ex *response,
                                                   int partition_index)
{
    // Error passed on to the pending requests. It stays ERR_OK when the call itself
    // failed (err != ERR_OK); only an error carried inside the response replaces it.
    auto client_err = ERR_OK;

    if (err == ERR_OK) {
        query_cfg_response resp;
        unmarshall(response, resp);
        if (resp.err == ERR_OK) {
            zauto_write_lock l(_config_lock);

            if (_app_id != -1 && _app_id != resp.app_id) {
                LOG_WARNING(
                    "app id is changed (mostly the app was removed and created with the same "
                    "name), local vs remote: {} vs {} ",
                    _app_id,
                    resp.app_id);
            }
            // A partition count that exactly doubled or halved is not reported; any
            // other change is.
            if (_app_partition_count != -1 && _app_partition_count != resp.partition_count &&
                _app_partition_count * 2 != resp.partition_count &&
                _app_partition_count != resp.partition_count * 2) {
                LOG_WARNING("partition count is changed (mostly the app was removed and created "
                            "with the same name), local vs remote: {} vs {} ",
                            _app_partition_count,
                            resp.partition_count);
            }
            _app_id = resp.app_id;
            _app_partition_count = resp.partition_count;
            _app_is_stateful = resp.is_stateful;

            for (const auto &new_config : resp.partitions) {
                LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
                                 new_config.pid,
                                 new_config.ballot,
                                 new_config.primary);

                const auto index = new_config.pid.get_partition_index();
                auto cached = _config_cache.find(index);
                if (cached == _config_cache.end()) {
                    // First time this partition is seen: create its cache entry.
                    auto info = std::make_unique<partition_info>();
                    info->timeout_count = 0;
                    info->config = new_config;
                    _config_cache.emplace(index, std::move(info));
                } else if (!_app_is_stateful ||
                           cached->second->config.ballot < new_config.ballot) {
                    // A stateless app always takes the new configuration; a stateful
                    // app only takes one with a higher ballot than the cached one.
                    cached->second->timeout_count = 0;
                    cached->second->config = new_config;
                }
            }
        } else {
            LOG_ERROR_PREFIX(
                "query config reply, gpid = {}.{}, err = {}", _app_id, partition_index, resp.err);

            // ERR_OBJECT_NOT_FOUND is reported to the pending requests as
            // ERR_APP_NOT_EXIST; every other response error is passed through.
            if (resp.err == ERR_OBJECT_NOT_FOUND) {
                client_err = ERR_APP_NOT_EXIST;
            } else {
                client_err = resp.err;
            }
        }
    } else {
        LOG_ERROR_PREFIX(
            "query config reply, gpid = {}.{}, err = {}", _app_id, partition_index, err);
    }

    // Reply for one specific partition: only that partition's pending requests are handled.
    if (partition_index != -1) {
        // Takes ownership of the context removed from `_pending_requests`, so it is
        // released after the requests have been handled.
        std::unique_ptr<partition_context> pc;
        {
            zauto_lock l(_requests_lock);
            auto it = _pending_requests.find(partition_index);
            if (it != _pending_requests.end()) {
                pc.reset(it->second);
                _pending_requests.erase(it);
            }
        }

        if (pc) {
            handle_pending_requests(pc->requests, client_err);
        }
        return;
    }

    // Reply for all partitions: detach every pending container while holding the lock,
    // then handle the requests after the lock has been released.
    pending_replica_requests reqs;
    std::deque<request_context_ptr> reqs2;
    {
        zauto_lock l(_requests_lock);
        reqs.swap(_pending_requests);
        reqs2.swap(_pending_requests_before_partition_count_unknown);
    }

    if (!reqs2.empty()) {
        // These requests carry no partition index yet (checked below); derive it from
        // the partition hash once the partition count is known.
        // NOTE(review): _app_partition_count is read here without holding _config_lock,
        // exactly as before this change - confirm that this is safe.
        if (_app_partition_count != -1) {
            for (auto &req : reqs2) {
                CHECK_EQ(req->partition_index, -1);
                req->partition_index =
                    get_partition_index(_app_partition_count, req->partition_hash);
            }
        }
        handle_pending_requests(reqs2, client_err);
    }

    for (auto &r : reqs) {
        // Adopt the context so it is deleted once its requests have been handled.
        std::unique_ptr<partition_context> pc(r.second);
        if (pc) {
            handle_pending_requests(pc->requests, client_err);
        }
    }
}