in src/client/partition_resolver_simple.cpp [271:389]
// Callback for a partition-configuration query.
//
// On a successful reply, refreshes the cached app id, partition count, statefulness and
// per-partition configurations. Afterwards hands every request that was waiting on this
// query to handle_pending_requests().
//
// Parameters:
//   err             - result of the query RPC itself.
//   request         - the query request message (not used in this function).
//   response        - the reply message; unmarshalled into a query_cfg_response when
//                     err == ERR_OK.
//   partition_index - index of the partition the query was issued for, or -1 when the
//                     query covered all partitions.
void partition_resolver_simple::query_config_reply(error_code err,
                                                   dsn::message_ex *request,
                                                   dsn::message_ex *response,
                                                   int partition_index)
{
    // Error passed on to the waiting requests. It is only ever set from the error carried
    // inside the response body; when the RPC itself failed (err != ERR_OK) it stays ERR_OK.
    // NOTE(review): presumably handle_pending_requests() re-issues the requests when it is
    // given ERR_OK - confirm against its implementation.
    auto client_err = ERR_OK;

    if (err == ERR_OK) {
        query_cfg_response resp;
        unmarshall(response, resp);
        if (resp.err == ERR_OK) {
            zauto_write_lock l(_config_lock);

            if (_app_id != -1 && _app_id != resp.app_id) {
                LOG_WARNING(
                    "app id is changed (mostly the app was removed and created with the same "
                    "name), local vs remote: {} vs {} ",
                    _app_id,
                    resp.app_id);
            }

            // A partition count that exactly doubled or halved is not reported; any other
            // change is.
            if (_app_partition_count != -1 && _app_partition_count != resp.partition_count &&
                _app_partition_count * 2 != resp.partition_count &&
                _app_partition_count != resp.partition_count * 2) {
                LOG_WARNING("partition count is changed (mostly the app was removed and created "
                            "with the same name), local vs remote: {} vs {} ",
                            _app_partition_count,
                            resp.partition_count);
            }

            _app_id = resp.app_id;
            _app_partition_count = resp.partition_count;
            _app_is_stateful = resp.is_stateful;

            for (const auto &new_config : resp.partitions) {
                LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
                                 new_config.pid,
                                 new_config.ballot,
                                 new_config.primary);

                const auto pidx = new_config.pid.get_partition_index();
                auto cached = _config_cache.find(pidx);
                if (cached == _config_cache.end()) {
                    // First time this partition is seen: create its cache entry.
                    auto pi = std::make_unique<partition_info>();
                    pi->timeout_count = 0;
                    pi->config = new_config;
                    _config_cache.emplace(pidx, std::move(pi));
                } else if (!_app_is_stateful ||
                           cached->second->config.ballot < new_config.ballot) {
                    // Stateless apps always take the new configuration; stateful apps only
                    // take it when its ballot is strictly greater than the cached one.
                    cached->second->timeout_count = 0;
                    cached->second->config = new_config;
                }
            }
        } else {
            LOG_ERROR_PREFIX(
                "query config reply, gpid = {}.{}, err = {}", _app_id, partition_index, resp.err);
            if (resp.err == ERR_OBJECT_NOT_FOUND) {
                client_err = ERR_APP_NOT_EXIST;
            } else {
                client_err = resp.err;
            }
        }
    } else {
        LOG_ERROR_PREFIX(
            "query config reply, gpid = {}.{}, err = {}", _app_id, partition_index, err);
    }

    if (partition_index != -1) {
        // Update for one specific partition: detach its pending context under the lock,
        // then process it outside the lock.
        std::unique_ptr<partition_context> pc;
        {
            zauto_lock l(_requests_lock);
            auto it = _pending_requests.find(partition_index);
            if (it != _pending_requests.end()) {
                pc.reset(it->second);
                _pending_requests.erase(it);
            }
        }
        if (pc) {
            handle_pending_requests(pc->requests, client_err);
        }
    } else {
        // Update for all partitions: take every pending request out under the lock, then
        // process them outside the lock.
        pending_replica_requests reqs;
        std::deque<request_context_ptr> reqs2;
        {
            zauto_lock l(_requests_lock);
            reqs.swap(_pending_requests);
            reqs2.swap(_pending_requests_before_partition_count_unknown);
        }

        if (!reqs2.empty()) {
            // These requests were queued with partition_index == -1; once a partition count
            // is available, derive the index from each request's partition hash.
            // NOTE(review): _app_partition_count is read here without holding _config_lock;
            // confirm whether concurrent replies can race with this read.
            if (_app_partition_count != -1) {
                for (auto &req : reqs2) {
                    CHECK_EQ(req->partition_index, -1);
                    req->partition_index =
                        get_partition_index(_app_partition_count, req->partition_hash);
                }
            }
            handle_pending_requests(reqs2, client_err);
        }

        for (auto &r : reqs) {
            // Take ownership so the context is released even if handling throws.
            std::unique_ptr<partition_context> pc(r.second);
            if (pc) {
                handle_pending_requests(pc->requests, client_err);
            }
        }
    }
}