in utilities/backupable/backupable_db.cc [997:1268]
IOStatus BackupEngineImpl::Initialize() {
assert(!initialized_);
initialized_ = true;
if (read_only_) {
ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
}
options_.Dump(options_.info_log);
auto meta_path = GetAbsolutePath(kMetaDirName);
if (!read_only_) {
// we might need to clean up from previous crash or I/O errors
might_need_garbage_collect_ = true;
if (options_.max_valid_backups_to_open != port::kMaxInt32) {
options_.max_valid_backups_to_open = port::kMaxInt32;
ROCKS_LOG_WARN(
options_.info_log,
"`max_valid_backups_to_open` is not set to the default value. Ignoring "
"its value since BackupEngine is not read-only.");
}
// gather the list of directories that we need to create
std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>>
directories;
directories.emplace_back(GetAbsolutePath(), &backup_directory_);
if (options_.share_table_files) {
if (options_.share_files_with_checksum) {
directories.emplace_back(
GetAbsolutePath(GetSharedFileWithChecksumRel()),
&shared_directory_);
} else {
directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
&shared_directory_);
}
}
directories.emplace_back(GetAbsolutePath(kPrivateDirName),
&private_directory_);
directories.emplace_back(meta_path, &meta_directory_);
// create all the dirs we need
for (const auto& d : directories) {
IOStatus io_s =
backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr);
if (io_s.ok()) {
io_s =
backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr);
}
if (!io_s.ok()) {
return io_s;
}
}
}
std::vector<std::string> backup_meta_files;
{
IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_,
&backup_meta_files, nullptr);
if (io_s.IsNotFound()) {
return IOStatus::NotFound(meta_path + " is missing");
} else if (!io_s.ok()) {
return io_s;
}
}
// create backups_ structure
for (auto& file : backup_meta_files) {
ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
BackupID backup_id = 0;
sscanf(file.c_str(), "%u", &backup_id);
if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
// Invalid file name, will be deleted with auto-GC when user
// initiates an append or write operation. (Behave as read-only until
// then.)
ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s",
file.c_str());
continue;
}
assert(backups_.find(backup_id) == backups_.end());
// Insert all the (backup_id, BackupMeta) that will be loaded later
// The loading performed later will check whether there are corrupt backups
// and move the corrupt backups to corrupt_backups_
backups_.insert(std::make_pair(
backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
GetBackupMetaFile(backup_id, false /* tmp */),
GetBackupMetaFile(backup_id, true /* tmp */),
&backuped_file_infos_, backup_env_, backup_fs_))));
}
latest_backup_id_ = 0;
latest_valid_backup_id_ = 0;
if (options_.destroy_old_data) { // Destroy old data
assert(!read_only_);
ROCKS_LOG_INFO(
options_.info_log,
"Backup Engine started with destroy_old_data == true, deleting all "
"backups");
IOStatus io_s = PurgeOldBackups(0);
if (io_s.ok()) {
io_s = GarbageCollect();
}
if (!io_s.ok()) {
return io_s;
}
} else { // Load data from storage
// abs_path_to_size: maps absolute paths of files in backup directory to
// their corresponding sizes
std::unordered_map<std::string, uint64_t> abs_path_to_size;
// Insert files and their sizes in backup sub-directories (shared and
// shared_checksum) to abs_path_to_size
for (const auto& rel_dir :
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
IOStatus io_s =
ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size);
if (!io_s.ok()) {
// I/O error likely impacting all backups
return io_s;
}
}
// load the backups if any, until valid_backups_to_open of the latest
// non-corrupted backups have been successfully opened.
int valid_backups_to_open = options_.max_valid_backups_to_open;
for (auto backup_iter = backups_.rbegin();
backup_iter != backups_.rend();
++backup_iter) {
assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
if (latest_backup_id_ == 0) {
latest_backup_id_ = backup_iter->first;
}
if (valid_backups_to_open == 0) {
break;
}
// Insert files and their sizes in backup sub-directories
// (private/backup_id) to abs_path_to_size
IOStatus io_s = ReadChildFileCurrentSizes(
GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_,
&abs_path_to_size);
if (io_s.ok()) {
io_s = backup_iter->second->LoadFromFile(
options_.backup_dir, abs_path_to_size,
options_.backup_rate_limiter.get(), options_.info_log,
&reported_ignored_fields_);
}
if (io_s.IsCorruption() || io_s.IsNotSupported()) {
ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
backup_iter->first, io_s.ToString().c_str());
corrupt_backups_.insert(std::make_pair(
backup_iter->first,
std::make_pair(io_s, std::move(backup_iter->second))));
} else if (!io_s.ok()) {
// Distinguish corruption errors from errors in the backup Env.
// Errors in the backup Env (i.e., this code path) will cause Open() to
// fail, whereas corruption errors would not cause Open() failures.
return io_s;
} else {
ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
backup_iter->first,
backup_iter->second->GetInfoString().c_str());
assert(latest_valid_backup_id_ == 0 ||
latest_valid_backup_id_ > backup_iter->first);
if (latest_valid_backup_id_ == 0) {
latest_valid_backup_id_ = backup_iter->first;
}
--valid_backups_to_open;
}
}
for (const auto& corrupt : corrupt_backups_) {
backups_.erase(backups_.find(corrupt.first));
}
// erase the backups before max_valid_backups_to_open
int num_unopened_backups;
if (options_.max_valid_backups_to_open == 0) {
num_unopened_backups = 0;
} else {
num_unopened_backups =
std::max(0, static_cast<int>(backups_.size()) -
options_.max_valid_backups_to_open);
}
for (int i = 0; i < num_unopened_backups; ++i) {
assert(backups_.begin()->second->Empty());
backups_.erase(backups_.begin());
}
}
ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
latest_valid_backup_id_);
// set up threads perform copies from files_to_copy_or_create_ in the
// background
threads_cpu_priority_ = CpuPriority::kNormal;
threads_.reserve(options_.max_background_operations);
for (int t = 0; t < options_.max_background_operations; t++) {
threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
CpuPriority current_priority = CpuPriority::kNormal;
CopyOrCreateWorkItem work_item;
uint64_t bytes_toward_next_callback = 0;
while (files_to_copy_or_create_.read(work_item)) {
CpuPriority priority = threads_cpu_priority_;
if (current_priority != priority) {
TEST_SYNC_POINT_CALLBACK(
"BackupEngineImpl::Initialize:SetCpuPriority", &priority);
port::SetCpuPriority(0, priority);
current_priority = priority;
}
// `bytes_read` and `bytes_written` stats are enabled based on
// compile-time support and cannot be dynamically toggled. So we do not
// need to worry about `PerfLevel` here, unlike many other
// `IOStatsContext` / `PerfContext` stats.
uint64_t prev_bytes_read = IOSTATS(bytes_read);
uint64_t prev_bytes_written = IOSTATS(bytes_written);
CopyOrCreateResult result;
Temperature temp = work_item.src_temperature;
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.size_limit, work_item.src_env, work_item.dst_env,
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
work_item.progress_callback, &temp, work_item.dst_temperature,
&bytes_toward_next_callback, &result.size, &result.checksum_hex);
RecordTick(work_item.stats, BACKUP_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);
result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
result.expected_src_temperature = work_item.src_temperature;
result.current_src_temperature = temp;
if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.src_checksum_hex not empty means
// backup engine has calculated its crc32c checksum for the table
// file; therefore, we are able to compare the checksums.
if (work_item.src_checksum_func_name ==
kUnknownFileChecksumFuncName ||
work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
if (work_item.src_checksum_hex != result.checksum_hex) {
std::string checksum_info(
"Expected checksum is " + work_item.src_checksum_hex +
" while computed checksum is " + result.checksum_hex);
result.io_status = IOStatus::Corruption(
"Checksum mismatch after copying to " + work_item.dst_path +
": " + checksum_info);
}
} else {
// FIXME(peterd): dead code?
std::string checksum_function_info(
"Existing checksum function is " +
work_item.src_checksum_func_name +
" while provided checksum function is " +
kBackupFileChecksumFuncName);
ROCKS_LOG_INFO(
options_.info_log,
"Unable to verify checksum after copying to %s: %s\n",
work_item.dst_path.c_str(), checksum_function_info.c_str());
}
}
work_item.result.set_value(std::move(result));
}
});
}
ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
return IOStatus::OK();
}