void StressTest::Open()

in db_stress_tool/db_stress_test_base.cc [2303:2850]


// Opens (or reopens) the database under test at FLAGS_db and stores the
// resulting handles in db_ / txn_db_ / column_families_ (plus secondaries_,
// secondary_cfh_lists_ and cmp_db_ / cmp_cfhs_ when requested by flags).
//
// Steps:
//  1. Build options_ either from the individual command-line flags or, when
//     --options_file is given, from that RocksDB options file (not available
//     in ROCKSDB_LITE).
//  2. Layer on flag-driven settings that apply in both cases (rate limiter,
//     SstFileManager, memtable representation, merge operator, compaction
//     filter, table-properties collector, recovery flags) and reject
//     unsupported flag combinations.
//  3. If FLAGS_ttl == -1: discover (first open) or verify (reopen) the column
//     families, create any missing ones up to FLAGS_column_families, and open
//     the DB as a StackableDB BlobDB, a plain / read-only DB, or a
//     TransactionDB. In debug builds, open-time fault injection may be
//     enabled when reopening; if the open fails, injection is turned off and
//     the open is retried. Secondary instances are opened afterwards if
//     requested.
//     Otherwise: open a DBWithTTL with FLAGS_ttl.
//
// Any failure prints a message to stderr and terminates the process with
// exit(1) (or trips an assert in debug builds).
void StressTest::Open() {
  assert(db_ == nullptr);
#ifndef ROCKSDB_LITE
  assert(txn_db_ == nullptr);
#endif
  if (FLAGS_options_file.empty()) {
    BlockBasedTableOptions block_based_options;
    block_based_options.block_cache = cache_;
    block_based_options.cache_index_and_filter_blocks =
        FLAGS_cache_index_and_filter_blocks;
    block_based_options.metadata_cache_options.top_level_index_pinning =
        static_cast<PinningTier>(FLAGS_top_level_index_pinning);
    block_based_options.metadata_cache_options.partition_pinning =
        static_cast<PinningTier>(FLAGS_partition_pinning);
    block_based_options.metadata_cache_options.unpartitioned_pinning =
        static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
    block_based_options.block_cache_compressed = compressed_cache_;
    block_based_options.checksum = checksum_type_e;
    block_based_options.block_size = FLAGS_block_size;
    block_based_options.format_version =
        static_cast<uint32_t>(FLAGS_format_version);
    block_based_options.index_block_restart_interval =
        static_cast<int32_t>(FLAGS_index_block_restart_interval);
    block_based_options.filter_policy = filter_policy_;
    block_based_options.partition_filters = FLAGS_partition_filters;
    block_based_options.optimize_filters_for_memory =
        FLAGS_optimize_filters_for_memory;
    block_based_options.detect_filter_construct_corruption =
        FLAGS_detect_filter_construct_corruption;
    block_based_options.index_type =
        static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
    block_based_options.prepopulate_block_cache =
        static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
            FLAGS_prepopulate_block_cache);
    options_.table_factory.reset(
        NewBlockBasedTableFactory(block_based_options));
    options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
    options_.write_buffer_size = FLAGS_write_buffer_size;
    options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options_.min_write_buffer_number_to_merge =
        FLAGS_min_write_buffer_number_to_merge;
    options_.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
    options_.max_write_buffer_size_to_maintain =
        FLAGS_max_write_buffer_size_to_maintain;
    options_.memtable_prefix_bloom_size_ratio =
        FLAGS_memtable_prefix_bloom_size_ratio;
    options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
    options_.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options_.max_background_compactions = FLAGS_max_background_compactions;
    options_.max_background_flushes = FLAGS_max_background_flushes;
    options_.compaction_style =
        static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
    // A negative prefix size means "no prefix extractor".
    if (FLAGS_prefix_size >= 0) {
      options_.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
    options_.max_open_files = FLAGS_open_files;
    options_.statistics = dbstats;
    options_.env = db_stress_env;
    options_.use_fsync = FLAGS_use_fsync;
    options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
    options_.allow_mmap_reads = FLAGS_mmap_read;
    options_.allow_mmap_writes = FLAGS_mmap_write;
    options_.use_direct_reads = FLAGS_use_direct_reads;
    options_.use_direct_io_for_flush_and_compaction =
        FLAGS_use_direct_io_for_flush_and_compaction;
    options_.recycle_log_file_num =
        static_cast<size_t>(FLAGS_recycle_log_file_num);
    options_.target_file_size_base = FLAGS_target_file_size_base;
    options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options_.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options_.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options_.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options_.compression = compression_type_e;
    options_.bottommost_compression = bottommost_compression_type_e;
    options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options_.compression_opts.zstd_max_train_bytes =
        FLAGS_compression_zstd_max_train_bytes;
    options_.compression_opts.parallel_threads =
        FLAGS_compression_parallel_threads;
    options_.compression_opts.max_dict_buffer_bytes =
        FLAGS_compression_max_dict_buffer_bytes;
    options_.create_if_missing = true;
    options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
    options_.inplace_update_support = FLAGS_in_place_update;
    options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
    options_.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options_.experimental_mempurge_threshold =
        FLAGS_experimental_mempurge_threshold;
    options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
    options_.ttl = FLAGS_compaction_ttl;
    options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
    options_.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options_.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    options_.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    options_.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    options_.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    options_.atomic_flush = FLAGS_atomic_flush;
    options_.avoid_unnecessary_blocking_io =
        FLAGS_avoid_unnecessary_blocking_io;
    options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
    options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
    options_.max_write_batch_group_size_bytes =
        FLAGS_max_write_batch_group_size_bytes;
    options_.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
    options_.file_checksum_gen_factory =
        GetFileChecksumImpl(FLAGS_file_checksum_impl);
    options_.track_and_verify_wals_in_manifest = true;

    // Integrated BlobDB
    options_.enable_blob_files = FLAGS_enable_blob_files;
    options_.min_blob_size = FLAGS_min_blob_size;
    options_.blob_file_size = FLAGS_blob_file_size;
    options_.blob_compression_type =
        StringToCompressionType(FLAGS_blob_compression_type.c_str());
    options_.enable_blob_garbage_collection =
        FLAGS_enable_blob_garbage_collection;
    options_.blob_garbage_collection_age_cutoff =
        FLAGS_blob_garbage_collection_age_cutoff;
    options_.blob_garbage_collection_force_threshold =
        FLAGS_blob_garbage_collection_force_threshold;
    options_.blob_compaction_readahead_size =
        FLAGS_blob_compaction_readahead_size;
  } else {
#ifdef ROCKSDB_LITE
    fprintf(stderr, "--options_file not supported in lite mode\n");
    exit(1);
#else
    DBOptions db_options;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
                                   &db_options, &cf_descriptors);
    // Check the load result before allocating anything for the options.
    if (!s.ok()) {
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
    // NOTE(review): this wrapper is never freed in this function; presumably
    // it must outlive the DB that uses it — confirm the intended ownership.
    db_options.env = new DbStressEnvWrapper(db_stress_env);
    // Only the first column family's options from the file are used here.
    options_ = Options(db_options, cf_descriptors[0].options);
#endif  // ROCKSDB_LITE
  }

  if (FLAGS_rate_limiter_bytes_per_sec > 0) {
    options_.rate_limiter.reset(NewGenericRateLimiter(
        FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
        10 /* fairness */,
        FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                  : RateLimiter::Mode::kWritesOnly));
  }
  if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
      FLAGS_sst_file_manager_bytes_per_truncate > 0) {
    Status status;
    options_.sst_file_manager.reset(NewSstFileManager(
        db_stress_env, options_.info_log, "" /* trash_dir */,
        static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
        true /* delete_existing_trash */, &status,
        0.25 /* max_trash_db_ratio */,
        FLAGS_sst_file_manager_bytes_per_truncate));
    if (!status.ok()) {
      fprintf(stderr, "SstFileManager creation failed: %s\n",
              status.ToString().c_str());
      exit(1);
    }
  }

  if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
    fprintf(stderr,
            "prefix_size cannot be zero if memtablerep == prefix_hash\n");
    exit(1);
  }
  if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
    fprintf(stderr,
            "WARNING: prefix_size is non-zero but "
            "memtablerep != prefix_hash\n");
  }
  switch (FLAGS_rep_factory) {
    case kSkipList:
      // no need to do anything
      break;
#ifndef ROCKSDB_LITE
    case kHashSkipList:
      options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
      break;
    case kVectorRep:
      options_.memtable_factory.reset(new VectorRepFactory());
      break;
#else
    default:
      fprintf(stderr,
              "RocksdbLite only supports skip list mem table. Skip "
              "--rep_factory\n");
      break;
#endif  // ROCKSDB_LITE
  }

  if (FLAGS_use_full_merge_v1) {
    options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  } else {
    options_.merge_operator = MergeOperators::CreatePutOperator();
  }
  if (FLAGS_enable_compaction_filter) {
    options_.compaction_filter_factory =
        std::make_shared<DbStressCompactionFilterFactory>();
  }
  options_.table_properties_collector_factories.emplace_back(
      std::make_shared<DbStressTablePropertiesCollectorFactory>());

  options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
  options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
  options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;

  if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
       FLAGS_allow_setting_blob_options_dynamically) &&
      FLAGS_best_efforts_recovery) {
    fprintf(stderr,
            "Integrated BlobDB is currently incompatible with best-effort "
            "recovery\n");
    exit(1);
  }

  fprintf(stdout,
          "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
          ", blob file size %" PRIu64
          ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
          "threshold %f, blob compaction readahead size %" PRIu64 "\n",
          options_.enable_blob_files, options_.min_blob_size,
          options_.blob_file_size,
          CompressionTypeToString(options_.blob_compression_type).c_str(),
          options_.enable_blob_garbage_collection,
          options_.blob_garbage_collection_age_cutoff,
          options_.blob_garbage_collection_force_threshold,
          options_.blob_compaction_readahead_size);

  fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

  Status s;

  if (FLAGS_user_timestamp_size > 0) {
    CheckAndSetOptionsForUserTimestamp();
  }

  if (FLAGS_ttl == -1) {
    std::vector<std::string> existing_column_families;
    s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
                               &existing_column_families);  // ignore errors
    if (!s.ok()) {
      // DB doesn't exist
      assert(existing_column_families.empty());
      assert(column_family_names_.empty());
      column_family_names_.push_back(kDefaultColumnFamilyName);
    } else if (column_family_names_.empty()) {
      // this is the first call to the function Open()
      column_family_names_ = existing_column_families;
    } else {
      // This is a reopen: just check that the existing column family names
      // are the same set as what we remember.
      auto sorted_cfn = column_family_names_;
      std::sort(sorted_cfn.begin(), sorted_cfn.end());
      std::sort(existing_column_families.begin(),
                existing_column_families.end());
      if (sorted_cfn != existing_column_families) {
        fprintf(stderr, "Expected column families differ from the existing:\n");
        fprintf(stderr, "Expected: {");
        for (const auto& cf : sorted_cfn) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
        fprintf(stderr, "Existing: {");
        for (const auto& cf : existing_column_families) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
      }
      assert(sorted_cfn == existing_column_families);
    }
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    for (const auto& name : column_family_names_) {
      // Non-default column families are named by integers; keep the next
      // name to hand out strictly above every name already in use.
      if (name != kDefaultColumnFamilyName) {
        new_column_family_name_ =
            std::max(new_column_family_name_.load(), std::stoi(name) + 1);
      }
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
    }
    // Create additional column families until FLAGS_column_families exist.
    while (cf_descriptors.size() <
           static_cast<size_t>(FLAGS_column_families)) {
      std::string name = ToString(new_column_family_name_.load());
      new_column_family_name_++;
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
      column_family_names_.push_back(name);
    }
    options_.listeners.clear();
