in db_stress_tool/db_stress_test_base.cc [2303:2850]
// Opens the database under test, plus any secondary / comparison instances,
// according to the db_stress command-line flags.
//
// Steps, in order:
//   1. Populate options_ either from individual FLAGS_* values or, when
//      --options_file is non-empty, from that options file.
//   2. Apply settings shared by both paths (rate limiter, SstFileManager,
//      memtable factory, merge operator, compaction filter, ...).
//   3. Open the DB: plain / read-only DB, StackableDB-based BlobDB or
//      TransactionDB when FLAGS_ttl == -1, otherwise DBWithTTL.
//   4. Optionally open secondary instances (FLAGS_test_secondary) and a
//      comparison secondary (FLAGS_continuous_verification_interval > 0).
//
// Preconditions: db_ (and txn_db_ in non-LITE builds) must be null.
// Most failures print a message to stderr and terminate the process with
// exit status 1.
void StressTest::Open() {
  assert(db_ == nullptr);
#ifndef ROCKSDB_LITE
  assert(txn_db_ == nullptr);
#endif
  if (FLAGS_options_file.empty()) {
    // No options file: build every option from its command-line flag.
    BlockBasedTableOptions block_based_options;
    block_based_options.block_cache = cache_;
    block_based_options.cache_index_and_filter_blocks =
        FLAGS_cache_index_and_filter_blocks;
    block_based_options.metadata_cache_options.top_level_index_pinning =
        static_cast<PinningTier>(FLAGS_top_level_index_pinning);
    block_based_options.metadata_cache_options.partition_pinning =
        static_cast<PinningTier>(FLAGS_partition_pinning);
    block_based_options.metadata_cache_options.unpartitioned_pinning =
        static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
    block_based_options.block_cache_compressed = compressed_cache_;
    block_based_options.checksum = checksum_type_e;
    block_based_options.block_size = FLAGS_block_size;
    block_based_options.format_version =
        static_cast<uint32_t>(FLAGS_format_version);
    block_based_options.index_block_restart_interval =
        static_cast<int32_t>(FLAGS_index_block_restart_interval);
    block_based_options.filter_policy = filter_policy_;
    block_based_options.partition_filters = FLAGS_partition_filters;
    block_based_options.optimize_filters_for_memory =
        FLAGS_optimize_filters_for_memory;
    block_based_options.detect_filter_construct_corruption =
        FLAGS_detect_filter_construct_corruption;
    block_based_options.index_type =
        static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
    block_based_options.prepopulate_block_cache =
        static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
            FLAGS_prepopulate_block_cache);
    options_.table_factory.reset(
        NewBlockBasedTableFactory(block_based_options));
    options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
    options_.write_buffer_size = FLAGS_write_buffer_size;
    options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options_.min_write_buffer_number_to_merge =
        FLAGS_min_write_buffer_number_to_merge;
    options_.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
    options_.max_write_buffer_size_to_maintain =
        FLAGS_max_write_buffer_size_to_maintain;
    options_.memtable_prefix_bloom_size_ratio =
        FLAGS_memtable_prefix_bloom_size_ratio;
    options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
    options_.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options_.max_background_compactions = FLAGS_max_background_compactions;
    options_.max_background_flushes = FLAGS_max_background_flushes;
    options_.compaction_style =
        static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
    if (FLAGS_prefix_size >= 0) {
      options_.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
    options_.max_open_files = FLAGS_open_files;
    options_.statistics = dbstats;
    options_.env = db_stress_env;
    options_.use_fsync = FLAGS_use_fsync;
    options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
    options_.allow_mmap_reads = FLAGS_mmap_read;
    options_.allow_mmap_writes = FLAGS_mmap_write;
    options_.use_direct_reads = FLAGS_use_direct_reads;
    options_.use_direct_io_for_flush_and_compaction =
        FLAGS_use_direct_io_for_flush_and_compaction;
    options_.recycle_log_file_num =
        static_cast<size_t>(FLAGS_recycle_log_file_num);
    options_.target_file_size_base = FLAGS_target_file_size_base;
    options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options_.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options_.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options_.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options_.compression = compression_type_e;
    options_.bottommost_compression = bottommost_compression_type_e;
    options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options_.compression_opts.zstd_max_train_bytes =
        FLAGS_compression_zstd_max_train_bytes;
    options_.compression_opts.parallel_threads =
        FLAGS_compression_parallel_threads;
    options_.compression_opts.max_dict_buffer_bytes =
        FLAGS_compression_max_dict_buffer_bytes;
    options_.create_if_missing = true;
    options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
    options_.inplace_update_support = FLAGS_in_place_update;
    options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
    options_.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options_.experimental_mempurge_threshold =
        FLAGS_experimental_mempurge_threshold;
    options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
    options_.ttl = FLAGS_compaction_ttl;
    options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
    options_.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options_.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    options_.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    options_.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    options_.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    options_.atomic_flush = FLAGS_atomic_flush;
    options_.avoid_unnecessary_blocking_io =
        FLAGS_avoid_unnecessary_blocking_io;
    options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
    options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
    options_.max_write_batch_group_size_bytes =
        FLAGS_max_write_batch_group_size_bytes;
    options_.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
    options_.file_checksum_gen_factory =
        GetFileChecksumImpl(FLAGS_file_checksum_impl);
    options_.track_and_verify_wals_in_manifest = true;

    // Integrated BlobDB
    options_.enable_blob_files = FLAGS_enable_blob_files;
    options_.min_blob_size = FLAGS_min_blob_size;
    options_.blob_file_size = FLAGS_blob_file_size;
    options_.blob_compression_type =
        StringToCompressionType(FLAGS_blob_compression_type.c_str());
    options_.enable_blob_garbage_collection =
        FLAGS_enable_blob_garbage_collection;
    options_.blob_garbage_collection_age_cutoff =
        FLAGS_blob_garbage_collection_age_cutoff;
    options_.blob_garbage_collection_force_threshold =
        FLAGS_blob_garbage_collection_force_threshold;
    options_.blob_compaction_readahead_size =
        FLAGS_blob_compaction_readahead_size;
  } else {
#ifdef ROCKSDB_LITE
    fprintf(stderr, "--options_file not supported in lite mode\n");
    exit(1);
#else
    // Options file given: load DB and column family options from it.
    DBOptions db_options;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
                                   &db_options, &cf_descriptors);
    // NOTE(review): the wrapper is heap-allocated and not released in this
    // function; presumably it is meant to live for the whole process --
    // confirm.
    db_options.env = new DbStressEnvWrapper(db_stress_env);
    if (!s.ok()) {
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
    // Only the first descriptor's options are used as the template for all
    // column families. Assumes a successful load yields at least one
    // descriptor -- TODO confirm.
    options_ = Options(db_options, cf_descriptors[0].options);
#endif  // ROCKSDB_LITE
  }

  // Settings below apply regardless of where options_ came from.
  if (FLAGS_rate_limiter_bytes_per_sec > 0) {
    options_.rate_limiter.reset(NewGenericRateLimiter(
        FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
        10 /* fairness */,
        FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                  : RateLimiter::Mode::kWritesOnly));
  }
  if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
      FLAGS_sst_file_manager_bytes_per_truncate > 0) {
    Status status;
    options_.sst_file_manager.reset(NewSstFileManager(
        db_stress_env, options_.info_log, "" /* trash_dir */,
        static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
        true /* delete_existing_trash */, &status,
        0.25 /* max_trash_db_ratio */,
        FLAGS_sst_file_manager_bytes_per_truncate));
    if (!status.ok()) {
      fprintf(stderr, "SstFileManager creation failed: %s\n",
              status.ToString().c_str());
      exit(1);
    }
  }

  if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
    fprintf(stderr,
            "prefix_size cannot be zero if memtablerep == prefix_hash\n");
    exit(1);
  }
  if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
    fprintf(stderr,
            "WARNING: prefix_size is non-zero but "
            "memtablerep != prefix_hash\n");
  }
  switch (FLAGS_rep_factory) {
    case kSkipList:
      // no need to do anything
      break;
#ifndef ROCKSDB_LITE
    case kHashSkipList:
      options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
      break;
    case kVectorRep:
      options_.memtable_factory.reset(new VectorRepFactory());
      break;
#else
    default:
      fprintf(stderr,
              "RocksdbLite only supports skip list mem table. Skip "
              "--rep_factory\n");
#endif  // ROCKSDB_LITE
  }

  if (FLAGS_use_full_merge_v1) {
    options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  } else {
    options_.merge_operator = MergeOperators::CreatePutOperator();
  }
  if (FLAGS_enable_compaction_filter) {
    options_.compaction_filter_factory =
        std::make_shared<DbStressCompactionFilterFactory>();
  }
  options_.table_properties_collector_factories.emplace_back(
      std::make_shared<DbStressTablePropertiesCollectorFactory>());

  options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
  options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
  options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;

  if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
       FLAGS_allow_setting_blob_options_dynamically) &&
      FLAGS_best_efforts_recovery) {
    fprintf(stderr,
            "Integrated BlobDB is currently incompatible with best-effort "
            "recovery\n");
    exit(1);
  }

  fprintf(stdout,
          "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64
          ", blob file size %" PRIu64
          ", blob compression type %s, blob GC enabled %d, cutoff %f, force "
          "threshold %f, blob compaction readahead size %" PRIu64 "\n",
          options_.enable_blob_files, options_.min_blob_size,
          options_.blob_file_size,
          CompressionTypeToString(options_.blob_compression_type).c_str(),
          options_.enable_blob_garbage_collection,
          options_.blob_garbage_collection_age_cutoff,
          options_.blob_garbage_collection_force_threshold,
          options_.blob_compaction_readahead_size);

  fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

  Status s;

  if (FLAGS_user_timestamp_size > 0) {
    CheckAndSetOptionsForUserTimestamp();
  }

  if (FLAGS_ttl == -1) {
    // Non-TTL path: reconcile the column families we remember with those
    // that exist on disk, then open with explicit descriptors.
    std::vector<std::string> existing_column_families;
    s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
                               &existing_column_families);  // ignore errors
    if (!s.ok()) {
      // DB doesn't exist
      assert(existing_column_families.empty());
      assert(column_family_names_.empty());
      column_family_names_.push_back(kDefaultColumnFamilyName);
    } else if (column_family_names_.empty()) {
      // this is the first call to the function Open()
      column_family_names_ = existing_column_families;
    } else {
      // this is a reopen. just assert that existing column_family_names are
      // equivalent to what we remember
      auto sorted_cfn = column_family_names_;
      std::sort(sorted_cfn.begin(), sorted_cfn.end());
      std::sort(existing_column_families.begin(),
                existing_column_families.end());
      if (sorted_cfn != existing_column_families) {
        fprintf(stderr, "Expected column families differ from the existing:\n");
        fprintf(stderr, "Expected: {");
        for (const auto& cf : sorted_cfn) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
        fprintf(stderr, "Existing: {");
        for (const auto& cf : existing_column_families) {
          fprintf(stderr, "%s ", cf.c_str());
        }
        fprintf(stderr, "}\n");
      }
      assert(sorted_cfn == existing_column_families);
    }
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    for (const auto& name : column_family_names_) {
      if (name != kDefaultColumnFamilyName) {
        // Non-default names are parsed as integers so that newly created
        // column families get a larger number. std::stoi throws if a name
        // is not numeric.
        new_column_family_name_ =
            std::max(new_column_family_name_.load(), std::stoi(name) + 1);
      }
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
    }
    // Add numbered column families until FLAGS_column_families is reached.
    while (cf_descriptors.size() <
           static_cast<size_t>(FLAGS_column_families)) {
      std::string name = ToString(new_column_family_name_.load());
      new_column_family_name_++;
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
      column_family_names_.push_back(name);
    }

    options_.listeners.clear();
