Status ExternalSstFileIngestionJob::Prepare()

in db/external_sst_file_ingestion_job.cc [30:333]


Status ExternalSstFileIngestionJob::Prepare(
    const std::vector<std::string>& external_files_paths,
    const std::vector<std::string>& files_checksums,
    const std::vector<std::string>& files_checksum_func_names,
    const Temperature& file_temperature, uint64_t next_file_number,
    SuperVersion* sv) {
  Status status;

  // Read the information of files we are ingesting
  for (const std::string& file_path : external_files_paths) {
    IngestedFileInfo file_to_ingest;
    status =
        GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv);
    if (!status.ok()) {
      return status;
    }

    if (file_to_ingest.cf_id !=
            TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
        file_to_ingest.cf_id != cfd_->GetID()) {
      return Status::InvalidArgument(
          "External file column family id don't match");
    }

    if (file_to_ingest.num_entries == 0 &&
        file_to_ingest.num_range_deletions == 0) {
      return Status::InvalidArgument("File contain no entries");
    }

    if (!file_to_ingest.smallest_internal_key.Valid() ||
        !file_to_ingest.largest_internal_key.Valid()) {
      return Status::Corruption("Generated table have corrupted keys");
    }

    files_to_ingest_.emplace_back(std::move(file_to_ingest));
  }

  const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
  auto num_files = files_to_ingest_.size();
  if (num_files == 0) {
    return Status::InvalidArgument("The list of files is empty");
  } else if (num_files > 1) {
    // Verify that passed files don't have overlapping ranges
    autovector<const IngestedFileInfo*> sorted_files;
    for (size_t i = 0; i < num_files; i++) {
      sorted_files.push_back(&files_to_ingest_[i]);
    }

    std::sort(
        sorted_files.begin(), sorted_files.end(),
        [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
          return sstableKeyCompare(ucmp, info1->smallest_internal_key,
                                   info2->smallest_internal_key) < 0;
        });

    for (size_t i = 0; i + 1 < num_files; i++) {
      if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
                            sorted_files[i + 1]->smallest_internal_key) >= 0) {
        files_overlap_ = true;
        break;
      }
    }
  }

  // Handle the file temperature
  for (size_t i = 0; i < num_files; i++) {
    files_to_ingest_[i].file_temperature = file_temperature;
  }

  if (ingestion_options_.ingest_behind && files_overlap_) {
    return Status::NotSupported("Files have overlapping ranges");
  }

  // Copy/Move external files into DB
  std::unordered_set<size_t> ingestion_path_ids;
  for (IngestedFileInfo& f : files_to_ingest_) {
    f.copy_file = false;
    const std::string path_outside_db = f.external_file_path;
    const std::string path_inside_db =
        TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(),
                      f.fd.GetPathId());
    if (ingestion_options_.move_files) {
      status =
          fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
      if (status.ok()) {
        // It is unsafe to assume the application has synced the file and the
        // file directory before ingesting it. For the integrity of RocksDB we
        // need to sync the file.
        std::unique_ptr<FSWritableFile> file_to_sync;
        Status s = fs_->ReopenWritableFile(path_inside_db, env_options_,
                                           &file_to_sync, nullptr);
        TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen",
                                 &s);
        // Some file systems (especially remote/distributed) don't support
        // reopening a file for writing and don't require reopening and
        // syncing the file. Ignore the NotSupported error in that case.
        if (!s.IsNotSupported()) {
          status = s;
          if (status.ok()) {
            TEST_SYNC_POINT(
                "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
            status = SyncIngestedFile(file_to_sync.get());
            TEST_SYNC_POINT(
                "ExternalSstFileIngestionJob::AfterSyncIngestedFile");
            if (!status.ok()) {
              ROCKS_LOG_WARN(db_options_.info_log,
                             "Failed to sync ingested file %s: %s",
                             path_inside_db.c_str(), status.ToString().c_str());
            }
          }
        }
      } else if (status.IsNotSupported() &&
                 ingestion_options_.failed_move_fall_back_to_copy) {
        // Original file is on a different FS, use copy instead of hard linking.
        f.copy_file = true;
      }
    } else {
      f.copy_file = true;
    }

    if (f.copy_file) {
      TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
                               nullptr);
      // CopyFile also syncs the new file.
      status =
          CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
                   db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
    }
    TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
    if (!status.ok()) {
      break;
    }
    f.internal_file_path = path_inside_db;
    // Initialize the checksum information of ingested files.
    f.file_checksum = kUnknownFileChecksum;
    f.file_checksum_func_name = kUnknownFileChecksumFuncName;
    ingestion_path_ids.insert(f.fd.GetPathId());
  }

  TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
  if (status.ok()) {
    for (auto path_id : ingestion_path_ids) {
      status = directories_->GetDataDir(path_id)->FsyncWithDirOptions(
          IOOptions(), nullptr,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
      if (!status.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Failed to sync directory %" ROCKSDB_PRIszt
                       " while ingest file: %s",
                       path_id, status.ToString().c_str());
        break;
      }
    }
  }
  TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");

  // Generate and check the sst file checksum. Note that, if
  // IngestExternalFileOptions::write_global_seqno is true, we will not update
  // the checksum information in files_to_ingest_ here, since the file is
  // updated with the new global_seqno. After global_seqno is updated, DB will
  // generate the new checksum and store it in the Manifest. In all other
  // cases, if verify_file_checksum is false and checksums are provided, we
  // only check the checksum function name and trust the provided checksums.
  if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) {
    if (ingestion_options_.verify_file_checksum == false &&
        files_checksums.size() == files_to_ingest_.size() &&
        files_checksum_func_names.size() == files_to_ingest_.size()) {
      // Only when verify_file_checksum == false and the checksums for the
      // ingested files are provided does DB use the provided checksums
      // instead of generating checksums for the ingested files.
      need_generate_file_checksum_ = false;
    } else {
      need_generate_file_checksum_ = true;
    }
    FileChecksumGenContext gen_context;
    std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
        db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator(
            gen_context);
    std::vector<std::string> generated_checksums;
    std::vector<std::string> generated_checksum_func_names;
    // Step 1: generate the checksums for the ingested sst files.
    if (need_generate_file_checksum_) {
      for (size_t i = 0; i < files_to_ingest_.size(); i++) {
        std::string generated_checksum;
        std::string generated_checksum_func_name;
        std::string requested_checksum_func_name;
        // TODO: rate limit file reads for checksum calculation during file
        // ingestion.
        IOStatus io_s = GenerateOneFileChecksum(
            fs_.get(), files_to_ingest_[i].internal_file_path,
            db_options_.file_checksum_gen_factory.get(),
            requested_checksum_func_name, &generated_checksum,
            &generated_checksum_func_name,
            ingestion_options_.verify_checksums_readahead_size,
            db_options_.allow_mmap_reads, io_tracer_,
            db_options_.rate_limiter.get(),
            Env::IO_TOTAL /* rate_limiter_priority */);
        if (!io_s.ok()) {
          status = io_s;
          ROCKS_LOG_WARN(db_options_.info_log,
                         "Sst file checksum generation of file: %s failed: %s",
                         files_to_ingest_[i].internal_file_path.c_str(),
                         status.ToString().c_str());
          break;
        }
        if (ingestion_options_.write_global_seqno == false) {
          files_to_ingest_[i].file_checksum = generated_checksum;
          files_to_ingest_[i].file_checksum_func_name =
              generated_checksum_func_name;
        }
        generated_checksums.push_back(generated_checksum);
        generated_checksum_func_names.push_back(generated_checksum_func_name);
      }
    }

    // Step 2: do the verification based on verify_file_checksum and the
    // provided checksum information.
    if (status.ok()) {
      if (files_checksums.size() == files_to_ingest_.size() &&
          files_checksum_func_names.size() == files_to_ingest_.size()) {
        // Verify the checksum and checksum function name.
        if (ingestion_options_.verify_file_checksum) {
          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
            if (files_checksum_func_names[i] !=
                generated_checksum_func_names[i]) {
              status = Status::InvalidArgument(
                  "Checksum function name does not match with the checksum "
                  "function name of this DB");
              ROCKS_LOG_WARN(
                  db_options_.info_log,
                  "Sst file checksum verification of file: %s failed: %s",
                  external_files_paths[i].c_str(), status.ToString().c_str());
              break;
            }
            if (files_checksums[i] != generated_checksums[i]) {
              status = Status::Corruption(
                  "Ingested checksum does not match with the generated "
                  "checksum");
              ROCKS_LOG_WARN(
                  db_options_.info_log,
                  "Sst file checksum verification of file: %s failed: %s",
                  files_to_ingest_[i].internal_file_path.c_str(),
                  status.ToString().c_str());
              break;
            }
          }
        } else {
          // If verify_file_checksum is not enabled, we only verify the
          // checksum function name. If it does not match, fail the ingestion.
          // If it matches, we trust the ingested checksum information and
          // store it in the Manifest.
          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
            if (files_checksum_func_names[i] != file_checksum_gen->Name()) {
              status = Status::InvalidArgument(
                  "Checksum function name does not match with the checksum "
                  "function name of this DB");
              ROCKS_LOG_WARN(
                  db_options_.info_log,
                  "Sst file checksum verification of file: %s failed: %s",
                  external_files_paths[i].c_str(), status.ToString().c_str());
              break;
            }
            files_to_ingest_[i].file_checksum = files_checksums[i];
            files_to_ingest_[i].file_checksum_func_name =
                files_checksum_func_names[i];
          }
        }
      } else if (files_checksums.size() != files_checksum_func_names.size() ||
                 (files_checksums.size() == files_checksum_func_names.size() &&
                  files_checksums.size() != 0)) {
        // The checksum and checksum function name vectors are not both empty,
        // but they do not both match the number of ingested files.
        status = Status::InvalidArgument(
            "The checksum information of ingested sst files are nonempty and "
            "the size of checksums or the size of the checksum function "
            "names "
            "does not match with the number of ingested sst files");
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "The ingested sst files checksum information is incomplete: %s",
            status.ToString().c_str());
      }
    }
  }

  // TODO: The following is duplicated with Cleanup().
  if (!status.ok()) {
    IOOptions io_opts;
    // We failed; remove all files that we copied into the DB.
    for (IngestedFileInfo& f : files_to_ingest_) {
      if (f.internal_file_path.empty()) {
        continue;
      }
      Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "AddFile() clean up for file %s failed : %s",
                       f.internal_file_path.c_str(), s.ToString().c_str());
      }
    }
  }

  return status;
}
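
A minimal caller-side sketch for context (not part of this file), showing how the IngestExternalFileOptions fields consumed by Prepare() (move_files, failed_move_fall_back_to_copy, verify_file_checksum, write_global_seqno) are typically set when ingesting an SST file produced by SstFileWriter. The paths and key/value data below are illustrative assumptions, not values taken from the source.

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Build an external SST file; keys must be added in sorted order.
  // Path is an illustrative assumption.
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
  rocksdb::Status s = writer.Open("/tmp/ingest_example.sst");
  if (s.ok()) s = writer.Put("key1", "value1");
  if (s.ok()) s = writer.Put("key2", "value2");
  if (s.ok()) s = writer.Finish();
  if (!s.ok()) return 1;

  rocksdb::DB* db = nullptr;
  s = rocksdb::DB::Open(options, "/tmp/ingest_example_db", &db);
  if (!s.ok()) return 1;

  rocksdb::IngestExternalFileOptions ifo;
  ifo.move_files = true;                     // try to hard-link instead of copying
  ifo.failed_move_fall_back_to_copy = true;  // copy if the link crosses file systems
  ifo.verify_file_checksum = true;           // verify provided checksums against
                                             // generated ones (needs a checksum factory)
  ifo.write_global_seqno = false;            // do not rewrite the global seqno field
  s = db->IngestExternalFile({"/tmp/ingest_example.sst"}, ifo);

  delete db;
  return s.ok() ? 0 : 1;
}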