in src/replica/replica_learn.cpp [89:268]
// Starts, or continues, one round of learning identified by `signature`.
//
// The call is skipped with a warning when:
//   - the replica status is not PS_POTENTIAL_SECONDARY;
//   - `signature` equals invalid_signature;
//   - a learning round is already marked as running;
//   - `signature` is smaller than the current learning_version;
//   - `signature` differs from learning_version but cleanup(false) of the previous
//     learning state returns false;
//   - the app's last committed decree is 0 and the stub's learn-app concurrent count has
//     reached FLAGS_learn_app_max_concurrent_count.
//
// A signature different from learning_version resets the learning state to
// LearningWithoutPrepare and truncates the prepare list to the app's last committed decree.
// For an unchanged signature the action depends on learning_status:
//   - LearningWithPrepare: applies the commits missing from the app out of the prepare list,
//     or schedules catch_up_with_private_logs() and returns; on success it continues as
//     LearningSucceeded;
//   - LearningSucceeded: checks state completeness, notifies learn completion and returns;
//   - LearningFailed / LearningWithoutPrepare: proceeds to send a new learn request.
//
// When a request is sent, it goes to the primary as RPC_LEARN and the reply is handled by
// on_learn_reply().
void replica::init_learn(uint64_t signature)
{
    _checker.only_one_thread_access();

    if (status() != partition_status::PS_POTENTIAL_SECONDARY) {
        LOG_WARNING_PREFIX(
            "state is not potential secondary but {}, skip learning with signature[{:#018x}]",
            enum_to_string(status()),
            signature);
        return;
    }

    if (signature == invalid_signature) {
        LOG_WARNING_PREFIX("invalid learning signature, skip");
        return;
    }

    // at most one learning task running
    if (_potential_secondary_states.learning_round_is_running) {
        LOG_WARNING_PREFIX(
            "previous learning is still running, skip learning with signature [{:#018x}]",
            signature);
        return;
    }

    if (signature < _potential_secondary_states.learning_version) {
        LOG_WARNING_PREFIX(
            "learning request is out-dated, therefore skipped: [{:#018x}] vs [{:#018x}]",
            signature,
            _potential_secondary_states.learning_version);
        return;
    }

    // learn timeout or primary change, the (new) primary starts another round of learning process
    // be cautious: primary should not issue signatures frequently to avoid learning abort
    if (signature != _potential_secondary_states.learning_version) {
        if (!_potential_secondary_states.cleanup(false)) {
            LOG_WARNING_PREFIX(
                "previous learning with signature[{:#018x}] is still in-process, skip "
                "init new learning with signature [{:#018x}]",
                _potential_secondary_states.learning_version,
                signature);
            return;
        }

        METRIC_VAR_INCREMENT(learn_count);

        // Start a fresh learning session under the new signature.
        _potential_secondary_states.learning_version = signature;
        _potential_secondary_states.learning_start_ts_ns = dsn_now_ns();
        _potential_secondary_states.learning_status = learner_status::LearningWithoutPrepare;
        _prepare_list->truncate(_app->last_committed_decree());
    } else {
        switch (_potential_secondary_states.learning_status) {
        // any failures in the process
        case learner_status::LearningFailed:
            break;

        // learned state (app state) completed
        case learner_status::LearningWithPrepare:
            CHECK_GE_MSG(_app->last_durable_decree() + 1,
                         _potential_secondary_states.learning_start_prepare_decree,
                         "learned state is incomplete");
            {
                // check missing state due to _app->flush to checkpoint the learned state
                const auto ac = _app->last_committed_decree();
                const auto pc = _prepare_list->last_committed_decree();

                // TODO(qinzuoyan): to test the following lines
                // missing commits (nothing to do when pc <= ac)
                if (pc > ac) {
                    // missed ones are covered by prepare list
                    if (_prepare_list->count() > 0 && ac + 1 >= _prepare_list->min_decree()) {
                        for (auto d = ac + 1; d <= pc; d++) {
                            auto mu = _prepare_list->get_mutation_by_decree(d);
                            CHECK_NOTNULL(mu, "mutation must not be nullptr, decree = {}", d);
                            auto err = _app->apply_mutation(mu);
                            if (ERR_OK != err) {
                                handle_learning_error(err, true);
                                return;
                            }
                        }
                    }
                    // missed ones need to be loaded via private logs
                    else {
                        METRIC_VAR_INCREMENT(learn_rounds);
                        _potential_secondary_states.learning_round_is_running = true;
                        _potential_secondary_states.catchup_with_private_log_task =
                            tasking::create_task(
                                LPC_CATCHUP_WITH_PRIVATE_LOGS,
                                &_tracker,
                                [this]() {
                                    this->catch_up_with_private_logs(
                                        partition_status::PS_POTENTIAL_SECONDARY);
                                },
                                get_gpid().thread_hash());
                        _potential_secondary_states.catchup_with_private_log_task->enqueue();
                        return; // incomplete
                    }
                }

                // convert to success if app state and prepare list is connected
                _potential_secondary_states.learning_status = learner_status::LearningSucceeded;
            }
            // Deliberately continue into the success handling below.
            [[fallthrough]];

        // app state and prepare list is connected
        case learner_status::LearningSucceeded: {
            check_state_completeness();
            notify_learn_completion();
            return;
        }

        case learner_status::LearningWithoutPrepare:
            break;

        default:
            CHECK(false,
                  "invalid learner_status, status = {}",
                  enum_to_string(_potential_secondary_states.learning_status));
        }
    }

    if (_app->last_committed_decree() == 0) {
        // Read the counter once so that the value logged is the value that was compared.
        const auto concurrent_count = _stub->_learn_app_concurrent_count.load();
        if (concurrent_count >= FLAGS_learn_app_max_concurrent_count) {
            LOG_WARNING_PREFIX(
                "init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, need to learn app "
                "because app_committed_decree = 0, but learn_app_concurrent_count({}) >= "
                "FLAGS_learn_app_max_concurrent_count({}), skip",
                _potential_secondary_states.learning_version,
                FMT_HOST_PORT_AND_IP(_config, primary),
                _potential_secondary_states.duration_ms(),
                concurrent_count,
                FLAGS_learn_app_max_concurrent_count);
            return;
        }
    }

    METRIC_VAR_INCREMENT(learn_rounds);
    _potential_secondary_states.learning_round_is_running = true;

    // Build the learn request describing the local state of this learner.
    learn_request request;
    request.pid = get_gpid();
    request.__set_max_gced_decree(get_max_gced_decree_for_learn());
    request.last_committed_decree_in_app = _app->last_committed_decree();
    request.last_committed_decree_in_prepare_list = _prepare_list->last_committed_decree();
    SET_IP_AND_HOST_PORT(request, learner, _stub->primary_address(), _stub->primary_host_port());
    request.signature = _potential_secondary_states.learning_version;
    _app->prepare_get_checkpoint(request.app_specific_learn_request);

    LOG_INFO_PREFIX("init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, max_gced_decree = "
                    "{}, local_committed_decree = {}, app_committed_decree = {}, "
                    "app_durable_decree = {}, current_learning_status = {}, total_copy_file_count "
                    "= {}, total_copy_file_size = {}, total_copy_buffer_size = {}",
                    request.signature,
                    FMT_HOST_PORT_AND_IP(_config, primary),
                    _potential_secondary_states.duration_ms(),
                    request.max_gced_decree,
                    last_committed_decree(),
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    enum_to_string(_potential_secondary_states.learning_status),
                    _potential_secondary_states.learning_copy_file_count,
                    _potential_secondary_states.learning_copy_file_size,
                    _potential_secondary_states.learning_copy_buffer_size);

    // The request is marshalled into the message first; only afterwards is it moved into the
    // reply callback, which hands it to on_learn_reply() together with the response.
    dsn::message_ex *msg = dsn::message_ex::create_request(RPC_LEARN, 0, get_gpid().thread_hash());
    dsn::marshall(msg, request);

    host_port primary;
    GET_HOST_PORT(_config, primary, primary);
    _potential_secondary_states.learning_task = rpc::call(
        dsn::dns_resolver::instance().resolve_address(primary),
        msg,
        &_tracker,
        [this, req_cap = std::move(request)](error_code err, learn_response &&resp) mutable {
            on_learn_reply(err, std::move(req_cap), std::move(resp));
        });
}