#ifndef ROCKSDB_LITE
    options_.listeners.emplace_back(new DbStressListener(
        FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
#endif  // !ROCKSDB_LITE
    options_.create_missing_column_families = true;
    if (!FLAGS_use_txn) {
#ifndef NDEBUG
      // Determine whether we need to ingest file metadata write failures
      // during DB reopen. If it does, enable it.
      // Only ingest metadata error if it is reopening, as initial open
      // failure doesn't need to be handled.
      // TODO cover transaction DB is not covered in this fault test too.
      bool ingest_meta_error = false;
      bool ingest_write_error = false;
      bool ingest_read_error = false;
      if ((FLAGS_open_metadata_write_fault_one_in ||
           FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
          fault_fs_guard
              ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
              .ok()) {
        if (!FLAGS_sync) {
          // When DB Stress is not sync mode, we expect all WAL writes to
          // WAL is durable. Buffering unsynced writes will cause false
          // positive in crash tests. Before we figure out a way to
          // solve it, skip WAL from failure injection.
          fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
        }
        ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
        ingest_write_error = FLAGS_open_write_fault_one_in;
        ingest_read_error = FLAGS_open_read_fault_one_in;
        if (ingest_meta_error) {
          fault_fs_guard->EnableMetadataWriteErrorInjection();
          fault_fs_guard->SetRandomMetadataWriteError(
              FLAGS_open_metadata_write_fault_one_in);
        }
        if (ingest_write_error) {
          fault_fs_guard->SetFilesystemDirectWritable(false);
          fault_fs_guard->EnableWriteErrorInjection();
          fault_fs_guard->SetRandomWriteError(
              static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
              IOStatus::IOError("Injected Open Error"),
              /*inject_for_all_file_types=*/true, /*types=*/{});
        }
        if (ingest_read_error) {
          fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
        }
      }
      // Debug builds retry the open after an injected failure; the loop is
      // left via the `break` below once no retry is needed.
      while (true) {
#endif  // NDEBUG
#ifndef ROCKSDB_LITE
        // StackableDB-based BlobDB
        if (FLAGS_use_blob_db) {
          blob_db::BlobDBOptions blob_db_options;
          blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
          blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
          blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
          blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
          blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;

          blob_db::BlobDB* blob_db = nullptr;
          s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
                                    cf_descriptors, &column_families_,
                                    &blob_db);
          if (s.ok()) {
            db_ = blob_db;
          }
        } else
#endif  // !ROCKSDB_LITE
        {
          if (db_preload_finished_.load() && FLAGS_read_only) {
            s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
                                    cf_descriptors, &column_families_, &db_);
          } else {
            s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                         &column_families_, &db_);
          }
        }

#ifndef NDEBUG
        if (ingest_meta_error || ingest_write_error || ingest_read_error) {
          // Turn all fault injection back off before inspecting the result.
          fault_fs_guard->SetFilesystemDirectWritable(true);
          fault_fs_guard->DisableMetadataWriteErrorInjection();
          fault_fs_guard->DisableWriteErrorInjection();
          fault_fs_guard->SetSkipDirectWritableTypes({});
          fault_fs_guard->SetRandomReadError(0);
          if (s.ok()) {
            // Ingested errors might happen in background compactions. We
            // wait for all compactions to finish to make sure DB is in
            // clean state before executing queries.
            s = static_cast_with_check<DBImpl>(db_->GetRootDB())
                    ->TEST_WaitForCompact(true);
            if (!s.ok()) {
              // Tear the half-usable DB down so the retry starts clean.
              for (auto cf : column_families_) {
                delete cf;
              }
              column_families_.clear();
              delete db_;
              db_ = nullptr;
            }
          }
          if (!s.ok()) {
            // After failure to opening a DB due to IO error, retry should
            // successfully open the DB with correct data if no IO error shows
            // up.
            ingest_meta_error = false;
            ingest_write_error = false;
            ingest_read_error = false;

            // NOTE(review): rand is constructed from FLAGS_seed inside the
            // retry path, so every retry draws the same sequence -- confirm
            // this is intended.
            Random rand(static_cast<uint32_t>(FLAGS_seed));
            if (rand.OneIn(2)) {
              fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
                                                                 nullptr);
            }
            if (rand.OneIn(3)) {
              fault_fs_guard->DropUnsyncedFileData();
            } else if (rand.OneIn(2)) {
              fault_fs_guard->DropRandomUnsyncedFileData(&rand);
            }
            continue;
          }
        }
        break;
      }