#ifndef ROCKSDB_LITE
    options_.listeners.emplace_back(new DbStressListener(
        FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
#endif  // !ROCKSDB_LITE
    options_.create_missing_column_families = true;
    if (!FLAGS_use_txn) {
#ifndef NDEBUG
      // Determine whether we need to inject file metadata/write/read
      // failures during DB reopen, and if so, enable them.
      // Errors are only injected when reopening (CURRENT already exists), as
      // a failure of the initial open doesn't need to be handled.
      // TODO: transaction DB is not covered by this fault test yet.
      bool ingest_meta_error = false;
      bool ingest_write_error = false;
      bool ingest_read_error = false;
      if ((FLAGS_open_metadata_write_fault_one_in ||
           FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
          fault_fs_guard
              ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
              .ok()) {
        if (!FLAGS_sync) {
          // When db_stress is not in sync mode, we expect all WAL writes to
          // be durable. Buffering unsynced writes would cause false
          // positives in crash tests. Until we find a way to solve that,
          // skip WAL files for failure injection.
          fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
        }
        ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
        ingest_write_error = FLAGS_open_write_fault_one_in;
        ingest_read_error = FLAGS_open_read_fault_one_in;
        if (ingest_meta_error) {
          fault_fs_guard->EnableMetadataWriteErrorInjection();
          fault_fs_guard->SetRandomMetadataWriteError(
              FLAGS_open_metadata_write_fault_one_in);
        }
        if (ingest_write_error) {
          fault_fs_guard->SetFilesystemDirectWritable(false);
          fault_fs_guard->EnableWriteErrorInjection();
          fault_fs_guard->SetRandomWriteError(
              static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
              IOStatus::IOError("Injected Open Error"),
              /*inject_for_all_file_types=*/true, /*types=*/{});
        }
        if (ingest_read_error) {
          fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
        }
      }
      // Retry loop: at most one extra iteration, because the injection flags
      // are cleared before `continue`.
      while (true) {
#endif  // NDEBUG
#ifndef ROCKSDB_LITE
        // StackableDB-based BlobDB
        if (FLAGS_use_blob_db) {
          blob_db::BlobDBOptions blob_db_options;
          blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
          blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
          blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
          blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
          blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;

          blob_db::BlobDB* blob_db = nullptr;
          s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                    cf_descriptors, &column_families_,
                                    &blob_db);
          if (s.ok()) {
            db_ = blob_db;
          }
        } else
#endif  // !ROCKSDB_LITE
        {
          // Read-only mode only applies once the initial preload is done.
          if (db_preload_finished_.load() && FLAGS_read_only) {
            s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
                                    cf_descriptors, &column_families_, &db_);
          } else {
            s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                         &column_families_, &db_);
          }
        }

#ifndef NDEBUG
        if (ingest_meta_error || ingest_write_error || ingest_read_error) {
          fault_fs_guard->SetFilesystemDirectWritable(true);
          fault_fs_guard->DisableMetadataWriteErrorInjection();
          fault_fs_guard->DisableWriteErrorInjection();
          fault_fs_guard->SetSkipDirectWritableTypes({});
          fault_fs_guard->SetRandomReadError(0);
          if (s.ok()) {
            // Injected errors might happen in background compactions. We
            // wait for all compactions to finish to make sure the DB is in a
            // clean state before executing queries.
            s = static_cast_with_check<DBImpl>(db_->GetRootDB())
                    ->TEST_WaitForCompact(true);
            if (!s.ok()) {
              for (auto* cf : column_families_) {
                delete cf;
              }
              column_families_.clear();
              delete db_;
              db_ = nullptr;
            }
          }
          if (!s.ok()) {
            // After a failure to open the DB due to an IO error, a retry
            // should successfully open the DB with correct data if no IO
            // error shows up.
            ingest_meta_error = false;
            ingest_write_error = false;
            ingest_read_error = false;

            Random rand(static_cast<uint32_t>(FLAGS_seed));
            if (rand.OneIn(2)) {
              fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
                                                                 nullptr);
            }
            if (rand.OneIn(3)) {
              fault_fs_guard->DropUnsyncedFileData();
            } else if (rand.OneIn(2)) {
              fault_fs_guard->DropRandomUnsyncedFileData(&rand);
            }
            continue;
          }
        }
        break;
      }
