IOStatus BackupEngineImpl::Initialize()

in utilities/backupable/backupable_db.cc [997:1268]


// Initializes the backup engine. Asserts that it has not been initialized
// before, so it must be called at most once.
//
// Steps, in order:
//  1. Writable engines only: reset `max_valid_backups_to_open` to
//     port::kMaxInt32 (logging a warning if it had another value), set
//     `might_need_garbage_collect_`, and create/open the backup root, the
//     shared (or shared_checksum) directory, and the private and meta
//     directories.
//  2. List the meta directory and register an unloaded BackupMeta in
//     `backups_` for every file whose name is the canonical decimal form of a
//     nonzero BackupID. Other names are logged and skipped.
//  3. With `destroy_old_data`, purge every backup and garbage-collect.
//     Otherwise load backup metadata newest-first until
//     `max_valid_backups_to_open` valid backups have been loaded. Backups
//     whose load fails with Corruption/NotSupported are moved to
//     `corrupt_backups_`. Older backups that were never opened are dropped
//     from `backups_`.
//  4. Start `max_background_operations` worker threads that consume
//     `files_to_copy_or_create_`.
//
// Returns one of:
//  - OK on success.
//  - NotFound("<meta dir> is missing") if listing the meta directory reports
//    NotFound.
//  - Otherwise, the first non-OK IOStatus from the backup filesystem, from
//    PurgeOldBackups/GarbageCollect, or from a backup load that failed for a
//    reason other than Corruption/NotSupported.
// A corrupt or unsupported individual backup does NOT fail initialization.
IOStatus BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
  if (read_only_) {
    ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  auto meta_path = GetAbsolutePath(kMetaDirName);

  if (!read_only_) {
    // we might need to clean up from previous crash or I/O errors
    might_need_garbage_collect_ = true;

    // The open limit is only honored by read-only engines. For a writable
    // engine it is reset to the default (port::kMaxInt32) with a warning.
    if (options_.max_valid_backups_to_open != port::kMaxInt32) {
      options_.max_valid_backups_to_open = port::kMaxInt32;
      ROCKS_LOG_WARN(
          options_.info_log,
          "`max_valid_backups_to_open` is not set to the default value. Ignoring "
          "its value since BackupEngine is not read-only.");
    }

    // gather the list of directories that we need to create
    // Each entry pairs an absolute path with the member that receives the
    // opened FSDirectory handle. At most one shared directory is included:
    // it is added only when share_table_files is set, and
    // share_files_with_checksum selects which layout.
    std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(kPrivateDirName),
                             &private_directory_);
    directories.emplace_back(meta_path, &meta_directory_);
    // create all the dirs we need
    // Stop at the first directory that cannot be created or opened.
    for (const auto& d : directories) {
      IOStatus io_s =
          backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr);
      if (io_s.ok()) {
        io_s =
            backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr);
      }
      if (!io_s.ok()) {
        return io_s;
      }
    }
  }

  // List the meta directory. A read-only engine skips the creation step
  // above, so the directory may be absent. In that case NotFound is
  // returned with a clearer message.
  std::vector<std::string> backup_meta_files;
  {
    IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_,
                                            &backup_meta_files, nullptr);
    if (io_s.IsNotFound()) {
      return IOStatus::NotFound(meta_path + " is missing");
    } else if (!io_s.ok()) {
      return io_s;
    }
  }
  // create backups_ structure
  // Only the BackupMeta objects are constructed here. Their contents are
  // read by LoadFromFile further below, or never, for backups beyond the
  // open limit.
  for (auto& file : backup_meta_files) {
    ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
    BackupID backup_id = 0;
    // The sscanf result is not checked: backup_id stays 0 if no number
    // parses. The round-trip comparison with ToString(backup_id) rejects
    // names with leading zeros, signs, whitespace, or trailing characters.
    // NOTE(review): sscanf's behavior on out-of-range input is undefined;
    // consider a checked parser.
    sscanf(file.c_str(), "%u", &backup_id);
    if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
      // Invalid file name, will be deleted with auto-GC when user
      // initiates an append or write operation. (Behave as read-only until
      // then.)
      ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s",
                     file.c_str());
      continue;
    }
    // Only canonical names reach this point. Assuming directory entries are
    // unique, no ID can repeat.
    assert(backups_.find(backup_id) == backups_.end());
    // Insert all the (backup_id, BackupMeta) that will be loaded later
    // The loading performed later will check whether there are corrupt backups
    // and move the corrupt backups to corrupt_backups_
    backups_.insert(std::make_pair(
        backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                       GetBackupMetaFile(backup_id, false /* tmp */),
                       GetBackupMetaFile(backup_id, true /* tmp */),
                       &backuped_file_infos_, backup_env_, backup_fs_))));
  }

  latest_backup_id_ = 0;
  latest_valid_backup_id_ = 0;
  if (options_.destroy_old_data) {  // Destroy old data
    // destroy_old_data is only expected on writable engines (asserted).
    assert(!read_only_);
    ROCKS_LOG_INFO(
        options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    // Keep zero backups, then garbage-collect; both latest IDs remain 0.
    IOStatus io_s = PurgeOldBackups(0);
    if (io_s.ok()) {
      io_s = GarbageCollect();
    }
    if (!io_s.ok()) {
      return io_s;
    }
  } else {  // Load data from storage
    // abs_path_to_size: maps absolute paths of files in backup directory to
    // their corresponding sizes
    std::unordered_map<std::string, uint64_t> abs_path_to_size;
    // Insert files and their sizes in backup sub-directories (shared and
    // shared_checksum) to abs_path_to_size
    // Both shared layouts are scanned regardless of the current options,
    // presumably because existing backups may use either one. Any non-OK
    // status here fails initialization; ReadChildFileCurrentSizes is assumed
    // to tolerate a missing directory — TODO confirm.
    for (const auto& rel_dir :
         {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
      const auto abs_dir = GetAbsolutePath(rel_dir);
      IOStatus io_s =
          ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size);
      if (!io_s.ok()) {
        // I/O error likely impacting all backups
        return io_s;
      }
    }
    // load the backups if any, until valid_backups_to_open of the latest
    // non-corrupted backups have been successfully opened.
    // Iteration goes from the highest backup ID down to the lowest, as the
    // ordering assertions below check.
    int valid_backups_to_open = options_.max_valid_backups_to_open;
    for (auto backup_iter = backups_.rbegin();
         backup_iter != backups_.rend();
         ++backup_iter) {
      assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
      // The newest ID is recorded before the limit check and before loading.
      // So latest_backup_id_ is set even if no backup is loaded, or if the
      // newest backup turns out to be corrupt.
      if (latest_backup_id_ == 0) {
        latest_backup_id_ = backup_iter->first;
      }
      // Enough valid backups are open. The remaining (older) entries stay in
      // backups_ unloaded and are trimmed after this loop.
      if (valid_backups_to_open == 0) {
        break;
      }

      // Insert files and their sizes in backup sub-directories
      // (private/backup_id) to abs_path_to_size
      IOStatus io_s = ReadChildFileCurrentSizes(
          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_,
          &abs_path_to_size);
      if (io_s.ok()) {
        io_s = backup_iter->second->LoadFromFile(
            options_.backup_dir, abs_path_to_size,
            options_.backup_rate_limiter.get(), options_.info_log,
            &reported_ignored_fields_);
      }
      if (io_s.IsCorruption() || io_s.IsNotSupported()) {
        ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
                       backup_iter->first, io_s.ToString().c_str());
        // Moving the BackupMeta leaves a null pointer in backups_. That entry
        // is erased right after this loop. The map is not modified here
        // because the loop is still iterating over it.
        corrupt_backups_.insert(std::make_pair(
            backup_iter->first,
            std::make_pair(io_s, std::move(backup_iter->second))));
      } else if (!io_s.ok()) {
        // Distinguish corruption errors from errors in the backup Env.
        // Errors in the backup Env (i.e., this code path) will cause Open() to
        // fail, whereas corruption errors would not cause Open() failures.
        return io_s;
      } else {
        ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
                       backup_iter->first,
                       backup_iter->second->GetInfoString().c_str());
        assert(latest_valid_backup_id_ == 0 ||
               latest_valid_backup_id_ > backup_iter->first);
        if (latest_valid_backup_id_ == 0) {
          latest_valid_backup_id_ = backup_iter->first;
        }
        --valid_backups_to_open;
      }
    }

    // Drop the entries whose BackupMeta was moved into corrupt_backups_.
    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(backups_.find(corrupt.first));
    }
    // erase the backups before max_valid_backups_to_open
    // With corrupt entries removed, backups_ contains the loaded valid
    // backups (the newest IDs) plus, if the loop broke early, older
    // never-loaded entries at the front of the map. size - limit is then
    // exactly the count of those unloaded entries, which the assert checks
    // via Empty(). A limit of 0 loads nothing and keeps every entry,
    // unloaded.
    // NOTE(review): a negative max_valid_backups_to_open would make
    // num_unopened_backups exceed backups_.size(). Presumably such values
    // are rejected elsewhere — TODO confirm.
    int num_unopened_backups;
    if (options_.max_valid_backups_to_open == 0) {
      num_unopened_backups = 0;
    } else {
      num_unopened_backups =
          std::max(0, static_cast<int>(backups_.size()) -
                          options_.max_valid_backups_to_open);
    }
    for (int i = 0; i < num_unopened_backups; ++i) {
      assert(backups_.begin()->second->Empty());
      backups_.erase(backups_.begin());
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
  ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
                 latest_valid_backup_id_);

  // set up threads that perform copies from files_to_copy_or_create_ in the
  // background
  // Each worker loops until files_to_copy_or_create_.read() returns false
  // (presumably when the queue is closed). For each work item it performs
  // one copy/create, records I/O stats, optionally verifies the checksum,
  // and delivers the result through work_item.result.
  threads_cpu_priority_ = CpuPriority::kNormal;
  threads_.reserve(options_.max_background_operations);
  for (int t = 0; t < options_.max_background_operations; t++) {
    threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
      pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
      // Priority this thread last applied. threads_cpu_priority_ is re-read
      // for every work item, and SetCpuPriority is only called when the two
      // differ.
      CpuPriority current_priority = CpuPriority::kNormal;
      CopyOrCreateWorkItem work_item;
      // Carried across work items on this thread and passed to
      // CopyOrCreateFile. Presumably it accumulates bytes until the next
      // progress_callback invocation.
      uint64_t bytes_toward_next_callback = 0;
      while (files_to_copy_or_create_.read(work_item)) {
        CpuPriority priority = threads_cpu_priority_;
        if (current_priority != priority) {
          TEST_SYNC_POINT_CALLBACK(
              "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
          port::SetCpuPriority(0, priority);
          current_priority = priority;
        }
        // `bytes_read` and `bytes_written` stats are enabled based on
        // compile-time support and cannot be dynamically toggled. So we do not
        // need to worry about `PerfLevel` here, unlike many other
        // `IOStatsContext` / `PerfContext` stats.
        uint64_t prev_bytes_read = IOSTATS(bytes_read);
        uint64_t prev_bytes_written = IOSTATS(bytes_written);

        CopyOrCreateResult result;
        // `temp` starts as the expected source temperature. CopyOrCreateFile
        // receives a pointer to it and may update it; the final value is
        // reported as current_src_temperature below.
        Temperature temp = work_item.src_temperature;
        result.io_status = CopyOrCreateFile(
            work_item.src_path, work_item.dst_path, work_item.contents,
            work_item.size_limit, work_item.src_env, work_item.dst_env,
            work_item.src_env_options, work_item.sync, work_item.rate_limiter,
            work_item.progress_callback, &temp, work_item.dst_temperature,
            &bytes_toward_next_callback, &result.size, &result.checksum_hex);

        RecordTick(work_item.stats, BACKUP_READ_BYTES,
                   IOSTATS(bytes_read) - prev_bytes_read);
        RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
                   IOSTATS(bytes_written) - prev_bytes_written);

        result.db_id = work_item.db_id;
        result.db_session_id = work_item.db_session_id;
        result.expected_src_temperature = work_item.src_temperature;
        result.current_src_temperature = temp;
        if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
          // unknown checksum function name implies no db table file checksum in
          // db manifest; work_item.src_checksum_hex not empty means
          // backup engine has calculated its crc32c checksum for the table
          // file; therefore, we are able to compare the checksums.
          if (work_item.src_checksum_func_name ==
                  kUnknownFileChecksumFuncName ||
              work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
            // A mismatch turns a successful copy into a Corruption result.
            if (work_item.src_checksum_hex != result.checksum_hex) {
              std::string checksum_info(
                  "Expected checksum is " + work_item.src_checksum_hex +
                  " while computed checksum is " + result.checksum_hex);
              result.io_status = IOStatus::Corruption(
                  "Checksum mismatch after copying to " + work_item.dst_path +
                  ": " + checksum_info);
            }
          } else {
            // FIXME(peterd): dead code?
            // Checksum functions differ, so verification is skipped. This is
            // only logged; the result status stays OK.
            std::string checksum_function_info(
                "Existing checksum function is " +
                work_item.src_checksum_func_name +
                " while provided checksum function is " +
                kBackupFileChecksumFuncName);
            ROCKS_LOG_INFO(
                options_.info_log,
                "Unable to verify checksum after copying to %s: %s\n",
                work_item.dst_path.c_str(), checksum_function_info.c_str());
          }
        }
        work_item.result.set_value(std::move(result));
      }
    });
  }
  ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
  return IOStatus::OK();
}