in src/replica/replica_backup.cpp [77:255]
void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response)
{
_checker.only_one_thread_access();
const std::string &policy_name = request.policy.policy_name;
auto backup_id = request.backup_id;
cold_backup_context_ptr new_context(
new cold_backup_context(this, request, FLAGS_max_concurrent_uploading_file_count));
LOG_INFO_PREFIX("{}: received cold backup request, partition_status = {}",
new_context->name,
enum_to_string(status()));
if (status() == partition_status::type::PS_PRIMARY ||
status() == partition_status::type::PS_SECONDARY) {
cold_backup_context_ptr backup_context = nullptr;
auto find = _cold_backup_contexts.find(policy_name);
if (find != _cold_backup_contexts.end()) {
backup_context = find->second;
} else {
/// TODO: policy may change provider
dist::block_service::block_filesystem *block_service =
_stub->_block_service_manager.get_or_create_block_filesystem(
request.policy.backup_provider_type);
if (block_service == nullptr) {
LOG_ERROR(
"{}: create cold backup block service failed, provider_type = {}, response "
"ERR_INVALID_PARAMETERS",
new_context->name,
request.policy.backup_provider_type);
response.err = ERR_INVALID_PARAMETERS;
return;
}
auto r = _cold_backup_contexts.insert(std::make_pair(policy_name, new_context));
CHECK(r.second, "");
backup_context = r.first->second;
backup_context->block_service = block_service;
backup_context->backup_root = request.__isset.backup_path
? dsn::utils::filesystem::path_combine(
request.backup_path, FLAGS_cold_backup_root)
: FLAGS_cold_backup_root;
}
CHECK_EQ_PREFIX(backup_context->request.policy.policy_name, policy_name);
cold_backup_status backup_status = backup_context->status();
if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) {
if (backup_status == ColdBackupCheckpointing) {
LOG_INFO("{}: delay clearing obsoleted cold backup context, cause backup_status == "
"ColdBackupCheckpointing",
new_context->name);
tasking::enqueue(
LPC_REPLICATION_COLD_BACKUP,
&_tracker,
[this, request]() {
backup_response response;
on_cold_backup(request, response);
},
get_gpid().thread_hash(),
std::chrono::seconds(100));
} else {
// TODO(wutao1): deleting cold backup context should be
// extracted as a function like try_delete_cold_backup_context;
// clear obsoleted backup context firstly
LOG_INFO("{}: clear obsoleted cold backup context, old_backup_id = {}, "
"old_backup_status = {}",
new_context->name,
backup_context->request.backup_id,
cold_backup_status_to_string(backup_status));
backup_context->cancel();
_cold_backup_contexts.erase(policy_name);
// go to another round
on_cold_backup(request, response);
}
return;
}
if (backup_context->request.backup_id > backup_id) {
// backup_id is outdated
LOG_ERROR("{}: request outdated cold backup, current_backup_id = {}, response "
"ERR_VERSION_OUTDATED",
new_context->name,
backup_context->request.backup_id);
response.err = ERR_VERSION_OUTDATED;
return;
}
// for secondary, request is already filtered by primary, so if
// request is repeated, so generate_backup_checkpoint is already running, we do
// nothing;
// request is new, we should call generate_backup_checkpoint;
// TODO: if secondary's status have changed, how to process the _cold_backup_state,
// and how to process the backup_status, cancel/pause
if (status() == partition_status::PS_SECONDARY) {
if (backup_status == ColdBackupInvalid) {
// new backup_request, should set status to ColdBackupChecked to allow secondary
// can start to checkpoint
backup_context->start_check();
backup_context->complete_check(false);
if (backup_context->start_checkpoint()) {
METRIC_VAR_INCREMENT(backup_started_count);
tasking::enqueue(
LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
generate_backup_checkpoint(backup_context);
});
}
}
return;
}
send_backup_request_to_secondary(request);
if (backup_status == ColdBackupChecking || backup_status == ColdBackupCheckpointing ||
backup_status == ColdBackupUploading) {
// do nothing
LOG_INFO("{}: backup is busy, status = {}, progress = {}, response ERR_BUSY",
backup_context->name,
cold_backup_status_to_string(backup_status),
backup_context->progress());
response.err = ERR_BUSY;
} else if (backup_status == ColdBackupInvalid && backup_context->start_check()) {
METRIC_VAR_INCREMENT(backup_started_count);
LOG_INFO("{}: start checking backup on remote, response ERR_BUSY",
backup_context->name);
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
backup_context->check_backup_on_remote();
});
response.err = ERR_BUSY;
} else if (backup_status == ColdBackupChecked && backup_context->start_checkpoint()) {
// start generating checkpoint
LOG_INFO("{}: start generating checkpoint, response ERR_BUSY", backup_context->name);
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, backup_context]() {
generate_backup_checkpoint(backup_context);
});
response.err = ERR_BUSY;
} else if ((backup_status == ColdBackupCheckpointed || backup_status == ColdBackupPaused) &&
backup_context->start_upload()) {
// start uploading checkpoint
LOG_INFO("{}: start uploading checkpoint, response ERR_BUSY", backup_context->name);
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, nullptr, [backup_context]() {
backup_context->upload_checkpoint_to_remote();
});
response.err = ERR_BUSY;
} else if (backup_status == ColdBackupFailed) {
LOG_ERROR("{}: upload checkpoint failed, reason = {}, response ERR_LOCAL_APP_FAILURE",
backup_context->name,
backup_context->reason());
response.err = ERR_LOCAL_APP_FAILURE;
backup_context->cancel();
_cold_backup_contexts.erase(policy_name);
} else if (backup_status == ColdBackupCompleted) {
LOG_INFO("{}: upload checkpoint completed, response ERR_OK", backup_context->name);
_backup_mgr->send_clear_request_to_secondaries(backup_context->request.pid,
policy_name);
// clear local checkpoint dirs in background thread
_backup_mgr->background_clear_backup_checkpoint(policy_name);
response.err = ERR_OK;
} else {
LOG_WARNING(
"{}: unhandled case, handle_status = {}, real_time_status = {}, response ERR_BUSY",
backup_context->name,
cold_backup_status_to_string(backup_status),
cold_backup_status_to_string(backup_context->status()));
response.err = ERR_BUSY;
}
response.progress = backup_context->progress();
response.checkpoint_total_size = backup_context->get_checkpoint_total_size();
LOG_INFO("{}: backup progress is {}", backup_context->name, response.progress);
} else {
LOG_ERROR(
"{}: invalid state for cold backup, partition_status = {}, response ERR_INVALID_STATE",
new_context->name,
enum_to_string(status()));
response.err = ERR_INVALID_STATE;
}
}