in src/replica/replica_learn.cpp [1005:1189]
// Completion callback for copying the learnee's remote state (the files listed in
// resp.state.files) into the local learn directory.
//
// Parameters:
//   err             - result of the remote copy.
//   size            - total size of the copied files; added to the learning statistics when
//                     err == ERR_OK.
//   copy_start_time - subtracted from _potential_secondary_states.duration_ms() to log the copy
//                     time in ms (presumably the duration_ms() value taken when the copy
//                     started — confirm at the call site).
//   req             - the learn request that produced `resp`; only its signature is used here
//                     (for logging).
//   resp            - the learnee's learn response describing the copied state.
//
// Flow:
//   1. For LT_APP, decrement the stub-wide _learn_app_concurrent_count and clear
//      learn_app_concurrent_count_increased. This happens whether or not the copy succeeded.
//   2. On a successful copy, accumulate the per-replica and stub-level copy statistics.
//   3. On a successful copy outside LearningWithPrepare, apply the copied state. LT_APP goes
//      through _app->apply_checkpoint(); LT_LOG goes through
//      apply_learned_state_from_private_log(). Then reset the prepare list to the app's last
//      committed decree. In LearningWithPrepare the response type is asserted to be LT_CACHE
//      and nothing is applied here.
//   4. Flush via background_sync_checkpoint() when all of these hold:
//      - the learned state appears caught up;
//      - the app has committed decrees that are not yet durable.
//   5. Enqueue on_learn_remote_state_completed(err) as a new task. `err` is ERR_OK or the
//      error of the first failing step (copy, apply, or flush); later steps are skipped once
//      one fails.
void replica::on_copy_remote_state_completed(error_code err,
                                             size_t size,
                                             uint64_t copy_start_time,
                                             learn_request &&req,
                                             learn_response &&resp)
{
    // Snapshot the decrees before anything is applied so the summary log below can report
    // "before => after" for each of them.
    decree old_prepared = last_prepared_decree();
    decree old_committed = last_committed_decree();
    decree old_app_committed = _app->last_committed_decree();
    decree old_app_durable = _app->last_durable_decree();
    LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = {} "
                    "ms, copy remote state done, err = {}, copy_file_count = {}, copy_file_size = "
                    "{}, copy_time_used = {} ms, local_committed_decree = {}, app_committed_decree "
                    "= {}, app_durable_decree = {}, prepare_start_decree = {}, "
                    "current_learning_status = {}",
                    req.signature,
                    resp.config.primary,
                    _potential_secondary_states.duration_ms(),
                    err,
                    resp.state.files.size(),
                    size,
                    _potential_secondary_states.duration_ms() - copy_start_time,
                    last_committed_decree(),
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    resp.prepare_start_decree,
                    enum_to_string(_potential_secondary_states.learning_status));
    // The copy for this LT_APP round is finished (successfully or not). Decrement the
    // stub-wide learn-app concurrency count and clear the flag that (presumably) recorded
    // the matching increment for this round.
    if (resp.type == learn_type::LT_APP) {
        --_stub->_learn_app_concurrent_count;
        _potential_secondary_states.learn_app_concurrent_count_increased = false;
        // NOTE(review): unlike the other logs in this function, this one prints learning_version
        // and _config.primary rather than req.signature and resp.config.primary; presumably
        // equivalent — confirm.
        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                        "--learn_app_concurrent_count = {}",
                        _potential_secondary_states.learning_version,
                        _config.primary,
                        _stub->_learn_app_concurrent_count.load());
    }
    // Record copy statistics only for a successful copy.
    if (err == ERR_OK) {
        _potential_secondary_states.learning_copy_file_count += resp.state.files.size();
        _potential_secondary_states.learning_copy_file_size += size;
        _stub->_counter_replicas_learning_recent_copy_file_count->add(resp.state.files.size());
        _stub->_counter_replicas_learning_recent_copy_file_size->add(size);
    }
    if (err != ERR_OK) {
        // Copy failed: nothing is applied, and the flush below is skipped as well. `err` is
        // passed on to on_learn_remote_state_completed at the end.
    } else if (_potential_secondary_states.learning_status == learner_status::LearningWithPrepare) {
        // While learning with prepare, the learnee must have replied with LT_CACHE; nothing is
        // applied in this branch.
        CHECK_EQ(resp.type, learn_type::LT_CACHE);
    } else {
        CHECK(resp.type == learn_type::LT_APP || resp.type == learn_type::LT_LOG,
              "invalid learn_type, type = {}",
              enum_to_string(resp.type));
        // Build the state to apply. It keeps the decree range and meta from the response, but
        // each file name is rewritten to a full path under the local learn directory.
        learn_state lstate;
        lstate.from_decree_excluded = resp.state.from_decree_excluded;
        lstate.to_decree_included = resp.state.to_decree_included;
        lstate.meta = resp.state.meta;
        if (resp.state.__isset.learn_start_decree) {
            lstate.__set_learn_start_decree(resp.state.learn_start_decree);
        }
        for (auto &f : resp.state.files) {
            std::string file = utils::filesystem::path_combine(_app->learn_dir(), f);
            lstate.files.push_back(file);
        }
        // apply app learning: LT_APP hands the copied checkpoint to the app in `learn` mode.
        if (resp.type == learn_type::LT_APP) {
            auto start_ts = dsn_now_ns();
            err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
            if (err == ERR_OK) {
                CHECK_GE(_app->last_committed_decree(), _app->last_durable_decree());
                // The app's committed decree must not exceed the learnee's. If the original
                // _app->last_committed_decree had been greater than resp.last_committed_decree,
                // learn_start_decree would have been set to 0, making the learner learn from
                // scratch.
                CHECK_LE(_app->last_committed_decree(), resp.last_committed_decree);
                LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                "learn_duration = {} ms, checkpoint duration = {} ns, apply "
                                "checkpoint succeed, app_committed_decree = {}",
                                req.signature,
                                resp.config.primary,
                                _potential_secondary_states.duration_ms(),
                                dsn_now_ns() - start_ts,
                                _app->last_committed_decree());
            } else {
                LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                 "learn_duration = {} ms, checkpoint duration = {} ns, apply "
                                 "checkpoint failed, err = {}",
                                 req.signature,
                                 resp.config.primary,
                                 _potential_secondary_states.duration_ms(),
                                 dsn_now_ns() - start_ts,
                                 err);
            }
        }
        // apply log learning: LT_LOG applies the copied files via
        // apply_learned_state_from_private_log().
        else {
            auto start_ts = dsn_now_ns();
            err = apply_learned_state_from_private_log(lstate);
            if (err == ERR_OK) {
                LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                "learn_duration = {} ms, apply_log_duration = {} ns, apply learned "
                                "state from private log succeed, app_committed_decree = {}",
                                req.signature,
                                resp.config.primary,
                                _potential_secondary_states.duration_ms(),
                                dsn_now_ns() - start_ts,
                                _app->last_committed_decree());
            } else {
                LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                 "learn_duration = {} ms, apply_log_duration = {} ns, apply "
                                 "learned state from private log failed, err = {}",
                                 req.signature,
                                 resp.config.primary,
                                 _potential_secondary_states.duration_ms(),
                                 dsn_now_ns() - start_ts,
                                 err);
            }
        }
        // Reset the prepare list to the app's current last committed decree so it catches up
        // with the app. Note this runs even when applying the learned state failed above.
        _prepare_list->reset(_app->last_committed_decree());
        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = "
                        "{} ms, apply checkpoint/log done, err = {}, last_prepared_decree = ({} => "
                        "{}), last_committed_decree = ({} => {}), app_committed_decree = ({} => "
                        "{}), app_durable_decree = ({} => {}), remote_committed_decree = {}, "
                        "prepare_start_decree = {}, current_learning_status = {}",
                        req.signature,
                        resp.config.primary,
                        _potential_secondary_states.duration_ms(),
                        err,
                        old_prepared,
                        last_prepared_decree(),
                        old_committed,
                        last_committed_decree(),
                        old_app_committed,
                        _app->last_committed_decree(),
                        old_app_durable,
                        _app->last_durable_decree(),
                        resp.last_committed_decree,
                        resp.prepare_start_decree,
                        enum_to_string(_potential_secondary_states.learning_status));
    }
    // If catch-up is done, flush so that all learned state becomes durable. Catch-up counts as
    // done when the response carries a valid prepare_start_decree and the app's committed
    // decree + 1 has reached learning_start_prepare_decree. The flush is skipped when the app
    // has nothing committed beyond its durable decree. After a successful flush the committed
    // and durable decrees must be equal. A flush failure replaces `err`.
    if (err == ERR_OK && resp.prepare_start_decree != invalid_decree &&
        _app->last_committed_decree() + 1 >=
            _potential_secondary_states.learning_start_prepare_decree &&
        _app->last_committed_decree() > _app->last_durable_decree()) {
        err = background_sync_checkpoint();
        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = "
                        "{} ms, flush done, err = {}, app_committed_decree = {}, "
                        "app_durable_decree = {}",
                        req.signature,
                        resp.config.primary,
                        _potential_secondary_states.duration_ms(),
                        err,
                        _app->last_committed_decree(),
                        _app->last_durable_decree());
        if (err == ERR_OK) {
            CHECK_EQ(_app->last_committed_decree(), _app->last_durable_decree());
        }
    }
    // _potential_secondary_states.learn_remote_files_task may still be marked as running even
    // though its body has definitely finished by the time we get here. Reset it to nullptr
    // manually; otherwise the non-null task could cause an unnecessary failed reconfiguration
    // later during cleanup.
    _potential_secondary_states.learn_remote_files_task = nullptr;
    // Hand the final result of copy/apply/flush to on_learn_remote_state_completed. It runs as
    // a new task on this replica's thread hash and is tracked by _tracker.
    _potential_secondary_states.learn_remote_files_completed_task =
        tasking::create_task(LPC_LEARN_REMOTE_DELTA_FILES_COMPLETED,
                             &_tracker,
                             [this, err]() { on_learn_remote_state_completed(err); },
                             get_gpid().thread_hash());
    _potential_secondary_states.learn_remote_files_completed_task->enqueue();
}