void replica::on_learn()

in src/replica/replica_learn.cpp [364:570]


// Serves a learn request sent by a learner to this replica.
//
// Only the primary answers; any other status replies ERR_INACTIVE_STATE (when the replica is
// inactive and _inactive_is_transient is set) or ERR_INVALID_STATE. The learner must be
// registered in _primary_states.learners (else ERR_OBJECT_NOT_FOUND) and present the matching
// signature (else ERR_WRONG_CHECKSUM, with the expected signature echoed back).
//
// When the request is accepted, the response tells the learner what to fetch:
//   - whatever prepare_cached_learn_state() filled in, if it returns true;
//   - otherwise private-log files (LT_LOG) when learn_start_decree is beyond the app's last
//     durable decree, when _private_log->get_learn_state() returns true, or when
//     learn_start_decree steps back below the learner's app-committed decree + 1
//     (for duplication);
//   - otherwise an app checkpoint (LT_APP) obtained from _app->get_checkpoint().
// On success, paths in response.state.files are rewritten relative to
// response.base_local_dir before replying. If prepare_cached_learn_state() asked for it,
// the prepare list is replayed only after the reply has been sent.
//
// @param msg      the incoming RPC message; the response is sent back through it.
// @param request  the learner's request. NOTE: request.last_committed_decree_in_app may be
//                 reset to 0 in place (see below), so every callee sees the adjusted value.
void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
{
    _checker.only_one_thread_access();

    learn_response response;
    if (partition_status::PS_PRIMARY != status()) {
        response.err = (partition_status::PS_INACTIVE == status() && _inactive_is_transient)
                           ? ERR_INACTIVE_STATE
                           : ERR_INVALID_STATE;
        reply(msg, response);
        return;
    }

    // Hand the learner our current replica config, but with its status set to
    // partition_status::PS_POTENTIAL_SECONDARY.
    _primary_states.get_replica_config(partition_status::PS_POTENTIAL_SECONDARY, response.config);

    auto it = _primary_states.learners.find(request.learner);
    if (it == _primary_states.learners.end()) {
        response.config.status = partition_status::PS_INACTIVE;
        response.err = ERR_OBJECT_NOT_FOUND;
        reply(msg, response);
        return;
    }

    remote_learner_state &learner_state = it->second;
    if (learner_state.signature != request.signature) {
        response.config.learner_signature = learner_state.signature;
        response.err = ERR_WRONG_CHECKSUM; // means invalid signature
        reply(msg, response);
        return;
    }

    // prepare learn_start_decree
    decree local_committed_decree = last_committed_decree();

    // TODO: learner machine has been down for a long time, and DDD MUST happened before
    // which leads to state lost. Now the lost state is back, what shall we do?
    if (request.last_committed_decree_in_app > last_prepared_decree()) {
        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner state is newer than learnee, "
                         "learner_app_committed_decree = {}, local_committed_decree = {}, learn "
                         "from scratch",
                         request.signature,
                         request.learner,
                         request.last_committed_decree_in_app,
                         local_committed_decree);

        // Force a learn from scratch. Everything below (get_learn_start_decree(),
        // prepare_cached_learn_state(), the CHECK_LE, the LT_LOG branch selection) reads this
        // value from `request`, hence the in-place write rather than a local copy.
        // NOTE(review): writing through a const reference is only well-defined if the caller's
        // learn_request object is not itself const (presumably the RPC layer's deserialized
        // request) — confirm. A named const_cast keeps this visible and greppable, unlike the
        // previous C-style pointer cast.
        const_cast<learn_request &>(request).last_committed_decree_in_app = 0;
    } else if (request.last_committed_decree_in_app > local_committed_decree) {
        // mutations are previously committed already on learner (old primary)
        // this happens when the new primary does not commit the previously prepared mutations
        // yet, which it should do, so let's help it now.
        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner's last_committed_decree_in_app "
                         "is newer than learnee, learner_app_committed_decree = {}, "
                         "local_committed_decree = {}, commit local soft",
                         request.signature,
                         request.learner,
                         request.last_committed_decree_in_app,
                         local_committed_decree);

        // we shouldn't commit mutations hard coz these mutations may preparing on another learner
        _prepare_list->commit(request.last_committed_decree_in_app, COMMIT_TO_DECREE_SOFT);
        local_committed_decree = last_committed_decree();

        if (request.last_committed_decree_in_app > local_committed_decree) {
            LOG_ERROR_PREFIX("on_learn[{:#018x}]: try to commit primary to {}, still less than "
                             "learner({})'s committed decree({}), wait mutations to be commitable",
                             request.signature,
                             local_committed_decree,
                             request.learner,
                             request.last_committed_decree_in_app);
            response.err = ERR_INCONSISTENT_STATE;
            reply(msg, response);
            return;
        }
    }

    CHECK_LE(request.last_committed_decree_in_app, local_committed_decree);

    const decree learn_start_decree = get_learn_start_decree(request);
    response.state.__set_learn_start_decree(learn_start_decree);
    bool delayed_replay_prepare_list = false;

    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, remote_committed_decree = {}, "
                    "remote_app_committed_decree = {}, local_committed_decree = {}, "
                    "app_committed_decree = {}, app_durable_decree = {}, "
                    "prepare_min_decree = {}, prepare_list_count = {}, learn_start_decree = {}",
                    request.signature,
                    request.learner,
                    request.last_committed_decree_in_prepare_list,
                    request.last_committed_decree_in_app,
                    local_committed_decree,
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    _prepare_list->min_decree(),
                    _prepare_list->count(),
                    learn_start_decree);

    response.address = _stub->_primary_address;
    response.prepare_start_decree = invalid_decree;
    response.last_committed_decree = local_committed_decree;
    response.err = ERR_OK;

    // learn delta state or checkpoint
    bool should_learn_cache = prepare_cached_learn_state(request,
                                                         learn_start_decree,
                                                         local_committed_decree,
                                                         learner_state,
                                                         response,
                                                         delayed_replay_prepare_list);
    if (!should_learn_cache) {
        if (learn_start_decree > _app->last_durable_decree()) {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because learn_start_decree({}) > _app->last_durable_decree({})",
                            request.signature,
                            request.learner,
                            learn_start_decree,
                            _app->last_durable_decree());
            // The return value is deliberately ignored: the app checkpoint cannot serve this
            // decree, so the private log is the only option.
            _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state);
            response.type = learn_type::LT_LOG;
        } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because mutation_log::get_learn_state() returns true",
                            request.signature,
                            request.learner);
            response.type = learn_type::LT_LOG;
        } else if (learn_start_decree < request.last_committed_decree_in_app + 1) {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn private logs, "
                            "because learn_start_decree steps back for duplication",
                            request.signature,
                            request.learner);
            response.type = learn_type::LT_LOG;
        } else {
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn app, beacuse "
                            "learn_start_decree({}) <= _app->last_durable_decree({}), and "
                            "mutation_log::get_learn_state() returns false",
                            request.signature,
                            request.learner,
                            learn_start_decree,
                            _app->last_durable_decree());
            response.type = learn_type::LT_APP;
            // Discard anything the failed get_learn_state() call above may have written.
            response.state = learn_state();
        }

        if (response.type == learn_type::LT_LOG) {
            response.base_local_dir = _private_log->dir();
            if (response.state.files.size() > 0) {
                auto &last_file = response.state.files.back();
                if (last_file == learner_state.last_learn_log_file) {
                    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn the same file {} "
                                    "repeatedly, hint to switch file",
                                    request.signature,
                                    request.learner,
                                    last_file);
                    _private_log->hint_switch_file();
                } else {
                    learner_state.last_learn_log_file = last_file;
                }
            }
            // it is safe to commit to last_committed_decree() now
            response.state.to_decree_included = last_committed_decree();
            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn private logs succeed, "
                            "learned_meta_size = {}, learned_file_count = {}, to_decree_included = "
                            "{}",
                            request.signature,
                            request.learner,
                            response.state.meta.length(),
                            response.state.files.size(),
                            response.state.to_decree_included);
        } else {
            ::dsn::error_code err = _app->get_checkpoint(
                learn_start_decree, request.app_specific_learn_request, response.state);

            if (err != ERR_OK) {
                response.err = ERR_GET_LEARN_STATE_FAILED;
                LOG_ERROR_PREFIX(
                    "on_learn[{:#018x}]: learner = {}, get app checkpoint failed, error = {}",
                    request.signature,
                    request.learner,
                    err);
            } else {
                response.base_local_dir = _app->data_dir();
                response.__set_replica_disk_tag(_dir_node->tag);
                LOG_INFO_PREFIX(
                    "on_learn[{:#018x}]: learner = {}, get app learn state succeed, "
                    "learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}",
                    request.signature,
                    request.learner,
                    response.state.meta.length(),
                    response.state.files.size(),
                    response.state.to_decree_included);
            }
        }
    }

    // Rewrite file paths relative to base_local_dir. This is skipped on failure: this function
    // does not set base_local_dir on the get_checkpoint() failure path, so stripping
    // base_local_dir.length() + 1 characters from whatever partial file list was left behind
    // would only mangle it, and would throw std::out_of_range for an empty path.
    // NOTE(review): assumes the learner does not consume response.state when err != ERR_OK.
    if (response.err == ERR_OK) {
        for (auto &file : response.state.files) {
            file = file.substr(response.base_local_dir.length() + 1);
        }
    }

    reply(msg, response);

    // the replayed prepare msg needs to be AFTER the learning response msg
    if (delayed_replay_prepare_list) {
        replay_prepare_list();
    }
}