in src/replica/replica_learn.cpp [364:570]
// Handles a learn request sent by a learner and replies with a learn_response.
//
// Flow, as visible in this function:
//   1. Reject the request unless this replica is primary (ERR_INACTIVE_STATE or
//      ERR_INVALID_STATE), the learner is registered in _primary_states.learners
//      (ERR_OBJECT_NOT_FOUND), and the signatures match (ERR_WRONG_CHECKSUM).
//   2. Reconcile the learner's app committed decree with the local committed decree;
//      reply ERR_INCONSISTENT_STATE if a soft commit cannot catch up with the learner.
//   3. Compute learn_start_decree and let prepare_cached_learn_state() decide whether the
//      response is already complete. Otherwise choose between private logs
//      (learn_type::LT_LOG) and an app checkpoint (learn_type::LT_APP); a checkpoint
//      failure is reported as ERR_GET_LEARN_STATE_FAILED.
//   4. Make the file names in response.state.files relative to response.base_local_dir,
//      reply, and finally replay the prepare list if that was requested in step 3.
//
// Parameters:
//   msg     - the incoming message; every exit path passes it to reply().
//   request - the learn request. NOTE: although taken by const reference, its
//             last_committed_decree_in_app field is reset to 0 when the learner claims a
//             decree beyond last_prepared_decree() (learn from scratch).
void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
{
    _checker.only_one_thread_access();

    learn_response response;
    if (partition_status::PS_PRIMARY != status()) {
        response.err = (partition_status::PS_INACTIVE == status() && _inactive_is_transient)
                           ? ERR_INACTIVE_STATE
                           : ERR_INVALID_STATE;
        reply(msg, response);
        return;
    }

    // but just set state to partition_status::PS_POTENTIAL_SECONDARY
    _primary_states.get_replica_config(partition_status::PS_POTENTIAL_SECONDARY, response.config);

    auto it = _primary_states.learners.find(request.learner);
    if (it == _primary_states.learners.end()) {
        // the requester is not a learner known to this primary
        response.config.status = partition_status::PS_INACTIVE;
        response.err = ERR_OBJECT_NOT_FOUND;
        reply(msg, response);
        return;
    }

    remote_learner_state &learner_state = it->second;
    if (learner_state.signature != request.signature) {
        // hand the locally recorded signature back to the learner
        response.config.learner_signature = learner_state.signature;
        response.err = ERR_WRONG_CHECKSUM; // means invalid signature
        reply(msg, response);
        return;
    }

    // prepare learn_start_decree
    decree local_committed_decree = last_committed_decree();

    // TODO: learner machine has been down for a long time, and DDD MUST happened before
    // which leads to state lost. Now the lost state is back, what shall we do?
    if (request.last_committed_decree_in_app > last_prepared_decree()) {
        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner state is newer than learnee, "
                         "learner_app_committed_decree = {}, local_committed_decree = {}, learn "
                         "from scratch",
                         request.signature,
                         request.learner,
                         request.last_committed_decree_in_app,
                         local_committed_decree);

        // Deliberately rewrite the request so that every later reader of it in this
        // function (get_learn_start_decree(), prepare_cached_learn_state(), the checks
        // below) sees a learner that starts from decree 0. The const_cast makes the
        // const-stripping explicit and greppable.
        const_cast<learn_request &>(request).last_committed_decree_in_app = 0;
    }
    // mutations are previously committed already on learner (old primary)
    // this happens when the new primary does not commit the previously prepared mutations
    // yet, which it should do, so let's help it now.
    else if (request.last_committed_decree_in_app > local_committed_decree) {
        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner's last_committed_decree_in_app "
                         "is newer than learnee, learner_app_committed_decree = {}, "
                         "local_committed_decree = {}, commit local soft",
                         request.signature,
                         request.learner,
                         request.last_committed_decree_in_app,
                         local_committed_decree);

        // we shouldn't commit mutations hard coz these mutations may preparing on another learner
        _prepare_list->commit(request.last_committed_decree_in_app, COMMIT_TO_DECREE_SOFT);
        local_committed_decree = last_committed_decree();

        if (request.last_committed_decree_in_app > local_committed_decree) {
            LOG_ERROR_PREFIX("on_learn[{:#018x}]: try to commit primary to {}, still less than "
                             "learner({})'s committed decree({}), wait mutations to be commitable",
                             request.signature,
                             local_committed_decree,
                             request.learner,
                             request.last_committed_decree_in_app);
            response.err = ERR_INCONSISTENT_STATE;
            reply(msg, response);
            return;
        }
    }

    // both branches above either returned or established this invariant
    CHECK_LE(request.last_committed_decree_in_app, local_committed_decree);

    const decree learn_start_decree = get_learn_start_decree(request);
    response.state.__set_learn_start_decree(learn_start_decree);
    // may be switched on by prepare_cached_learn_state(); consumed after reply()
    bool delayed_replay_prepare_list = false;

    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, remote_committed_decree = {}, "
                    "remote_app_committed_decree = {}, local_committed_decree = {}, "
                    "app_committed_decree = {}, app_durable_decree = {}, "
                    "prepare_min_decree = {}, prepare_list_count = {}, learn_start_decree = {}",
                    request.signature,
                    request.learner,
                    request.last_committed_decree_in_prepare_list,
                    request.last_committed_decree_in_app,
                    local_committed_decree,
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    _prepare_list->min_decree(),
                    _prepare_list->count(),
                    learn_start_decree);

    response.address = _stub->_primary_address;
    response.prepare_start_decree = invalid_decree;
    response.last_committed_decree = local_committed_decree;
    response.err = ERR_OK;

    // learn delta state or checkpoint
    bool should_learn_cache = prepare_cached_learn_state(request,
                                                         learn_start_decree,
                                                         local_committed_decree,
                                                         learner_state,
                                                         response,
                                                         delayed_replay_prepare_list);
    if (!should_learn_cache) {
        if (learn_start_decree > _app->last_durable_decree()) {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because learn_start_decree({}) > _app->last_durable_decree({})",
                            request.signature,
                            request.learner,
                            learn_start_decree,
                            _app->last_durable_decree());
            // the return value is intentionally ignored: logs are learned on this branch
            // no matter what get_learn_state() reports
            _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state);
            response.type = learn_type::LT_LOG;
        } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because mutation_log::get_learn_state() returns true",
                            request.signature,
                            request.learner);
            response.type = learn_type::LT_LOG;
        } else if (learn_start_decree < request.last_committed_decree_in_app + 1) {
            // get_learn_state() returned false above, yet logs are still chosen here
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because learn_start_decree steps back for duplication",
                            request.signature,
                            request.learner);
            response.type = learn_type::LT_LOG;
        } else {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn app, beacuse "
                            "learn_start_decree({}) <= _app->last_durable_decree({}), and "
                            "mutation_log::get_learn_state() returns false",
                            request.signature,
                            request.learner,
                            learn_start_decree,
                            _app->last_durable_decree());
            response.type = learn_type::LT_APP;
            // start from a fresh learn_state before the checkpoint fills it in.
            // NOTE(review): this also discards the learn_start_decree set on
            // response.state earlier in this function - confirm that is intended.
            response.state = learn_state();
        }

        if (response.type == learn_type::LT_LOG) {
            response.base_local_dir = _private_log->dir();
            if (response.state.files.size() > 0) {
                auto &last_file = response.state.files.back();
                if (last_file == learner_state.last_learn_log_file) {
                    // same tail file as in the previous round for this learner
                    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn the same file {} "
                                    "repeatedly, hint to switch file",
                                    request.signature,
                                    request.learner,
                                    last_file);
                    _private_log->hint_switch_file();
                } else {
                    // remember the tail file so a repeat can be detected next round
                    learner_state.last_learn_log_file = last_file;
                }
            }
            // it is safe to commit to last_committed_decree() now
            response.state.to_decree_included = last_committed_decree();
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn private logs succeed, "
                            "learned_meta_size = {}, learned_file_count = {}, to_decree_included = "
                            "{}",
                            request.signature,
                            request.learner,
                            response.state.meta.length(),
                            response.state.files.size(),
                            response.state.to_decree_included);
        } else {
            ::dsn::error_code err = _app->get_checkpoint(
                learn_start_decree, request.app_specific_learn_request, response.state);

            if (err != ERR_OK) {
                // base_local_dir stays unset on this path
                response.err = ERR_GET_LEARN_STATE_FAILED;
                LOG_ERROR_PREFIX(
                    "on_learn[{:#018x}]: learner = {}, get app checkpoint failed, error = {}",
                    request.signature,
                    request.learner,
                    err);
            } else {
                response.base_local_dir = _app->data_dir();
                response.__set_replica_disk_tag(_dir_node->tag);
                LOG_INFO_PREFIX(
                    "on_learn[{:#018x}]: learner = {}, get app learn state succeed, "
                    "learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}",
                    request.signature,
                    request.learner,
                    response.state.meta.length(),
                    response.state.files.size(),
                    response.state.to_decree_included);
            }
        }
    }

    // Turn every file name into a path relative to base_local_dir by dropping the
    // directory prefix plus one more character (presumably the path separator).
    // erase(0, n) removes at most size() characters, so unlike substr(n) it cannot throw
    // std::out_of_range when a name is shorter than the prefix (e.g. when base_local_dir
    // was never set), and it trims in place without a temporary string.
    const auto prefix_length = response.base_local_dir.length() + 1;
    for (auto &file : response.state.files) {
        file.erase(0, prefix_length);
    }

    reply(msg, response);

    // the replayed prepare msg needs to be AFTER the learning response msg
    if (delayed_replay_prepare_list) {
        replay_prepare_list();
    }
}