void replica::on_copy_remote_state_completed()

in src/replica/replica_learn.cpp [1001:1185]


// Completion handler for the "copy remote state" step of learning.
//
// Flow, in order:
//   1. log the copy result and, for LT_APP responses, decrement the stub's
//      learn-app concurrency counter;
//   2. on a successful copy, account the copied files/bytes;
//   3. on a successful copy and when the learner is not in LearningWithPrepare
//      status, apply the copied state to the local app (checkpoint for LT_APP,
//      private-log files otherwise) and reset the prepare list;
//   4. if the catch-up condition holds, sync a checkpoint so that committed
//      state becomes durable;
//   5. enqueue a task that calls on_learn_remote_state_completed() with the
//      final error code.
//
// Parameters:
//   err             - result of the copy; when it is not ERR_OK the apply step
//                     is skipped and the error is forwarded unchanged to step 5.
//   size            - copied byte count; logged as copy_file_size and added to
//                     learning_copy_file_size / the learn_copy_file_bytes metric.
//   copy_start_time - subtracted from duration_ms() to log copy_time_used;
//                     presumably the duration_ms() value sampled when the copy
//                     started (TODO confirm against the caller).
//   req             - the learn request; only req.signature is read here, as
//                     the tag printed in log lines.
//   resp            - the learn response; its type, state, config,
//                     last_committed_decree and prepare_start_decree are read.
void replica::on_copy_remote_state_completed(error_code err,
                                             size_t size,
                                             uint64_t copy_start_time,
                                             learn_request &&req,
                                             learn_response &&resp)
{
    // Snapshot the decrees before anything is applied so that the summary log
    // further down can print them as "(old => new)".
    decree old_prepared = last_prepared_decree();
    decree old_committed = last_committed_decree();
    decree old_app_committed = _app->last_committed_decree();
    decree old_app_durable = _app->last_durable_decree();

    LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = {} "
                    "ms, copy remote state done, err = {}, copy_file_count = {}, copy_file_size = "
                    "{}, copy_time_used = {} ms, local_committed_decree = {}, app_committed_decree "
                    "= {}, app_durable_decree = {}, prepare_start_decree = {}, "
                    "current_learning_status = {}",
                    req.signature,
                    FMT_HOST_PORT_AND_IP(resp.config, primary),
                    _potential_secondary_states.duration_ms(),
                    err,
                    resp.state.files.size(),
                    size,
                    _potential_secondary_states.duration_ms() - copy_start_time,
                    last_committed_decree(),
                    _app->last_committed_decree(),
                    _app->last_durable_decree(),
                    resp.prepare_start_decree,
                    enum_to_string(_potential_secondary_states.learning_status));

    // For an LT_APP response, decrement the stub-wide counter and clear the
    // per-learner flag. This happens regardless of `err`.
    // NOTE(review): presumably this releases the slot taken when the app learn
    // started; the matching increment is not visible in this function.
    // NOTE(review): this log line is tagged with learning_version and _config,
    // whereas the other log lines here use req.signature and resp.config.
    if (resp.type == learn_type::LT_APP) {
        --_stub->_learn_app_concurrent_count;
        _potential_secondary_states.learn_app_concurrent_count_increased = false;
        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                        "--learn_app_concurrent_count = {}",
                        _potential_secondary_states.learning_version,
                        FMT_HOST_PORT_AND_IP(_config, primary),
                        _stub->_learn_app_concurrent_count.load());
    }

    // Copy statistics are only accumulated when the copy itself succeeded.
    if (err == ERR_OK) {
        _potential_secondary_states.learning_copy_file_count += resp.state.files.size();
        _potential_secondary_states.learning_copy_file_size += size;
        METRIC_VAR_INCREMENT_BY(learn_copy_files, resp.state.files.size());
        METRIC_VAR_INCREMENT_BY(learn_copy_file_bytes, size);
    }

    if (err != ERR_OK) {
        // do nothing
    } else if (_potential_secondary_states.learning_status == learner_status::LearningWithPrepare) {
        // In LearningWithPrepare status only an LT_CACHE response is expected,
        // and nothing is applied to the app from here.
        CHECK_EQ(resp.type, learn_type::LT_CACHE);
    } else {
        CHECK(resp.type == learn_type::LT_APP || resp.type == learn_type::LT_LOG,
              "invalid learn_type, type = {}",
              enum_to_string(resp.type));

        // Build a local learn_state mirroring the response's decree range and
        // meta; learn_start_decree is carried over only if the response set it.
        learn_state lstate;
        lstate.from_decree_excluded = resp.state.from_decree_excluded;
        lstate.to_decree_included = resp.state.to_decree_included;
        lstate.meta = resp.state.meta;
        if (resp.state.__isset.learn_start_decree) {
            lstate.__set_learn_start_decree(resp.state.learn_start_decree);
        }

        // File names from the response are joined onto the app's learn
        // directory to form the paths handed to the apply step.
        for (auto &f : resp.state.files) {
            std::string file = utils::filesystem::path_combine(_app->learn_dir(), f);
            lstate.files.push_back(file);
        }

        // apply app learning
        if (resp.type == learn_type::LT_APP) {
            auto start_ts = dsn_now_ns();
            err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
            if (err == ERR_OK) {

                CHECK_GE(_app->last_committed_decree(), _app->last_durable_decree());
                // because if the original _app->last_committed_decree > resp.last_committed_decree,
                // the learn_start_decree will be set to 0, which makes learner to learn from
                // scratch
                CHECK_LE(_app->last_committed_decree(), resp.last_committed_decree);
                LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                "learn_duration = {} ms, checkpoint duration = {} ns, apply "
                                "checkpoint succeed, app_committed_decree = {}",
                                req.signature,
                                FMT_HOST_PORT_AND_IP(resp.config, primary),
                                _potential_secondary_states.duration_ms(),
                                dsn_now_ns() - start_ts,
                                _app->last_committed_decree());
            } else {
                LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                 "learn_duration = {} ms, checkpoint duration = {} ns, apply "
                                 "checkpoint failed, err = {}",
                                 req.signature,
                                 FMT_HOST_PORT_AND_IP(resp.config, primary),
                                 _potential_secondary_states.duration_ms(),
                                 dsn_now_ns() - start_ts,
                                 err);
            }
        }

        // apply log learning (the CHECK above guarantees resp.type is LT_LOG here)
        else {
            auto start_ts = dsn_now_ns();
            err = apply_learned_state_from_private_log(lstate);
            if (err == ERR_OK) {
                LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                "learn_duration = {} ms, apply_log_duration = {} ns, apply learned "
                                "state from private log succeed, app_committed_decree = {}",
                                req.signature,
                                FMT_HOST_PORT_AND_IP(resp.config, primary),
                                _potential_secondary_states.duration_ms(),
                                dsn_now_ns() - start_ts,
                                _app->last_committed_decree());
            } else {
                LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, "
                                 "learn_duration = {} ms, apply_log_duration = {} ns, apply "
                                 "learned state from private log failed, err = {}",
                                 req.signature,
                                 FMT_HOST_PORT_AND_IP(resp.config, primary),
                                 _potential_secondary_states.duration_ms(),
                                 dsn_now_ns() - start_ts,
                                 err);
            }
        }

        // reset prepare list to make it catch with app
        // (this runs whether the apply step above succeeded or failed; `err`
        // may already be non-OK at this point)
        _prepare_list->reset(_app->last_committed_decree());

        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = "
                        "{} ms, apply checkpoint/log done, err = {}, last_prepared_decree = ({} => "
                        "{}), last_committed_decree = ({} => {}), app_committed_decree = ({} => "
                        "{}), app_durable_decree = ({} => {}), remote_committed_decree = {}, "
                        "prepare_start_decree = {}, current_learning_status = {}",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary),
                        _potential_secondary_states.duration_ms(),
                        err,
                        old_prepared,
                        last_prepared_decree(),
                        old_committed,
                        last_committed_decree(),
                        old_app_committed,
                        _app->last_committed_decree(),
                        old_app_durable,
                        _app->last_durable_decree(),
                        resp.last_committed_decree,
                        resp.prepare_start_decree,
                        enum_to_string(_potential_secondary_states.learning_status));
    }

    // if catch-up done, do flush to enable all learned state is durable
    //
    // All four conditions must hold:
    //   - nothing has failed so far;
    //   - the response carries a valid prepare_start_decree;
    //   - the app's committed decree has reached (or is one short of)
    //     learning_start_prepare_decree;
    //   - there is committed state that is not yet durable.
    if (err == ERR_OK && resp.prepare_start_decree != invalid_decree &&
        _app->last_committed_decree() + 1 >=
            _potential_secondary_states.learning_start_prepare_decree &&
        _app->last_committed_decree() > _app->last_durable_decree()) {
        err = background_sync_checkpoint();

        LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {}, learn_duration = "
                        "{} ms, flush done, err = {}, app_committed_decree = {}, "
                        "app_durable_decree = {}",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary),
                        _potential_secondary_states.duration_ms(),
                        err,
                        _app->last_committed_decree(),
                        _app->last_durable_decree());

        // A successful sync is expected to leave no committed-but-not-durable state.
        if (err == ERR_OK) {
            CHECK_EQ(_app->last_committed_decree(), _app->last_durable_decree());
        }
    }

    // it is possible that the _potential_secondary_states.learn_remote_files_task is still running
    // while its body is definitely done already as being here, so we manually set its value to
    // nullptr
    // so that we don't have unnecessary failed reconfiguration later due to this non-nullptr in
    // cleanup
    _potential_secondary_states.learn_remote_files_task = nullptr;

    // Hand the final error code over to on_learn_remote_state_completed() via a
    // new task. `err` is captured by value, so the task sees the outcome of the
    // copy/apply/sync sequence above.
    _potential_secondary_states.learn_remote_files_completed_task = tasking::create_task(
        LPC_LEARN_REMOTE_DELTA_FILES_COMPLETED,
        &_tracker,
        [this, err]() { on_learn_remote_state_completed(err); },
        get_gpid().thread_hash());
    _potential_secondary_states.learn_remote_files_completed_task->enqueue();
}