Status StressTest::TestBackupRestore()

in db_stress_tool/db_stress_test_base.cc [1424:1672]


// Creates a new backup of the live DB (`db_`) with randomly chosen
// BackupEngineOptions, then checks it in one of two ways:
//   * restore it into a per-thread restore directory and open that, or
//   * open one of the backups in place as a read-only DB.
// A single key (`rand_keys[0]`) is then read back from each column family in
// `rand_column_families`, and the backup/restore directories are cleaned up
// on success.
//
// thread: per-thread state; supplies the random generator, the thread id used
//     to name the backup/restore directories, and the expected-state lookup
//     (`thread->shared->Exists`).
// rand_column_families: column family indices to probe; each is used to index
//     the restored DB's column family handles.
// rand_keys: only `rand_keys[0]` is read.
//
// Returns OK if every step succeeded. Otherwise returns the Status of the
// first failing step, after printing it to stderr together with the name of
// that step. On failure the backup and restore directories are left in place.
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
  std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
  BackupEngineOptions backup_opts(backup_dir);
  // For debugging, get info_log from live options
  backup_opts.info_log = db_->GetDBOptions().info_log.get();
  // Randomize the file-sharing configuration of the backup.
  if (thread->rand.OneIn(10)) {
    backup_opts.share_table_files = false;
  } else {
    backup_opts.share_table_files = true;
    if (thread->rand.OneIn(5)) {
      backup_opts.share_files_with_checksum = false;
    } else {
      backup_opts.share_files_with_checksum = true;
      if (thread->rand.OneIn(2)) {
        // old
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kLegacyCrc32cAndFileSize;
      } else {
        // new
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kUseDbSessionId;
      }
      if (thread->rand.OneIn(2)) {
        backup_opts.share_files_with_checksum_naming =
            backup_opts.share_files_with_checksum_naming |
            BackupEngineOptions::kFlagIncludeFileSize;
      }
    }
  }
  if (thread->rand.OneIn(2)) {
    backup_opts.schema_version = 1;
  } else {
    backup_opts.schema_version = 2;
  }
  BackupEngine* backup_engine = nullptr;
  // `from` names the step reported in the failure message; it is overwritten
  // by whichever step fails first.
  std::string from = "a backup/restore operation";
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (!s.ok()) {
    from = "BackupEngine::Open";
  }
  if (s.ok()) {
    if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
      TEST_BackupMetaSchemaOptions test_opts;
      test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
      test_opts.file_sizes = thread->rand.OneIn(2) == 0;
      TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
    }
    CreateBackupOptions create_opts;
    if (FLAGS_disable_wal) {
      // The verification can only work when latest value of `key` is backed up,
      // which requires flushing in case of WAL disabled.
      //
      // Note this triggers a flush with a key lock held. Meanwhile, operations
      // like flush/compaction may attempt to grab key locks like in
      // `DbStressCompactionFilter`. The philosophy around preventing deadlock
      // is the background operation key lock acquisition only tries but does
      // not wait for the lock. So here in the foreground it is OK to hold the
      // lock and wait on a background operation (flush).
      create_opts.flush_before_backup = true;
    }
    s = backup_engine->CreateNewBackup(create_opts, db_);
    if (!s.ok()) {
      from = "BackupEngine::CreateNewBackup";
    }
  }
  if (s.ok()) {
    // Close and reopen the engine so that all following steps run against a
    // freshly opened BackupEngine (presumably to exercise reading the backup
    // metadata back from storage).
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
    if (!s.ok()) {
      from = "BackupEngine::Open (again)";
    }
  }
  std::vector<BackupInfo> backup_info;
  // If inplace_not_restore, we verify the backup by opening it as a
  // read-only DB. If !inplace_not_restore, we restore it to a temporary
  // directory for verification.
  bool inplace_not_restore = thread->rand.OneIn(3);
  if (s.ok()) {
    // File details are requested only for the in-place path, which later
    // reads `name_for_open` / `env_for_open` from the chosen BackupInfo.
    backup_engine->GetBackupInfo(&backup_info,
                                 /*include_file_details*/ inplace_not_restore);
    if (backup_info.empty()) {
      s = Status::NotFound("no backups found");
      from = "BackupEngine::GetBackupInfo";
    }
  }
  if (s.ok() && thread->rand.OneIn(2)) {
    s = backup_engine->VerifyBackup(
        backup_info.front().backup_id,
        thread->rand.OneIn(2) /* verify_with_checksum */);
    if (!s.ok()) {
      from = "BackupEngine::VerifyBackup";
    }
  }
  const bool allow_persistent = thread->tid == 0;  // not too many
  bool from_latest = false;
  int count = static_cast<int>(backup_info.size());

  // Purges old backups, keeping none unless this thread is allowed to keep
  // persistent backups. Shared by the "purge early" (restore) and "purge
  // late" (in-place open) paths below; records the failing step in `from`.
  auto purge_old_backups = [&]() {
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  };

  if (s.ok() && !inplace_not_restore) {
    if (count > 1) {
      // Several backups exist: restore a randomly chosen one. `from_latest`
      // stays false, so the key existence comparison below is not enforced.
      s = backup_engine->RestoreDBFromBackup(
          RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
          restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromBackup";
      }
    } else {
      from_latest = true;
      s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
                                                   restore_dir /* db_dir */,
                                                   restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromLatestBackup";
      }
    }
  }
  if (s.ok() && !inplace_not_restore) {
    // Purge early if restoring, to ensure the restored directory doesn't
    // have some secret dependency on the backup directory.
    purge_old_backups();
  }
  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  // Not yet implemented: opening restored BlobDB or TransactionDB
  if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
    Options restore_options(options_);
    restore_options.listeners.clear();
    // Avoid dangling/shared file descriptors, for reliable destroy
    restore_options.sst_file_manager = nullptr;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    // Iterate by const reference: the name is only read to build the
    // descriptor, so there is no need to copy it per iteration.
    for (const auto& name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    if (inplace_not_restore) {
      // Open a randomly chosen backup directly, using the env and path the
      // backup engine reported for it.
      BackupInfo& info = backup_info[thread->rand.Uniform(count)];
      restore_options.env = info.env_for_open.get();
      s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
                              cf_descriptors, &restored_cf_handles,
                              &restored_db);
      if (!s.ok()) {
        from = "DB::OpenForReadOnly in backup/restore";
      }
    } else {
      s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                   &restored_cf_handles, &restored_db);
      if (!s.ok()) {
        from = "DB::Open in backup/restore";
      }
    }
  }
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  //
  // For simplicity, currently only verifies existence/non-existence of a
  // single key
  for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
       ++i) {
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    std::string restored_value;
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions read_opts;
    std::string ts_str;
    Slice ts;
    if (FLAGS_user_timestamp_size > 0) {
      ts_str = GenerateTimestampForRead();
      ts = ts_str;
      read_opts.timestamp = &ts;
    }
    Status get_status = restored_db->Get(
        read_opts, restored_cf_handles[rand_column_families[i]], key,
        &restored_value);
    bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
    // The existence comparison is only enforced when the latest backup was
    // restored (`from_latest`) and `ShouldAcquireMutexOnKey()` holds; in all
    // other cases only unexpected `Get` errors are reported.
    if (get_status.ok()) {
      if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
        s = Status::Corruption("key exists in restore but not in original db");
      }
    } else if (get_status.IsNotFound()) {
      if (exists && from_latest && ShouldAcquireMutexOnKey()) {
        s = Status::Corruption("key exists in original db but not in restore");
      }
    } else {
      // Neither OK nor NotFound: `get_status` is necessarily a failure here.
      s = get_status;
      from = "DB::Get in backup/restore";
    }
  }
  // Close the restored DB before purging/closing the backup engine and
  // before destroying the directories below.
  if (restored_db != nullptr) {
    for (auto* cf_handle : restored_cf_handles) {
      restored_db->DestroyColumnFamilyHandle(cf_handle);
    }
    delete restored_db;
    restored_db = nullptr;
  }
  if (s.ok() && inplace_not_restore) {
    // Purge late if inplace open read-only
    purge_old_backups();
  }
  if (backup_engine != nullptr) {
    delete backup_engine;
    backup_engine = nullptr;
  }
  if (s.ok()) {
    // Preserve directories on failure, or allowed persistent backup
    if (!allow_persistent) {
      s = DestroyDir(db_stress_env, backup_dir);
      if (!s.ok()) {
        from = "Destroy backup dir";
      }
    }
  }
  if (s.ok()) {
    s = DestroyDir(db_stress_env, restore_dir);
    if (!s.ok()) {
      from = "Destroy restore dir";
    }
  }
  if (!s.ok()) {
    fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
            s.ToString().c_str());
  }
  return s;
}