#endif  // NDEBUG
    } else {
#ifndef ROCKSDB_LITE
      TransactionDBOptions txn_db_options;
      assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
      txn_db_options.write_policy =
          static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
      if (FLAGS_unordered_write) {
        assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
        options_.unordered_write = true;
        options_.two_write_queues = true;
        txn_db_options.skip_concurrency_control = true;
      } else {
        options_.two_write_queues = FLAGS_two_write_queues;
      }
      txn_db_options.wp_snapshot_cache_bits =
          static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
      txn_db_options.wp_commit_cache_bits =
          static_cast<size_t>(FLAGS_wp_commit_cache_bits);
      s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                              cf_descriptors, &column_families_, &txn_db_);
      if (!s.ok()) {
        fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
                s.ToString().c_str());
        fflush(stderr);
      }
      assert(s.ok());
      // Guard the post-open work so that release (NDEBUG) builds do not
      // dereference a null txn_db_; the failure is reported by the final
      // "open error" check below instead.
      if (s.ok()) {
        db_ = txn_db_;
        // After a crash, randomly commit or roll back each recovered
        // prepared transaction.
        std::vector<Transaction*> trans;
        txn_db_->GetAllPreparedTransactions(&trans);
        Random rand(static_cast<uint32_t>(FLAGS_seed));
        for (auto* txn : trans) {
          if (rand.OneIn(2)) {
            s = txn->Commit();
            assert(s.ok());
          } else {
            s = txn->Rollback();
            assert(s.ok());
          }
          delete txn;
        }
        trans.clear();
        txn_db_->GetAllPreparedTransactions(&trans);
        assert(trans.size() == 0);
      }
