in src/replica/replica_learn.cpp [572:943]
// Handles the learnee's reply to a learn request issued by this potential secondary.
//
// Depending on the reply, this:
//   - passes a non-OK `err` (or a non-OK `resp.err`) to handle_learning_error(); when the
//     learnee answered ERR_INACTIVE_STATE / ERR_INCONSISTENT_STATE it instead schedules
//     init_learn() again after 1 second;
//   - applies resp.config locally when it carries a larger ballot, and stops if that
//     moved this replica out of PS_POTENTIAL_SECONDARY;
//   - when the local app's committed decree is ahead of the learnee's, closes the app with
//     clear_state=true, renames the old data dir to "<dir>.<now_us>.discarded", reopens an
//     empty app and resets the prepare list;
//   - when resp.prepare_start_decree is valid (LT_CACHE only), appends the learned
//     mutations to the private log, prepares them, commits up to prepare_start_decree - 1
//     and switches to LearningWithPrepare;
//   - when resp lists files, copies them from the learnee into the app's learn dir;
//   - otherwise continues directly.
// Every path that does not return early ends in on_copy_remote_state_completed(), which is
// run as _potential_secondary_states.learn_remote_files_task.
//
// `req` and `resp` are consumed (moved into that task) on those paths.
void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp)
{
    _checker.only_one_thread_access();

    CHECK_EQ(partition_status::PS_POTENTIAL_SECONDARY, status());
    CHECK_EQ(req.signature, _potential_secondary_states.learning_version);

    if (err != ERR_OK) {
        handle_learning_error(err, false);
        return;
    }

    // The signature is printed in hex, like every other log line of this function, so a
    // learning round can be traced by grepping for it.
    LOG_INFO_PREFIX(
        "on_learn_reply_start[{:#018x}]: learnee = {}, learn_duration = {} ms, response_err = "
        "{}, remote_committed_decree = {}, prepare_start_decree = {}, learn_type = {}, "
        "learned_buffer_size = {}, learned_file_count = {}, to_decree_included = "
        "{}, learn_start_decree = {}, last_commit_decree = {}, current_learning_status = "
        "{}",
        req.signature,
        resp.config.primary.to_string(),
        _potential_secondary_states.duration_ms(),
        resp.err.to_string(),
        resp.last_committed_decree,
        resp.prepare_start_decree,
        enum_to_string(resp.type),
        resp.state.meta.length(),
        static_cast<uint32_t>(resp.state.files.size()),
        resp.state.to_decree_included,
        resp.state.learn_start_decree,
        _app->last_committed_decree(),
        enum_to_string(_potential_secondary_states.learning_status));

    _potential_secondary_states.learning_copy_buffer_size += resp.state.meta.length();
    _stub->_counter_replicas_learning_recent_copy_buffer_size->add(resp.state.meta.length());

    // The learnee answered, but with an error.
    if (resp.err != ERR_OK) {
        if (resp.err == ERR_INACTIVE_STATE || resp.err == ERR_INCONSISTENT_STATE) {
            LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learnee is updating "
                               "ballot(inactive state) or reconciliation(inconsistent state), "
                               "delay to start another round of learning",
                               req.signature,
                               resp.config.primary);
            _potential_secondary_states.learning_round_is_running = false;
            _potential_secondary_states.delay_learning_task =
                tasking::create_task(LPC_DELAY_LEARN,
                                     &_tracker,
                                     std::bind(&replica::init_learn, this, req.signature),
                                     get_gpid().thread_hash());
            _potential_secondary_states.delay_learning_task->enqueue(std::chrono::seconds(1));
        } else {
            handle_learning_error(resp.err, false);
        }
        return;
    }

    if (resp.config.ballot > get_ballot()) {
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, update configuration because "
                        "ballot have changed",
                        req.signature,
                        resp.config.primary);
        CHECK(update_local_configuration(resp.config), "");
    }

    if (status() != partition_status::PS_POTENTIAL_SECONDARY) {
        LOG_ERROR_PREFIX(
            "on_learn_reply[{:#018x}]: learnee = {}, current_status = {}, stop learning",
            req.signature,
            resp.config.primary,
            enum_to_string(status()));
        return;
    }

    // Schedules on_copy_remote_state_completed(ec, 0, ...) as a standalone task, for the
    // paths that do not copy any remote file. `req` and `resp` are moved into the task, so
    // this must be called at most once, and neither may be used afterwards.
    auto enqueue_copy_completed = [this, &req, &resp](error_code ec) {
        _potential_secondary_states.learn_remote_files_task =
            tasking::create_task(LPC_LEARN_REMOTE_DELTA_FILES, &_tracker, [
                this,
                ec,
                copy_start = _potential_secondary_states.duration_ms(),
                req_cap = std::move(req),
                resp_cap = std::move(resp)
            ]() mutable {
                on_copy_remote_state_completed(
                    ec, 0, copy_start, std::move(req_cap), std::move(resp_cap));
            });
        _potential_secondary_states.learn_remote_files_task->enqueue();
    };

    // local state is newer than learnee
    if (resp.last_committed_decree < _app->last_committed_decree()) {
        LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learner state is newer than "
                           "learnee (primary): {} vs {}, create new app",
                           req.signature,
                           resp.config.primary,
                           _app->last_committed_decree(),
                           resp.last_committed_decree);

        _stub->_counter_replicas_learning_recent_learn_reset_count->increment();

        // close app
        auto err = _app->close(true);
        if (err != ERR_OK) {
            LOG_ERROR_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, close app (with clear_state=true) "
                "failed, err = {}",
                req.signature,
                resp.config.primary,
                err);
        }

        // Move the old data dir aside as "<dir>.<now_us>.discarded".
        if (err == ERR_OK) {
            const std::string old_dir = _app->data_dir();
            if (dsn::utils::filesystem::directory_exists(old_dir)) {
                // Built as a std::string instead of sprintf() into a fixed 1024-byte buffer,
                // which overflowed for long data dir paths.
                const std::string rename_dir =
                    old_dir + "." + std::to_string(dsn_now_us()) + ".discarded";
                CHECK(dsn::utils::filesystem::rename_path(old_dir, rename_dir),
                      "{}: failed to move directory from '{}' to '{}'",
                      name(),
                      old_dir,
                      rename_dir);
                LOG_WARNING_PREFIX("replica_dir_op succeed to move directory from '{}' to '{}'",
                                   old_dir,
                                   rename_dir);
            }
        }

        if (err == ERR_OK) {
            err = _app->open_new_internal(this,
                                          _stub->_log->on_partition_reset(get_gpid(), 0),
                                          _private_log->on_partition_reset(get_gpid(), 0));

            if (err != ERR_OK) {
                LOG_ERROR_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, open app (with "
                                 "create_new=true) failed, err = {}",
                                 req.signature,
                                 resp.config.primary,
                                 err);
            }
        }

        if (err == ERR_OK) {
            CHECK_EQ_MSG(_app->last_committed_decree(), 0, "must be zero after app::open(true)");
            CHECK_EQ_MSG(_app->last_durable_decree(), 0, "must be zero after app::open(true)");

            // reset prepare list
            _prepare_list->reset(0);
        }

        if (err != ERR_OK) {
            enqueue_copy_completed(err);
            return;
        }
    }

    // Throttle concurrent LT_APP learnings across the stub.
    if (resp.type == learn_type::LT_APP) {
        if (++_stub->_learn_app_concurrent_count > FLAGS_learn_app_max_concurrent_count) {
            --_stub->_learn_app_concurrent_count;
            LOG_WARNING_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, learn_app_concurrent_count({}) >= "
                "FLAGS_learn_app_max_concurrent_count({}), skip this round",
                _potential_secondary_states.learning_version,
                _config.primary,
                _stub->_learn_app_concurrent_count.load(),
                FLAGS_learn_app_max_concurrent_count);
            _potential_secondary_states.learning_round_is_running = false;
            return;
        } else {
            _potential_secondary_states.learn_app_concurrent_count_increased = true;
            LOG_INFO_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, ++learn_app_concurrent_count = {}",
                _potential_secondary_states.learning_version,
                _config.primary,
                _stub->_learn_app_concurrent_count.load());
        }
    }

    switch (resp.type) {
    case learn_type::LT_CACHE:
        _stub->_counter_replicas_learning_recent_learn_cache_count->increment();
        break;
    case learn_type::LT_APP:
        _stub->_counter_replicas_learning_recent_learn_app_count->increment();
        break;
    case learn_type::LT_LOG:
        _stub->_counter_replicas_learning_recent_learn_log_count->increment();
        break;
    default:
        // do nothing
        break;
    }

    if (resp.prepare_start_decree != invalid_decree) {
        CHECK_EQ(resp.type, learn_type::LT_CACHE);
        CHECK(resp.state.files.empty(), "");
        CHECK_EQ(_potential_secondary_states.learning_status,
                 learner_status::LearningWithoutPrepare);
        _potential_secondary_states.learning_status = learner_status::LearningWithPrepareTransient;

        // reset log positions for later mutations
        // WARNING: it still requires checkpoint operation in later
        // on_copy_remote_state_completed to ensure the state is completed
        // if there is a failure in between, our checking
        // during app::open_internal will invalidate the logs
        // appended by the mutations AFTER current position
        err = _app->update_init_info(
            this,
            _stub->_log->on_partition_reset(get_gpid(), _app->last_committed_decree()),
            _private_log->on_partition_reset(get_gpid(), _app->last_committed_decree()),
            _app->last_committed_decree());

        // switch private log to make learning easier
        _private_log->demand_switch_file();

        // reset preparelist
        _potential_secondary_states.learning_start_prepare_decree = resp.prepare_start_decree;
        _prepare_list->truncate(_app->last_committed_decree());
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, truncate prepare list, "
                        "local_committed_decree = {}, current_learning_status = {}",
                        req.signature,
                        resp.config.primary,
                        _app->last_committed_decree(),
                        enum_to_string(_potential_secondary_states.learning_status));

        // persist incoming mutations into private log and apply them to prepare-list
        // cache_range tracks the <min, max> decree applied below; 0 means "not set yet".
        std::pair<decree, decree> cache_range;
        binary_reader reader(resp.state.meta);
        while (!reader.is_eof()) {
            auto mu = mutation::read_from(reader, nullptr);
            if (mu->data.header.decree > last_committed_decree()) {
                LOG_DEBUG_PREFIX("on_learn_reply[{:#018x}]: apply learned mutation {}",
                                 req.signature,
                                 mu->name());

                // write to private log with no callback, the later 2pc ensures that logs
                // are written to the disk
                _private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);

                // because private log are written without callback, need to manully set flag
                mu->set_logged();

                // then we prepare, it is possible that a committed mutation exists in learner's
                // prepare log,
                // but with DIFFERENT ballot. Reference https://github.com/imzhenyu/rDSN/issues/496
                mutation_ptr existing_mutation =
                    _prepare_list->get_mutation_by_decree(mu->data.header.decree);
                if (existing_mutation != nullptr &&
                    existing_mutation->data.header.ballot > mu->data.header.ballot) {
                    LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, mutation({}) exist on "
                                    "the learner with larger ballot {}",
                                    req.signature,
                                    resp.config.primary,
                                    mu->name(),
                                    existing_mutation->data.header.ballot);
                } else {
                    _prepare_list->prepare(mu, partition_status::PS_POTENTIAL_SECONDARY);
                }

                if (cache_range.first == 0 || mu->data.header.decree < cache_range.first)
                    cache_range.first = mu->data.header.decree;
                if (cache_range.second == 0 || mu->data.header.decree > cache_range.second)
                    cache_range.second = mu->data.header.decree;
            }
        }

        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learn_duration = {} ms, apply "
                        "cache done, prepare_cache_range = <{}, {}>, local_committed_decree = {}, "
                        "app_committed_decree = {}, current_learning_status = {}",
                        req.signature,
                        resp.config.primary,
                        _potential_secondary_states.duration_ms(),
                        cache_range.first,
                        cache_range.second,
                        last_committed_decree(),
                        _app->last_committed_decree(),
                        enum_to_string(_potential_secondary_states.learning_status));

        // further states are synced using 2pc, and we must commit now as those later 2pc messages
        // thinks they should
        _prepare_list->commit(resp.prepare_start_decree - 1, COMMIT_TO_DECREE_HARD);
        CHECK_EQ(_prepare_list->last_committed_decree(), _app->last_committed_decree());
        CHECK(resp.state.files.empty(), "");

        // all state is complete
        CHECK_GE_MSG(_app->last_committed_decree() + 1,
                     _potential_secondary_states.learning_start_prepare_decree,
                     "state is incomplete");

        // go to next stage
        _potential_secondary_states.learning_status = learner_status::LearningWithPrepare;
        // `err` holds the result of update_init_info() above.
        enqueue_copy_completed(err);
    } else if (resp.state.files.size() > 0) {
        auto learn_dir = _app->learn_dir();
        utils::filesystem::remove_path(learn_dir);
        utils::filesystem::create_directory(learn_dir);

        if (!dsn::utils::filesystem::directory_exists(learn_dir)) {
            LOG_ERROR_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, create replica learn dir {} failed",
                req.signature,
                resp.config.primary,
                learn_dir);
            enqueue_copy_completed(ERR_FILE_OPERATION_FAILED);
            return;
        }

        // LT_APP copies run at low priority; everything else runs at high priority.
        bool high_priority = (resp.type == learn_type::LT_APP ? false : true);
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learn_duration = {} ms, start to "
                        "copy remote files, copy_file_count = {}, priority = {}",
                        req.signature,
                        resp.config.primary,
                        _potential_secondary_states.duration_ms(),
                        resp.state.files.size(),
                        high_priority ? "high" : "low");

        // `resp` is copied (not moved) into the callback: the other arguments of this call
        // read fields of `resp`, and C++ does not specify the evaluation order of function
        // arguments, so moving it could empty those fields before they are read.
        _potential_secondary_states.learn_remote_files_task = _stub->_nfs->copy_remote_files(
            resp.config.primary,
            resp.replica_disk_tag,
            resp.base_local_dir,
            resp.state.files,
            _dir_node->tag,
            learn_dir,
            get_gpid(),
            true, // overwrite
            high_priority,
            LPC_REPLICATION_COPY_REMOTE_FILES,
            &_tracker,
            [
              this,
              copy_start = _potential_secondary_states.duration_ms(),
              req_cap = std::move(req),
              resp_copy = resp
            ](error_code err, size_t sz) mutable {
                on_copy_remote_state_completed(
                    err, sz, copy_start, std::move(req_cap), std::move(resp_copy));
            });
    } else {
        enqueue_copy_completed(ERR_OK);
    }
}