void replica::init_learn()

in src/replica/replica_learn.cpp [95:271]


void replica::init_learn(uint64_t signature)
{
    _checker.only_one_thread_access();

    if (status() != partition_status::PS_POTENTIAL_SECONDARY) {
        LOG_WARNING_PREFIX(
            "state is not potential secondary but {}, skip learning with signature[{:#018x}]",
            enum_to_string(status()),
            signature);
        return;
    }

    if (signature == invalid_signature) {
        LOG_WARNING_PREFIX("invalid learning signature, skip");
        return;
    }

    // at most one learning task running
    if (_potential_secondary_states.learning_round_is_running) {
        LOG_WARNING_PREFIX(
            "previous learning is still running, skip learning with signature [{:#018x}]",
            signature);
        return;
    }

    if (signature < _potential_secondary_states.learning_version) {
        LOG_WARNING_PREFIX(
            "learning request is out-dated, therefore skipped: [{:#018x}] vs [{:#018x}]",
            signature,
            _potential_secondary_states.learning_version);
        return;
    }

    // learn timeout or primary change, the (new) primary starts another round of learning process
    // be cautious: primary should not issue signatures frequently to avoid learning abort
    if (signature != _potential_secondary_states.learning_version) {
        if (!_potential_secondary_states.cleanup(false)) {
            LOG_WARNING_PREFIX(
                "previous learning with signature[{:#018x}] is still in-process, skip "
                "init new learning with signature [{:#018x}]",
                _potential_secondary_states.learning_version,
                signature);
            return;
        }

        _stub->_counter_replicas_learning_recent_start_count->increment();

        _potential_secondary_states.learning_version = signature;
        _potential_secondary_states.learning_start_ts_ns = dsn_now_ns();
        _potential_secondary_states.learning_status = learner_status::LearningWithoutPrepare;
        _prepare_list->truncate(_app->last_committed_decree());
    } else {
        switch (_potential_secondary_states.learning_status) {
        // any failues in the process
        case learner_status::LearningFailed:
            break;

        // learned state (app state) completed
        case learner_status::LearningWithPrepare:
            CHECK_GE_MSG(_app->last_durable_decree() + 1,
                         _potential_secondary_states.learning_start_prepare_decree,
                         "learned state is incomplete");
            {
                // check missing state due to _app->flush to checkpoint the learned state
                auto ac = _app->last_committed_decree();
                auto pc = _prepare_list->last_committed_decree();

                // TODO(qinzuoyan): to test the following lines
                // missing commits
                if (pc > ac) {
                    // missed ones are covered by prepare list
                    if (_prepare_list->count() > 0 && ac + 1 >= _prepare_list->min_decree()) {
                        for (auto d = ac + 1; d <= pc; d++) {
                            auto mu = _prepare_list->get_mutation_by_decree(d);
                            CHECK_NOTNULL(mu, "mutation must not be nullptr, decree = {}", d);
                            auto err = _app->apply_mutation(mu);
                            if (ERR_OK != err) {
                                handle_learning_error(err, true);
                                return;
                            }
                        }
                    }

                    // missed ones need to be loaded via private logs
                    else {
                        _stub->_counter_replicas_learning_recent_round_start_count->increment();
                        _potential_secondary_states.learning_round_is_running = true;
                        _potential_secondary_states.catchup_with_private_log_task =
                            tasking::create_task(LPC_CATCHUP_WITH_PRIVATE_LOGS,
                                                 &_tracker,
                                                 [this]() {
                                                     this->catch_up_with_private_logs(
                                                         partition_status::PS_POTENTIAL_SECONDARY);
                                                 },
                                                 get_gpid().thread_hash());
                        _potential_secondary_states.catchup_with_private_log_task->enqueue();

                        return; // incomplete
                    }
                }

                // no missing commits
                else {
                }

                // convert to success if app state and prepare list is connected
                _potential_secondary_states.learning_status = learner_status::LearningSucceeded;
                // fall through to success
            }

        // app state and prepare list is connected
        case learner_status::LearningSucceeded: {
            check_state_completeness();
            notify_learn_completion();
            return;
        } break;
        case learner_status::LearningWithoutPrepare:
            break;
        default:
            CHECK(false,
                  "invalid learner_status, status = {}",
                  enum_to_string(_potential_secondary_states.learning_status));
        }
    }

    if (_app->last_committed_decree() == 0 &&
        _stub->_learn_app_concurrent_count.load() >= FLAGS_learn_app_max_concurrent_count) {
        LOG_WARNING_PREFIX(
            "init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, need to learn app "
            "because app_committed_decree = 0, but learn_app_concurrent_count({}) >= "
            "FLAGS_learn_app_max_concurrent_count({}), skip",
            _potential_secondary_states.learning_version,
            _config.primary,
            _potential_secondary_states.duration_ms(),
            _stub->_learn_app_concurrent_count,
            FLAGS_learn_app_max_concurrent_count);
        return;
    }

    _stub->_counter_replicas_learning_recent_round_start_count->increment();
    _potential_secondary_states.learning_round_is_running = true;

    learn_request request;
    request.pid = get_gpid();
    request.__set_max_gced_decree(get_max_gced_decree_for_learn());
    request.last_committed_decree_in_app = _app->last_committed_decree();
    request.last_committed_decree_in_prepare_list = _prepare_list->last_committed_decree();
    request.learner = _stub->_primary_address;
    request.signature = _potential_secondary_states.learning_version;
    _app->prepare_get_checkpoint(request.app_specific_learn_request);

    LOG_INFO_PREFIX("init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, max_gced_decree = "
                    "{}, local_committed_decree = {}, app_committed_decree = {}, "
                    "app_durable_decree = {}, current_learning_status = {}, total_copy_file_count "
                    "= {}, total_copy_file_size = {}, total_copy_buffer_size = {}",
                    request.signature,
                    _config.primary,
                    _potential_secondary_states.duration_ms(),
                    request.max_gced_decree,
                    last_committed_decree(),
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    enum_to_string(_potential_secondary_states.learning_status),
                    _potential_secondary_states.learning_copy_file_count,
                    _potential_secondary_states.learning_copy_file_size,
                    _potential_secondary_states.learning_copy_buffer_size);

    dsn::message_ex *msg = dsn::message_ex::create_request(RPC_LEARN, 0, get_gpid().thread_hash());
    dsn::marshall(msg, request);
    _potential_secondary_states.learning_task = rpc::call(
        _config.primary,
        msg,
        &_tracker,
        [ this, req_cap = std::move(request) ](error_code err, learn_response && resp) mutable {
            on_learn_reply(err, std::move(req_cap), std::move(resp));
        });
}