#endif  // NDEBUG
    } else {
#ifndef ROCKSDB_LITE
      TransactionDBOptions txn_db_options;
      assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
      txn_db_options.write_policy =
          static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
      if (FLAGS_unordered_write) {
        assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
        options_.unordered_write = true;
        options_.two_write_queues = true;
        txn_db_options.skip_concurrency_control = true;
      } else {
        options_.two_write_queues = FLAGS_two_write_queues;
      }
      txn_db_options.wp_snapshot_cache_bits =
          static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
      txn_db_options.wp_commit_cache_bits =
          static_cast<size_t>(FLAGS_wp_commit_cache_bits);
      s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                              cf_descriptors, &column_families_, &txn_db_);
      if (!s.ok()) {
        fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
                s.ToString().c_str());
        fflush(stderr);
        // Debug builds abort on the assert. Builds with NDEBUG compile the
        // assert out, so exit explicitly instead of falling through and
        // dereferencing txn_db_ below after a failed open.
        assert(s.ok());
        exit(1);
      }
      db_ = txn_db_;
      // after a crash, rollback to commit recovered transactions
      std::vector<Transaction*> trans;
      txn_db_->GetAllPreparedTransactions(&trans);
      Random rand(static_cast<uint32_t>(FLAGS_seed));
      for (auto txn : trans) {
        // Randomly commit or roll back each recovered prepared transaction.
        if (rand.OneIn(2)) {
          s = txn->Commit();
          assert(s.ok());
        } else {
          s = txn->Rollback();
          assert(s.ok());
        }
        delete txn;
      }
      trans.clear();
      txn_db_->GetAllPreparedTransactions(&trans);
      assert(trans.size() == 0);
