in utilities/backupable/backupable_db.cc [1270:1542]
// Creates a new backup of `db`, numbered `latest_backup_id_ + 1`, and stores
// `app_metadata` with it.
//
// Phases, in order:
//  1. Reject `app_metadata` larger than kMaxAppMetaSize; if requested by
//     `options`, record a lower value in threads_cpu_priority_.
//  2. If a private directory for the new ID already exists, run
//     GarbageCollect() before registering the backup in backups_.
//  3. Insert a BackupMeta for the new ID into backups_ and create the private
//     directory.
//  4. Disable file deletions on `db` and let
//     CheckpointImpl::CreateCustomCheckpoint enumerate the files; each file
//     is handed to AddBackupFileWorkItem.
//  5. Wait for every queued work item, rename shared files that had to be
//     copied from their temporary to their final name, and add each file to
//     the new BackupMeta.
//  6. Re-enable file deletions (only if disabling succeeded), store the
//     backup metadata file and, when options_.sync is set, fsync the backup
//     directories.
//  7. On success, advance latest_backup_id_ / latest_valid_backup_id_ and
//     write the ID to `*new_backup_id_ptr` (if non-null). On failure, set
//     might_need_garbage_collect_ and call DeleteBackup() for the new ID;
//     `*new_backup_id_ptr` is left untouched.
//
// Returns InvalidArgument if `app_metadata` is too large, otherwise the status
// of the first phase that prevented the backup from completing (for work
// items, the status of the last failing item), or OK.
IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
    const CreateBackupOptions& options, DB* db, const std::string& app_metadata,
    BackupID* new_backup_id_ptr) {
  assert(initialized_);
  assert(!read_only_);
  if (app_metadata.size() > kMaxAppMetaSize) {
    return IOStatus::InvalidArgument("App metadata too large");
  }

  // Only ever move the recorded priority downwards (towards the smaller
  // value); a request that does not compare less is ignored.
  if (options.decrease_background_thread_cpu_priority) {
    if (options.background_thread_cpu_priority < threads_cpu_priority_) {
      threads_cpu_priority_.store(options.background_thread_cpu_priority);
    }
  }

  BackupID new_backup_id = latest_backup_id_ + 1;

  // `bytes_read` and `bytes_written` stats are enabled based on compile-time
  // support and cannot be dynamically toggled. So we do not need to worry about
  // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats.
  uint64_t prev_bytes_read = IOSTATS(bytes_read);
  uint64_t prev_bytes_written = IOSTATS(bytes_written);

  assert(backups_.find(new_backup_id) == backups_.end());

  auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
  IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr);
  if (io_s.ok()) {
    // maybe last backup failed and left partial state behind, clean it up.
    // need to do this before updating backups_ such that a private dir
    // named after new_backup_id will be cleaned up.
    // (If an incomplete new backup is followed by an incomplete delete
    // of the latest full backup, then there could be more than one next
    // id with a private dir, the last thing to be deleted in delete
    // backup, but all will be cleaned up with a GarbageCollect.)
    io_s = GarbageCollect();
  } else if (io_s.IsNotFound()) {
    // normal case, the new backup's private dir doesn't exist yet
    io_s = IOStatus::OK();
  }
  // Any other FileExists() error is kept in io_s and skips the copy phase
  // below, ending in the failure branch at the bottom.

  auto ret = backups_.insert(std::make_pair(
      new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                         GetBackupMetaFile(new_backup_id, false /* tmp */),
                         GetBackupMetaFile(new_backup_id, true /* tmp */),
                         &backuped_file_infos_, backup_env_, backup_fs_))));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup->RecordTimestamp();
  new_backup->SetAppMetadata(app_metadata);

  auto start_backup = backup_env_->NowMicros();

  ROCKS_LOG_INFO(options_.info_log,
                 "Started the backup process -- creating backup %u",
                 new_backup_id);

  if (options_.share_table_files && !options_.share_files_with_checksum) {
    ROCKS_LOG_WARN(options_.info_log,
                   "BackupEngineOptions::share_files_with_checksum=false is "
                   "DEPRECATED and could lead to data loss.");
  }

  if (io_s.ok()) {
    io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr);
  }

  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live file shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;

  std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
  // Add a CopyOrCreateWorkItem to the channel for each live file
  Status disabled = db->DisableFileDeletions();
  DBOptions db_options = db->GetDBOptions();
  Statistics* stats = db_options.statistics.get();
  if (io_s.ok()) {
    CheckpointImpl checkpoint(db);
    uint64_t sequence_number = 0;
    FileChecksumGenFactory* db_checksum_factory =
        db_options.file_checksum_gen_factory.get();
    const std::string kFileChecksumGenFactoryName =
        "FileChecksumGenCrc32cFactory";
    // Checksums are only compared when the DB is configured with the
    // factory named above.
    const bool compare_checksum =
        db_checksum_factory != nullptr &&
        db_checksum_factory->Name() == kFileChecksumGenFactoryName;
    EnvOptions src_raw_env_options(db_options);
    RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
    io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint(
        [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
            FileType) {
          // custom checkpoint will switch to calling copy_file_cb after it sees
          // NotSupported returned from link_file_cb.
          return IOStatus::NotSupported();
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType type,
            const std::string& checksum_func_name,
            const std::string& checksum_val,
            const Temperature src_temperature) {
          // WAL files are skipped entirely when log backup is turned off.
          if (type == kWalFile && !options_.backup_log_files) {
            return IOStatus::OK();
          }
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          // The source size is only looked up for table and blob files; for
          // every other type size_bytes stays 0.
          uint64_t size_bytes = 0;
          IOStatus io_st;
          if (type == kTableFile || type == kBlobFile) {
            io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_,
                                        &size_bytes, nullptr);
          }
          EnvOptions src_env_options;
          switch (type) {
            case kWalFile:
              src_env_options =
                  db_env_->OptimizeForLogRead(src_raw_env_options);
              break;
            case kTableFile:
              src_env_options = db_env_->OptimizeForCompactionTableRead(
                  src_raw_env_options, ImmutableDBOptions(db_options));
              break;
            case kDescriptorFile:
              src_env_options =
                  db_env_->OptimizeForManifestRead(src_raw_env_options);
              break;
            case kBlobFile:
              src_env_options = db_env_->OptimizeForBlobFileRead(
                  src_raw_env_options, ImmutableDBOptions(db_options));
              break;
            default:
              // Other backed up files (like options file) are not read by live
              // DB, so don't need to worry about avoiding mixing buffered and
              // direct I/O. Just use plain defaults.
              src_env_options = src_raw_env_options;
              break;
          }
          // A failed GetFileSize() is returned as-is without queueing work.
          if (io_st.ok()) {
            io_st = AddBackupFileWorkItem(
                live_dst_paths, backup_items_to_finish, new_backup_id,
                options_.share_table_files &&
                    (type == kTableFile || type == kBlobFile),
                src_dirname, fname, src_env_options, rate_limiter, type,
                size_bytes, stats, size_limit_bytes,
                options_.share_files_with_checksum &&
                    (type == kTableFile || type == kBlobFile),
                options.progress_callback, "" /* contents */,
                checksum_func_name, checksum_val, src_temperature);
          }
          return io_st;
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents,
            FileType type) {
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          return AddBackupFileWorkItem(
              live_dst_paths, backup_items_to_finish, new_backup_id,
              false /* shared */, "" /* src_dir */, fname,
              EnvOptions() /* src_env_options */, rate_limiter, type,
              contents.size(), stats, 0 /* size_limit */,
              false /* shared_checksum */, options.progress_callback, contents);
        } /* create_file_cb */,
        &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
        compare_checksum));
    if (io_s.ok()) {
      new_backup->SetSequenceNumber(sequence_number);
    }
  }
  ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");

  // Every queued item is waited for, even after an earlier failure, so that
  // no work item is left pending when this function returns.
  IOStatus item_io_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_io_status = result.io_status;
    // Start from the expected temperature; the temperature observed on the
    // source file replaces it when it is known and either nothing was
    // expected or the override option is set.
    Temperature temp = result.expected_src_temperature;
    if (result.current_src_temperature != Temperature::kUnknown &&
        (temp == Temperature::kUnknown ||
         options_.current_temperatures_override_manifest)) {
      temp = result.current_src_temperature;
    }
    // Shared files that were actually copied are moved from their temporary
    // name to their final name.
    if (item_io_status.ok() && item.shared && item.needed_to_copy) {
      item_io_status = item.backup_env->GetFileSystem()->RenameFile(
          item.dst_path_tmp, item.dst_path, io_options_, nullptr);
    }
    if (item_io_status.ok()) {
      item_io_status = new_backup->AddFile(std::make_shared<FileInfo>(
          item.dst_relative, result.size, result.checksum_hex, result.db_id,
          result.db_session_id, temp));
    }
    if (!item_io_status.ok()) {
      io_s = item_io_status;
    }
  }

  // we copied all the files, enable file deletions
  if (disabled.ok()) {  // If we successfully disabled file deletions
    db->EnableFileDeletions(false).PermitUncheckedError();
  }
  auto backup_time = backup_env_->NowMicros() - start_backup;

  if (io_s.ok()) {
    // persist the backup metadata on the disk
    io_s = new_backup->StoreToFile(options_.sync, options_.schema_version,
                                   schema_test_options_.get());
  }
  if (io_s.ok() && options_.sync) {
    // Failure to open the private directory is tolerated; it is fsynced only
    // if a directory handle was produced.
    std::unique_ptr<FSDirectory> backup_private_directory;
    backup_fs_
        ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
                       io_options_, &backup_private_directory, nullptr)
        .PermitUncheckedError();
    if (backup_private_directory != nullptr) {
      io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr,
                                                           DirFsyncOptions());
    }
    if (io_s.ok() && private_directory_ != nullptr) {
      io_s = private_directory_->FsyncWithDirOptions(io_options_, nullptr,
                                                     DirFsyncOptions());
    }
    if (io_s.ok() && meta_directory_ != nullptr) {
      io_s = meta_directory_->FsyncWithDirOptions(io_options_, nullptr,
                                                  DirFsyncOptions());
    }
    if (io_s.ok() && shared_directory_ != nullptr) {
      io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr,
                                                    DirFsyncOptions());
    }
    if (io_s.ok() && backup_directory_ != nullptr) {
      io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr,
                                                    DirFsyncOptions());
    }
  }

  if (io_s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
    // here we know that we succeeded and installed the new backup
    latest_backup_id_ = new_backup_id;
    latest_valid_backup_id_ = new_backup_id;
    if (new_backup_id_ptr) {
      *new_backup_id_ptr = new_backup_id;
    }
    ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");

    // backup_speed is in MB/s: the size is in bytes and backup_time in
    // microseconds, so bytes / (1.048576 * microseconds) equals
    // (bytes / 2^20) / seconds. If no time elapsed according to the clock,
    // report 0 instead of dividing by zero (which would log inf/nan).
    const double backup_speed =
        backup_time > 0 ? new_backup->GetSize() / (1.048576 * backup_time)
                        : 0.0;
    ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
                   new_backup->GetNumberFiles());
    char human_size[16];
    AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
    ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
    ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
                   backup_time);
    ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
    ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
                   backup_statistics_.ToString().c_str());
  } else {
    backup_statistics_.IncrementNumberFailBackup();
    // clean all the files we might have created
    ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
                   io_s.ToString().c_str());
    ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                   backup_statistics_.ToString().c_str());
    // delete files that we might have already written
    might_need_garbage_collect_ = true;
    DeleteBackup(new_backup_id).PermitUncheckedError();
  }

  // Report the I/O performed since the counters were sampled at the top of
  // this function.
  RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
  RecordTick(stats, BACKUP_WRITE_BYTES,
             IOSTATS(bytes_written) - prev_bytes_written);
  return io_s;
}