void replica::on_cold_backup()

in src/replica/replica_backup.cpp [77:255]


/// Handles a cold backup request for this partition by driving the per-policy
/// cold_backup_context state machine one step forward.
///
/// Must run on this replica's own thread (enforced by `_checker`). Only a primary or
/// secondary replica serves the request; any other partition status is answered with
/// ERR_INVALID_STATE.
///
/// Contexts live in `_cold_backup_contexts`, keyed by policy name. A context is created
/// the first time a policy is seen. It is replaced when the incoming request carries a
/// newer backup_id, or when the existing context was canceled. A request whose
/// backup_id is older than the stored context gets ERR_VERSION_OUTDATED.
///
/// - Secondary: for a brand-new context, marks it checked and starts generating a
///   checkpoint in the background; `response` is not filled in.
/// - Primary: forwards the request to the secondaries, then acts on the context status:
///     checking / checkpointing / uploading -> nothing started, ERR_BUSY
///     invalid                              -> start checking remote, ERR_BUSY
///     checked                              -> start checkpointing,   ERR_BUSY
///     checkpointed / paused                -> start uploading,       ERR_BUSY
///     failed                               -> drop the context,      ERR_LOCAL_APP_FAILURE
///     completed                            -> clear checkpoints,     ERR_OK
///   and then reports the context's progress and checkpoint_total_size.
///
/// @param request   the backup request (policy, backup_id, optional backup_path)
/// @param response  out: `err`; on the primary's normal path also `progress` and
///                  `checkpoint_total_size`
void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response)
{
    _checker.only_one_thread_access();

    const std::string &policy_name = request.policy.policy_name;
    auto backup_id = request.backup_id;
    // Built up front: its `name` is used in the log messages below, and it becomes the
    // stored context if this policy has none yet.
    cold_backup_context_ptr new_context(
        new cold_backup_context(this, request, FLAGS_max_concurrent_uploading_file_count));

    LOG_INFO_PREFIX("{}: received cold backup request, partition_status = {}",
                    new_context->name,
                    enum_to_string(status()));

    if (status() == partition_status::type::PS_PRIMARY ||
        status() == partition_status::type::PS_SECONDARY) {
        cold_backup_context_ptr backup_context = nullptr;
        auto find = _cold_backup_contexts.find(policy_name);
        if (find != _cold_backup_contexts.end()) {
            backup_context = find->second;
        } else {
            /// TODO: policy may change provider
            dist::block_service::block_filesystem *block_service =
                _stub->_block_service_manager.get_or_create_block_filesystem(
                    request.policy.backup_provider_type);
            if (block_service == nullptr) {
                LOG_ERROR(
                    "{}: create cold backup block service failed, provider_type = {}, response "
                    "ERR_INVALID_PARAMETERS",
                    new_context->name,
                    request.policy.backup_provider_type);
                response.err = ERR_INVALID_PARAMETERS;
                return;
            }
            auto r = _cold_backup_contexts.insert(std::make_pair(policy_name, new_context));
            // The lookup above just missed and this runs single-threaded, so the insert
            // must succeed.
            CHECK(r.second,
                  "{}: cold backup context for policy {} already exists",
                  new_context->name,
                  policy_name);
            backup_context = r.first->second;
            backup_context->block_service = block_service;
            backup_context->backup_root = request.__isset.backup_path
                                              ? dsn::utils::filesystem::path_combine(
                                                    request.backup_path, FLAGS_cold_backup_root)
                                              : FLAGS_cold_backup_root;
        }

        CHECK_EQ_PREFIX(backup_context->request.policy.policy_name, policy_name);
        // Snapshot of the status: all decisions below are based on this value.
        cold_backup_status backup_status = backup_context->status();

        if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) {
            // The stored context is obsolete (older backup or canceled) and must be
            // replaced before this request can be served.
            if (backup_status == ColdBackupCheckpointing) {
                // The obsolete context is still generating a checkpoint, so clearing it
                // is postponed: re-run this same request later on the replica's thread.
                LOG_INFO("{}: delay clearing obsoleted cold backup context, cause backup_status == "
                         "ColdBackupCheckpointing",
                         new_context->name);
                tasking::enqueue(
                    LPC_REPLICATION_COLD_BACKUP,
                    &_tracker,
                    [this, request]() {
                        backup_response response;
                        on_cold_backup(request, response);
                    },
                    get_gpid().thread_hash(),
                    std::chrono::seconds(100));
                // Nothing was started for this request yet. Report "busy" like every
                // other not-ready path, instead of leaving `err` at its default value
                // (presumably ERR_OK), which would read as a successful reply.
                response.err = ERR_BUSY;
            } else {
                // TODO(wutao1): deleting cold backup context should be
                //               extracted as a function like try_delete_cold_backup_context;
                // clear obsoleted backup context firstly
                LOG_INFO("{}: clear obsoleted cold backup context, old_backup_id = {}, "
                         "old_backup_status = {}",
                         new_context->name,
                         backup_context->request.backup_id,
                         cold_backup_status_to_string(backup_status));
                backup_context->cancel();
                _cold_backup_contexts.erase(policy_name);
                // go to another round: the recursive call creates a fresh context and
                // fills `response`
                on_cold_backup(request, response);
            }
            return;
        }

        if (backup_context->request.backup_id > backup_id) {
            // backup_id is outdated
            LOG_ERROR("{}: request outdated cold backup, current_backup_id = {}, response "
                      "ERR_VERSION_OUTDATED",
                      new_context->name,
                      backup_context->request.backup_id);
            response.err = ERR_VERSION_OUTDATED;
            return;
        }

        // for secondary, request is already filtered by primary, so if
        //      request is repeated, so generate_backup_checkpoint is already running, we do
        //      nothing;
        //      request is new, we should call generate_backup_checkpoint;

        // TODO: if secondary's status have changed, how to process the _cold_backup_state,
        // and how to process the backup_status, cancel/pause
        if (status() == partition_status::PS_SECONDARY) {
            if (backup_status == ColdBackupInvalid) {
                // new backup_request, should set status to ColdBackupChecked to allow secondary
                // can start to checkpoint
                backup_context->start_check();
                backup_context->complete_check(false);
                if (backup_context->start_checkpoint()) {
                    METRIC_VAR_INCREMENT(backup_started_count);
                    tasking::enqueue(
                        LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
                            generate_backup_checkpoint(backup_context);
                        });
                }
            }
            return;
        }

        // Primary from here on: keep the secondaries in step with this request.
        send_backup_request_to_secondary(request);

        if (backup_status == ColdBackupChecking || backup_status == ColdBackupCheckpointing ||
            backup_status == ColdBackupUploading) {
            // do nothing
            LOG_INFO("{}: backup is busy, status = {}, progress = {}, response ERR_BUSY",
                     backup_context->name,
                     cold_backup_status_to_string(backup_status),
                     backup_context->progress());
            response.err = ERR_BUSY;
        } else if (backup_status == ColdBackupInvalid && backup_context->start_check()) {
            METRIC_VAR_INCREMENT(backup_started_count);
            LOG_INFO("{}: start checking backup on remote, response ERR_BUSY",
                     backup_context->name);
            // No tracker: the lambda holds its own reference to the context.
            tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
                backup_context->check_backup_on_remote();
            });
            response.err = ERR_BUSY;
        } else if (backup_status == ColdBackupChecked && backup_context->start_checkpoint()) {
            // start generating checkpoint
            LOG_INFO("{}: start generating checkpoint, response ERR_BUSY", backup_context->name);
            tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
                generate_backup_checkpoint(backup_context);
            });
            response.err = ERR_BUSY;
        } else if ((backup_status == ColdBackupCheckpointed || backup_status == ColdBackupPaused) &&
                   backup_context->start_upload()) {
            // start uploading checkpoint
            LOG_INFO("{}: start uploading checkpoint, response ERR_BUSY", backup_context->name);
            tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
                backup_context->upload_checkpoint_to_remote();
            });
            response.err = ERR_BUSY;
        } else if (backup_status == ColdBackupFailed) {
            LOG_ERROR("{}: upload checkpoint failed, reason = {}, response ERR_LOCAL_APP_FAILURE",
                      backup_context->name,
                      backup_context->reason());
            response.err = ERR_LOCAL_APP_FAILURE;
            backup_context->cancel();
            // `backup_context` still holds a reference, so it stays valid for the
            // progress report below.
            _cold_backup_contexts.erase(policy_name);
        } else if (backup_status == ColdBackupCompleted) {
            LOG_INFO("{}: upload checkpoint completed, response ERR_OK", backup_context->name);
            _backup_mgr->send_clear_request_to_secondaries(backup_context->request.pid,
                                                           policy_name);

            // clear local checkpoint dirs in background thread
            _backup_mgr->background_clear_backup_checkpoint(policy_name);
            response.err = ERR_OK;
        } else {
            // e.g. a start_*() transition above was refused; report busy and let the
            // caller retry.
            LOG_WARNING(
                "{}: unhandled case, handle_status = {}, real_time_status = {}, response ERR_BUSY",
                backup_context->name,
                cold_backup_status_to_string(backup_status),
                cold_backup_status_to_string(backup_context->status()));
            response.err = ERR_BUSY;
        }

        response.progress = backup_context->progress();
        response.checkpoint_total_size = backup_context->get_checkpoint_total_size();
        LOG_INFO("{}: backup progress is {}", backup_context->name, response.progress);
    } else {
        LOG_ERROR(
            "{}: invalid state for cold backup, partition_status = {}, response ERR_INVALID_STATE",
            new_context->name,
            enum_to_string(status()));
        response.err = ERR_INVALID_STATE;
    }
}