#endif
    }
    assert(!s.ok() || column_families_.size() ==
                          static_cast<size_t>(FLAGS_column_families));

    if (s.ok() && FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE
      // One secondary instance per thread, each under its own directory.
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      secondary_cfh_lists_.clear();
      secondary_cfh_lists_.resize(FLAGS_threads);
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.statistics = dbstats_secondaries;
      tmp_opts.env = db_stress_env;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                cf_descriptors, &secondary_cfh_lists_[i],
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
#else
      fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
      exit(1);
#endif
    }
    // Secondary instance does not support write-prepared/write-unprepared
    // transactions, thus just disable secondary instance if we use
    // transaction.
    if (s.ok() && FLAGS_continuous_verification_interval > 0 &&
        !FLAGS_use_txn && !cmp_db_) {
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.env = db_stress_env;
      std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
      s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                              cf_descriptors, &cmp_cfhs_, &cmp_db_);
      assert(!s.ok() ||
             cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
    }
  } else {
#ifndef ROCKSDB_LITE
    // TTL path. The pointer starts out null so that a failed open cannot
    // leave db_ holding an indeterminate value.
    DBWithTTL* db_with_ttl = nullptr;
    s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
    db_ = db_with_ttl;
    // Only open secondaries when the primary opened successfully, matching
    // the non-TTL path. Otherwise the secondary status would overwrite the
    // primary's failure status.
    if (s.ok() && FLAGS_test_secondary) {
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      Options tmp_opts;
      tmp_opts.env = options_.env;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
    }
#else
    fprintf(stderr, "TTL is not supported in RocksDBLite\n");
    exit(1);
#endif
  }
  // Any failure from the primary or a secondary open ends the process here.
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    exit(1);
  }
}