void replica::on_learn_reply()

in src/replica/replica_learn.cpp [571:939]


void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp)
{
    _checker.only_one_thread_access();

    CHECK_EQ(partition_status::PS_POTENTIAL_SECONDARY, status());
    CHECK_EQ(req.signature, _potential_secondary_states.learning_version);

    if (err != ERR_OK) {
        handle_learning_error(err, false);
        return;
    }

    LOG_INFO_PREFIX(
        "on_learn_reply_start[{}]: learnee = {}, learn_duration ={} ms, response_err = "
        "{}, remote_committed_decree = {}, prepare_start_decree = {}, learn_type = {} "
        "learned_buffer_size = {}, learned_file_count = {},to_decree_included = "
        "{}, learn_start_decree = {}, last_commit_decree = {}, current_learning_status = "
        "{} ",
        req.signature,
        FMT_HOST_PORT_AND_IP(resp.config, primary),
        _potential_secondary_states.duration_ms(),
        resp.err,
        resp.last_committed_decree,
        resp.prepare_start_decree,
        enum_to_string(resp.type),
        resp.state.meta.length(),
        static_cast<uint32_t>(resp.state.files.size()),
        resp.state.to_decree_included,
        resp.state.learn_start_decree,
        _app->last_committed_decree(),
        enum_to_string(_potential_secondary_states.learning_status));

    _potential_secondary_states.learning_copy_buffer_size += resp.state.meta.length();
    METRIC_VAR_INCREMENT_BY(learn_copy_buffer_bytes, resp.state.meta.length());

    if (resp.err != ERR_OK) {
        if (resp.err == ERR_INACTIVE_STATE || resp.err == ERR_INCONSISTENT_STATE) {
            LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learnee is updating "
                               "ballot(inactive state) or reconciliation(inconsistent state), "
                               "delay to start another round of learning",
                               req.signature,
                               FMT_HOST_PORT_AND_IP(resp.config, primary));
            _potential_secondary_states.learning_round_is_running = false;
            _potential_secondary_states.delay_learning_task =
                tasking::create_task(LPC_DELAY_LEARN,
                                     &_tracker,
                                     std::bind(&replica::init_learn, this, req.signature),
                                     get_gpid().thread_hash());
            _potential_secondary_states.delay_learning_task->enqueue(std::chrono::seconds(1));
        } else {
            handle_learning_error(resp.err, false);
        }
        return;
    }

    if (resp.config.ballot > get_ballot()) {
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, update configuration because "
                        "ballot have changed",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary));
        CHECK(update_local_configuration(resp.config), "");
    }

    if (status() != partition_status::PS_POTENTIAL_SECONDARY) {
        LOG_ERROR_PREFIX(
            "on_learn_reply[{:#018x}]: learnee = {}, current_status = {}, stop learning",
            req.signature,
            FMT_HOST_PORT_AND_IP(resp.config, primary),
            enum_to_string(status()));
        return;
    }

    // local state is newer than the learnee's
    if (resp.last_committed_decree < _app->last_committed_decree()) {
        LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learner state is newer than "
                           "learnee (primary): {} vs {}, create new app",
                           req.signature,
                           FMT_HOST_PORT_AND_IP(resp.config, primary),
                           _app->last_committed_decree(),
                           resp.last_committed_decree);

        METRIC_VAR_INCREMENT(learn_resets);

        // close app
        auto err = _app->close(true);
        if (err != ERR_OK) {
            LOG_ERROR_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}), close app (with clear_state=true) "
                "failed, err = {}",
                req.signature,
                FMT_HOST_PORT_AND_IP(resp.config, primary),
                err);
        }

        // backup old data dir
        if (err == ERR_OK) {
            std::string old_dir = _app->data_dir();
            if (dsn::utils::filesystem::directory_exists(old_dir)) {
                char rename_dir[1024];
                snprintf(rename_dir, sizeof(rename_dir), "%s.%" PRIu64 ".discarded",
                         old_dir.c_str(), dsn_now_us());
                CHECK(dsn::utils::filesystem::rename_path(old_dir, rename_dir),
                      "{}: failed to move directory from '{}' to '{}'",
                      name(),
                      old_dir,
                      rename_dir);
                LOG_WARNING_PREFIX("replica_dir_op succeed to move directory from '{}' to '{}'",
                                   old_dir,
                                   rename_dir);
            }
        }

        if (err == ERR_OK) {
            err = _app->open_new_internal(this, _private_log->on_partition_reset(get_gpid(), 0));

            if (err != ERR_OK) {
                LOG_ERROR_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, open app (with "
                                 "create_new=true) failed, err = {}",
                                 req.signature,
                                 FMT_HOST_PORT_AND_IP(resp.config, primary),
                                 err);
            }
        }

        if (err == ERR_OK) {
            CHECK_EQ_MSG(_app->last_committed_decree(), 0, "must be zero after app::open(true)");
            CHECK_EQ_MSG(_app->last_durable_decree(), 0, "must be zero after app::open(true)");

            // reset prepare list
            _prepare_list->reset(0);
        }

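        // if recreating the app failed at any step above, report the error asynchronously
        // through the normal completion path so the learning state machine can handle it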
        if (err != ERR_OK) {
            _potential_secondary_states.learn_remote_files_task = tasking::create_task(
                LPC_LEARN_REMOTE_DELTA_FILES,
                &_tracker,
                [this,
                 err,
                 copy_start = _potential_secondary_states.duration_ms(),
                 req_cap = std::move(req),
                 resp_cap = std::move(resp)]() mutable {
                    on_copy_remote_state_completed(
                        err, 0, copy_start, std::move(req_cap), std::move(resp_cap));
                });
            _potential_secondary_states.learn_remote_files_task->enqueue();
            return;
        }
    }

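    // throttle app-state learning: at most FLAGS_learn_app_max_concurrent_count replicas on this
    // stub may copy app state at the same time; otherwise back off and retry in a later round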
    if (resp.type == learn_type::LT_APP) {
        if (++_stub->_learn_app_concurrent_count > FLAGS_learn_app_max_concurrent_count) {
            --_stub->_learn_app_concurrent_count;
            LOG_WARNING_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, learn_app_concurrent_count({}) >= "
                "FLAGS_learn_app_max_concurrent_count({}), skip this round",
                _potential_secondary_states.learning_version,
                FMT_HOST_PORT_AND_IP(_config, primary),
                _stub->_learn_app_concurrent_count,
                FLAGS_learn_app_max_concurrent_count);
            _potential_secondary_states.learning_round_is_running = false;
            return;
        } else {
            _potential_secondary_states.learn_app_concurrent_count_increased = true;
            LOG_INFO_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, ++learn_app_concurrent_count = {}",
                _potential_secondary_states.learning_version,
                FMT_HOST_PORT_AND_IP(_config, primary),
                _stub->_learn_app_concurrent_count.load());
        }
    }

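    // record a per-type metric for the learn response (cache / app / log)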
    switch (resp.type) {
    case learn_type::LT_CACHE:
        METRIC_VAR_INCREMENT(learn_lt_cache_responses);
        break;
    case learn_type::LT_APP:
        METRIC_VAR_INCREMENT(learn_lt_app_responses);
        break;
    case learn_type::LT_LOG:
        METRIC_VAR_INCREMENT(learn_lt_log_responses);
        break;
    default:
        // do nothing
        break;
    }

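    // a valid prepare_start_decree means the missing state fits in the learnee's prepare cache
    // (LT_CACHE): replay the serialized mutations locally, then catch up through normal 2pc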
    if (resp.prepare_start_decree != invalid_decree) {
        CHECK_EQ(resp.type, learn_type::LT_CACHE);
        CHECK(resp.state.files.empty(), "");
        CHECK_EQ(_potential_secondary_states.learning_status,
                 learner_status::LearningWithoutPrepare);
        _potential_secondary_states.learning_status = learner_status::LearningWithPrepareTransient;

        // reset log positions for later mutations
        // WARNING: a checkpoint is still required later in
        // on_copy_remote_state_completed to ensure the state is complete;
        // if there is a failure in between, our check
        // during app::open_internal will invalidate the logs
        // appended by the mutations AFTER the current position
        err = _app->update_init_info(
            this,
            _private_log->on_partition_reset(get_gpid(), _app->last_committed_decree()),
            _app->last_committed_decree());

        // switch private log to make learning easier
        _private_log->demand_switch_file();

        // reset the prepare list
        _potential_secondary_states.learning_start_prepare_decree = resp.prepare_start_decree;
        _prepare_list->truncate(_app->last_committed_decree());
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, truncate prepare list, "
                        "local_committed_decree = {}, current_learning_status = {}",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary),
                        _app->last_committed_decree(),
                        enum_to_string(_potential_secondary_states.learning_status));

        // persist incoming mutations into private log and apply them to prepare-list
        std::pair<decree, decree> cache_range;
        binary_reader reader(resp.state.meta);
        while (!reader.is_eof()) {
            auto mu = mutation::read_from(reader, nullptr);
            if (mu->data.header.decree > last_committed_decree()) {
                LOG_DEBUG_PREFIX("on_learn_reply[{:#018x}]: apply learned mutation {}",
                                 req.signature,
                                 mu->name());

                // write to the private log with no callback; the later 2pc ensures that logs
                // are written to disk
                _private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);

                // because the private log is written without a callback, we need to manually
                // set the flag
                mu->set_logged();

                // then we prepare; it is possible that a committed mutation exists in the
                // learner's prepare log, but with a DIFFERENT ballot.
                // Reference https://github.com/imzhenyu/rDSN/issues/496
                mutation_ptr existing_mutation =
                    _prepare_list->get_mutation_by_decree(mu->data.header.decree);
                if (existing_mutation != nullptr &&
                    existing_mutation->data.header.ballot > mu->data.header.ballot) {
                    LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, mutation({}) exist on "
                                    "the learner with larger ballot {}",
                                    req.signature,
                                    FMT_HOST_PORT_AND_IP(resp.config, primary),
                                    mu->name(),
                                    existing_mutation->data.header.ballot);
                } else {
                    _prepare_list->prepare(mu, partition_status::PS_POTENTIAL_SECONDARY);
                }

                if (cache_range.first == 0 || mu->data.header.decree < cache_range.first)
                    cache_range.first = mu->data.header.decree;
                if (cache_range.second == 0 || mu->data.header.decree > cache_range.second)
                    cache_range.second = mu->data.header.decree;
            }
        }

        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learn_duration = {} ms, apply "
                        "cache done, prepare_cache_range = <{}, {}>, local_committed_decree = {}, "
                        "app_committed_decree = {}, current_learning_status = {}",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary),
                        _potential_secondary_states.duration_ms(),
                        cache_range.first,
                        cache_range.second,
                        last_committed_decree(),
                        _app->last_committed_decree(),
                        enum_to_string(_potential_secondary_states.learning_status));

        // further states are synced using 2pc, and we must commit now because those later 2pc
        // messages assume that decrees before prepare_start_decree are already committed
        _prepare_list->commit(resp.prepare_start_decree - 1, COMMIT_TO_DECREE_HARD);
        CHECK_EQ(_prepare_list->last_committed_decree(), _app->last_committed_decree());
        CHECK(resp.state.files.empty(), "");

        // all state is complete
        CHECK_GE_MSG(_app->last_committed_decree() + 1,
                     _potential_secondary_states.learning_start_prepare_decree,
                     "state is incomplete");

        // go to next stage
        _potential_secondary_states.learning_status = learner_status::LearningWithPrepare;
        _potential_secondary_states.learn_remote_files_task = tasking::create_task(
            LPC_LEARN_REMOTE_DELTA_FILES,
            &_tracker,
            [this,
             err,
             copy_start = _potential_secondary_states.duration_ms(),
             req_cap = std::move(req),
             resp_cap = std::move(resp)]() mutable {
                on_copy_remote_state_completed(
                    err, 0, copy_start, std::move(req_cap), std::move(resp_cap));
            });
        _potential_secondary_states.learn_remote_files_task->enqueue();
    }

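    // the learnee shipped checkpoint or log files: fetch them into the learn dir via nfs and
    // apply them later in on_copy_remote_state_completed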
    else if (resp.state.files.size() > 0) {
        auto learn_dir = _app->learn_dir();
        utils::filesystem::remove_path(learn_dir);
        utils::filesystem::create_directory(learn_dir);

        if (!dsn::utils::filesystem::directory_exists(learn_dir)) {
            LOG_ERROR_PREFIX(
                "on_learn_reply[{:#018x}]: learnee = {}, create replica learn dir {} failed",
                req.signature,
                FMT_HOST_PORT_AND_IP(resp.config, primary),
                learn_dir);

            _potential_secondary_states.learn_remote_files_task =
                tasking::create_task(LPC_LEARN_REMOTE_DELTA_FILES,
                                     &_tracker,
                                     [this,
                                      copy_start = _potential_secondary_states.duration_ms(),
                                      req_cap = std::move(req),
                                      resp_cap = std::move(resp)]() mutable {
                                         on_copy_remote_state_completed(ERR_FILE_OPERATION_FAILED,
                                                                        0,
                                                                        copy_start,
                                                                        std::move(req_cap),
                                                                        std::move(resp_cap));
                                     });
            _potential_secondary_states.learn_remote_files_task->enqueue();
            return;
        }

        bool high_priority = (resp.type != learn_type::LT_APP);
        LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learn_duration = {} ms, start to "
                        "copy remote files, copy_file_count = {}, priority = {}",
                        req.signature,
                        FMT_HOST_PORT_AND_IP(resp.config, primary),
                        _potential_secondary_states.duration_ms(),
                        resp.state.files.size(),
                        high_priority ? "high" : "low");

        host_port primary;
        GET_HOST_PORT(resp.config, primary, primary);
        _potential_secondary_states.learn_remote_files_task = _stub->_nfs->copy_remote_files(
            primary,
            resp.replica_disk_tag,
            resp.base_local_dir,
            resp.state.files,
            _dir_node->tag,
            learn_dir,
            get_gpid(),
            true, // overwrite
            high_priority,
            LPC_REPLICATION_COPY_REMOTE_FILES,
            &_tracker,
            [this,
             copy_start = _potential_secondary_states.duration_ms(),
             req_cap = std::move(req),
             resp_copy = resp](error_code err, size_t sz) mutable {
                on_copy_remote_state_completed(
                    err, sz, copy_start, std::move(req_cap), std::move(resp_copy));
            });
    } else {
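        // neither cached mutations nor files to copy: nothing to fetch, so complete this round
        // immediately with ERR_OK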
        _potential_secondary_states.learn_remote_files_task = tasking::create_task(
            LPC_LEARN_REMOTE_DELTA_FILES,
            &_tracker,
            [this,
             copy_start = _potential_secondary_states.duration_ms(),
             req_cap = std::move(req),
             resp_cap = std::move(resp)]() mutable {
                on_copy_remote_state_completed(
                    ERR_OK, 0, copy_start, std::move(req_cap), std::move(resp_cap));
            });
        _potential_secondary_states.learn_remote_files_task->enqueue();
    }
}