#endif
    }
    assert(!s.ok() || column_families_.size() ==
                          static_cast<size_t>(FLAGS_column_families));

    if (s.ok() && FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      secondary_cfh_lists_.clear();
      secondary_cfh_lists_.resize(FLAGS_threads);
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.statistics = dbstats_secondaries;
      tmp_opts.env = db_stress_env;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                cf_descriptors, &secondary_cfh_lists_[i],
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
#else
      fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
      exit(1);
#endif
    }
    // Secondary instance does not support write-prepared/write-unprepared
    // transactions, thus just disable secondary instance if we use
    // transaction.
    if (s.ok() && FLAGS_continuous_verification_interval > 0 &&
        !FLAGS_use_txn && !cmp_db_) {
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.env = db_stress_env;
      std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
      s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                              cf_descriptors, &cmp_cfhs_, &cmp_db_);
      assert(!s.ok() ||
             cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
    }
  } else {
#ifndef ROCKSDB_LITE
    DBWithTTL* db_with_ttl = nullptr;
    s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
    db_ = db_with_ttl;
    // Only open secondaries if the primary opened; otherwise a successful
    // secondary open would overwrite `s` and hide the primary's failure.
    if (s.ok() && FLAGS_test_secondary) {
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      Options tmp_opts;
      tmp_opts.env = options_.env;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
    }
#else
    fprintf(stderr, "TTL is not supported in RocksDBLite\n");
    exit(1);
#endif
  }
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    exit(1);
  }
}