in db_stress_tool/db_stress_test_base.cc [1424:1672]
// Stress operation: back up `db_`, then validate the backup.
//
// Steps, in order:
//   1. Open a BackupEngine on a per-thread directory under `FLAGS_db` with
//      randomly chosen sharing/naming/schema options and create a new backup.
//   2. Re-open the engine, list the backups, and (randomly) run
//      `VerifyBackup` on the first listed backup.
//   3. Either restore a backup into a per-thread restore directory, or (when
//      `inplace_not_restore`) open a backup in place as a read-only DB.
//      Opening is skipped for TransactionDB/BlobDB configurations.
//   4. For each column family in `rand_column_families`, `Get` the single key
//      `rand_keys[0]` from the opened DB. Existence is compared against
//      `thread->shared->Exists(...)` only when the DB was restored through
//      `RestoreDBFromLatestBackup` and `ShouldAcquireMutexOnKey()` holds;
//      otherwise only unexpected `Get` errors are reported.
//   5. Purge old backups and, on success, remove the working directories.
//
// Only the thread with `tid == 0` may keep backups (up to 2) across calls.
// On any failure the directories are left in place for inspection and a
// message naming the failing step is printed to stderr.
//
// Returns OK on success, otherwise the status of the first failing step (or
// `Corruption` on an existence mismatch).
//
// NOTE: the sequence and conditions of `thread->rand` calls are significant
// for reproducing a run from a seed; keep them in this order when editing.
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
  std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);

  BackupEngineOptions backup_opts(backup_dir);
  // For debugging, get info_log from live options
  backup_opts.info_log = db_->GetDBOptions().info_log.get();

  // Randomize how table files are shared between backups. The checksum
  // options are only drawn when the enclosing option is enabled.
  backup_opts.share_table_files = !thread->rand.OneIn(10);
  if (backup_opts.share_table_files) {
    backup_opts.share_files_with_checksum = !thread->rand.OneIn(5);
    if (backup_opts.share_files_with_checksum) {
      backup_opts.share_files_with_checksum_naming =
          thread->rand.OneIn(2)
              ? BackupEngineOptions::kLegacyCrc32cAndFileSize  // old
              : BackupEngineOptions::kUseDbSessionId;          // new
      if (thread->rand.OneIn(2)) {
        backup_opts.share_files_with_checksum_naming =
            backup_opts.share_files_with_checksum_naming |
            BackupEngineOptions::kFlagIncludeFileSize;
      }
    }
  }
  backup_opts.schema_version = thread->rand.OneIn(2) ? 1 : 2;

  BackupEngine* backup_engine = nullptr;
  // Name of the step that failed; only used for the final error message.
  std::string from = "a backup/restore operation";
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (!s.ok()) {
    from = "BackupEngine::Open";
  }

  if (s.ok()) {
    // The random draw is intentionally short-circuited behind the schema
    // version check.
    if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
      TEST_BackupMetaSchemaOptions test_opts;
      test_opts.crc32c_checksums = !thread->rand.OneIn(2);
      test_opts.file_sizes = !thread->rand.OneIn(2);
      TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
    }
    CreateBackupOptions create_opts;
    if (FLAGS_disable_wal) {
      // The verification can only work when latest value of `key` is backed
      // up, which requires flushing in case of WAL disabled.
      //
      // Note this triggers a flush with a key lock held. Meanwhile, operations
      // like flush/compaction may attempt to grab key locks like in
      // `DbStressCompactionFilter`. The philosophy around preventing deadlock
      // is the background operation key lock acquisition only tries but does
      // not wait for the lock. So here in the foreground it is OK to hold the
      // lock and wait on a background operation (flush).
      create_opts.flush_before_backup = true;
    }
    s = backup_engine->CreateNewBackup(create_opts, db_);
    if (!s.ok()) {
      from = "BackupEngine::CreateNewBackup";
    }
  }

  if (s.ok()) {
    // Close and re-open the engine so the remaining steps run against a
    // freshly opened BackupEngine rather than the one that wrote the backup.
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
    if (!s.ok()) {
      from = "BackupEngine::Open (again)";
    }
  }

  std::vector<BackupInfo> backup_info;
  // If inplace_not_restore, we verify the backup by opening it as a
  // read-only DB. If !inplace_not_restore, we restore it to a temporary
  // directory for verification.
  // (Drawn unconditionally, even if an earlier step already failed.)
  bool inplace_not_restore = thread->rand.OneIn(3);
  if (s.ok()) {
    backup_engine->GetBackupInfo(&backup_info,
                                 /*include_file_details*/ inplace_not_restore);
    if (backup_info.empty()) {
      s = Status::NotFound("no backups found");
      from = "BackupEngine::GetBackupInfo";
    }
  }

  if (s.ok() && thread->rand.OneIn(2)) {
    s = backup_engine->VerifyBackup(
        backup_info.front().backup_id,
        thread->rand.OneIn(2) /* verify_with_checksum */);
    if (!s.ok()) {
      from = "BackupEngine::VerifyBackup";
    }
  }

  const bool allow_persistent = thread->tid == 0;  // not too many
  bool from_latest = false;
  int count = static_cast<int>(backup_info.size());

  // Shared by the "purge early" and "purge late" paths below. Must only be
  // called while `backup_engine` is open (i.e. while `s` has been OK so far).
  auto purge_old_backups = [&]() {
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  };

  if (s.ok() && !inplace_not_restore) {
    if (count > 1) {
      // Several backups exist: restore a randomly chosen one. Its contents
      // are not compared against the expected state (`from_latest` stays
      // false).
      s = backup_engine->RestoreDBFromBackup(
          RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
          restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromBackup";
      }
    } else {
      from_latest = true;
      s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
                                                   restore_dir /* db_dir */,
                                                   restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromLatestBackup";
      }
    }
    if (s.ok()) {
      // Purge early if restoring, to ensure the restored directory doesn't
      // have some secret dependency on the backup directory.
      purge_old_backups();
    }
  }

  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  // Not yet implemented: opening restored BlobDB or TransactionDB
  if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
    Options restore_options(options_);
    restore_options.listeners.clear();
    // Avoid dangling/shared file descriptors, for reliable destroy
    restore_options.sst_file_manager = nullptr;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    for (const auto& name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    if (inplace_not_restore) {
      // `backup_info` is non-empty here because `s` is OK, so `count >= 1`.
      BackupInfo& info = backup_info[thread->rand.Uniform(count)];
      // `info` owns the env used for the in-place open; `backup_info` stays
      // alive until after `restored_db` is deleted below.
      restore_options.env = info.env_for_open.get();
      s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
                              cf_descriptors, &restored_cf_handles,
                              &restored_db);
      if (!s.ok()) {
        from = "DB::OpenForReadOnly in backup/restore";
      }
    } else {
      s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                   &restored_cf_handles, &restored_db);
      if (!s.ok()) {
        from = "DB::Open in backup/restore";
      }
    }
  }

  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  //
  // For simplicity, currently only verifies existence/non-existence of a
  // single key
  if (restored_db != nullptr) {
    for (int cf : rand_column_families) {
      if (!s.ok()) {
        break;
      }
      std::string key_str = Key(rand_keys[0]);
      Slice key = key_str;
      std::string restored_value;
      // This `ReadOptions` is for validation purposes. Ignore
      // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
      ReadOptions read_opts;
      std::string ts_str;
      Slice ts;
      if (FLAGS_user_timestamp_size > 0) {
        ts_str = GenerateTimestampForRead();
        ts = ts_str;
        read_opts.timestamp = &ts;
      }
      Status get_status = restored_db->Get(read_opts, restored_cf_handles[cf],
                                           key, &restored_value);
      bool exists = thread->shared->Exists(cf, rand_keys[0]);
      if (get_status.ok()) {
        if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
          s = Status::Corruption(
              "key exists in restore but not in original db");
        }
      } else if (get_status.IsNotFound()) {
        if (exists && from_latest && ShouldAcquireMutexOnKey()) {
          s = Status::Corruption(
              "key exists in original db but not in restore");
        }
      } else {
        // Any other status is a genuine read failure.
        s = get_status;
        from = "DB::Get in backup/restore";
      }
    }
  }

  if (restored_db != nullptr) {
    for (auto* cf_handle : restored_cf_handles) {
      restored_db->DestroyColumnFamilyHandle(cf_handle);
    }
    delete restored_db;
    restored_db = nullptr;
  }

  if (s.ok() && inplace_not_restore) {
    // Purge late if inplace open read-only
    purge_old_backups();
  }

  // `delete` on a null pointer is a no-op, so no guard is needed.
  delete backup_engine;
  backup_engine = nullptr;

  // Preserve directories on failure, or allowed persistent backup
  if (s.ok() && !allow_persistent) {
    s = DestroyDir(db_stress_env, backup_dir);
    if (!s.ok()) {
      from = "Destroy backup dir";
    }
  }
  if (s.ok()) {
    s = DestroyDir(db_stress_env, restore_dir);
    if (!s.ok()) {
      from = "Destroy restore dir";
    }
  }
  if (!s.ok()) {
    fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
            s.ToString().c_str());
  }
  return s;
}