in tools/db_bench_tool.cc [3104:3574]
void Run() {
if (!SanityCheck()) {
ErrorExit();
}
Open(&open_options_);
PrintHeader(open_options_);
std::stringstream benchmark_stream(FLAGS_benchmarks);
std::string name;
std::unique_ptr<ExpiredTimeFilter> filter;
while (std::getline(benchmark_stream, name, ',')) {
// Sanitize parameters
num_ = FLAGS_num;
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
value_size = FLAGS_value_size;
key_size_ = FLAGS_key_size;
entries_per_batch_ = FLAGS_batch_size;
writes_before_delete_range_ = FLAGS_writes_before_delete_range;
writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
range_tombstone_width_ = FLAGS_range_tombstone_width;
max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
write_options_ = WriteOptions();
read_random_exp_range_ = FLAGS_read_random_exp_range;
if (FLAGS_sync) {
write_options_.sync = true;
}
write_options_.disableWAL = FLAGS_disable_wal;
write_options_.rate_limiter_priority =
FLAGS_rate_limit_auto_wal_flush ? Env::IO_USER : Env::IO_TOTAL;
read_options_ = ReadOptions(FLAGS_verify_checksum, true);
read_options_.total_order_seek = FLAGS_total_order_seek;
read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start;
read_options_.rate_limiter_priority =
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
read_options_.tailing = FLAGS_use_tailing_iterator;
read_options_.readahead_size = FLAGS_readahead_size;
read_options_.adaptive_readahead = FLAGS_adaptive_readahead;
read_options_.async_io = FLAGS_async_io;
void (Benchmark::*method)(ThreadState*) = nullptr;
void (Benchmark::*post_process_method)() = nullptr;
bool fresh_db = false;
int num_threads = FLAGS_threads;
int num_repeat = 1;
int num_warmup = 0;
if (!name.empty() && *name.rbegin() == ']') {
auto it = name.find('[');
if (it == std::string::npos) {
fprintf(stderr, "unknown benchmark arguments '%s'\n", name.c_str());
ErrorExit();
}
std::string args = name.substr(it + 1);
args.resize(args.size() - 1);
name.resize(it);
std::string bench_arg;
std::stringstream args_stream(args);
while (std::getline(args_stream, bench_arg, '-')) {
if (bench_arg.empty()) {
continue;
}
if (bench_arg[0] == 'X') {
// Repeat the benchmark n times
std::string num_str = bench_arg.substr(1);
num_repeat = std::stoi(num_str);
} else if (bench_arg[0] == 'W') {
// Warm up the benchmark for n times
std::string num_str = bench_arg.substr(1);
num_warmup = std::stoi(num_str);
}
}
}
// Both fillseqdeterministic and filluniquerandomdeterministic
// fill the levels except the max level with UNIQUE_RANDOM
// and fill the max level with fillseq and filluniquerandom, respectively
if (name == "fillseqdeterministic" ||
name == "filluniquerandomdeterministic") {
if (!FLAGS_disable_auto_compactions) {
fprintf(stderr,
"Please disable_auto_compactions in FillDeterministic "
"benchmark\n");
ErrorExit();
}
if (num_threads > 1) {
fprintf(stderr,
"filldeterministic multithreaded not supported"
", use 1 thread\n");
num_threads = 1;
}
fresh_db = true;
if (name == "fillseqdeterministic") {
method = &Benchmark::WriteSeqDeterministic;
} else {
method = &Benchmark::WriteUniqueRandomDeterministic;
}
} else if (name == "fillseq") {
fresh_db = true;
method = &Benchmark::WriteSeq;
} else if (name == "fillbatch") {
fresh_db = true;
entries_per_batch_ = 1000;
method = &Benchmark::WriteSeq;
} else if (name == "fillrandom") {
fresh_db = true;
method = &Benchmark::WriteRandom;
} else if (name == "filluniquerandom" ||
name == "fillanddeleteuniquerandom") {
fresh_db = true;
if (num_threads > 1) {
fprintf(stderr,
"filluniquerandom and fillanddeleteuniquerandom "
"multithreaded not supported, use 1 thread");
num_threads = 1;
}
method = &Benchmark::WriteUniqueRandom;
} else if (name == "overwrite") {
method = &Benchmark::WriteRandom;
} else if (name == "fillsync") {
fresh_db = true;
num_ /= 1000;
write_options_.sync = true;
method = &Benchmark::WriteRandom;
} else if (name == "fill100K") {
fresh_db = true;
num_ /= 1000;
value_size = 100 * 1000;
method = &Benchmark::WriteRandom;
} else if (name == "readseq") {
method = &Benchmark::ReadSequential;
} else if (name == "readtorowcache") {
if (!FLAGS_use_existing_keys || !FLAGS_row_cache_size) {
fprintf(stderr,
"Please set use_existing_keys to true and specify a "
"row cache size in readtorowcache benchmark\n");
ErrorExit();
}
method = &Benchmark::ReadToRowCache;
} else if (name == "readtocache") {
method = &Benchmark::ReadSequential;
num_threads = 1;
reads_ = num_;
} else if (name == "readreverse") {
method = &Benchmark::ReadReverse;
} else if (name == "readrandom") {
if (FLAGS_multiread_stride) {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
}
method = &Benchmark::ReadRandom;
} else if (name == "readrandomfast") {
method = &Benchmark::ReadRandomFast;
} else if (name == "multireadrandom") {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == "approximatesizerandom") {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::ApproximateSizeRandom;
} else if (name == "mixgraph") {
method = &Benchmark::MixGraph;
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
} else if (name == "newiterator") {
method = &Benchmark::IteratorCreation;
} else if (name == "newiteratorwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::IteratorCreationWhileWriting;
} else if (name == "seekrandom") {
method = &Benchmark::SeekRandom;
} else if (name == "seekrandomwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::SeekRandomWhileWriting;
} else if (name == "seekrandomwhilemerging") {
num_threads++; // Add extra thread for merging
method = &Benchmark::SeekRandomWhileMerging;
} else if (name == "readrandomsmall") {
reads_ /= 1000;
method = &Benchmark::ReadRandom;
} else if (name == "deleteseq") {
method = &Benchmark::DeleteSeq;
} else if (name == "deleterandom") {
method = &Benchmark::DeleteRandom;
} else if (name == "readwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileWriting;
} else if (name == "readwhilemerging") {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileMerging;
} else if (name == "readwhilescanning") {
num_threads++; // Add extra thread for scaning
method = &Benchmark::ReadWhileScanning;
} else if (name == "readrandomwriterandom") {
method = &Benchmark::ReadRandomWriteRandom;
} else if (name == "readrandommergerandom") {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.c_str());
ErrorExit();
}
method = &Benchmark::ReadRandomMergeRandom;
} else if (name == "updaterandom") {
method = &Benchmark::UpdateRandom;
} else if (name == "xorupdaterandom") {
method = &Benchmark::XORUpdateRandom;
} else if (name == "appendrandom") {
method = &Benchmark::AppendRandom;
} else if (name == "mergerandom") {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.c_str());
exit(1);
}
method = &Benchmark::MergeRandom;
} else if (name == "randomwithverify") {
method = &Benchmark::RandomWithVerify;
} else if (name == "fillseekseq") {
method = &Benchmark::WriteSeqSeekSeq;
} else if (name == "compact") {
method = &Benchmark::Compact;
} else if (name == "compactall") {
CompactAll();
#ifndef ROCKSDB_LITE
} else if (name == "compact0") {
CompactLevel(0);
} else if (name == "compact1") {
CompactLevel(1);
} else if (name == "waitforcompaction") {
WaitForCompaction();
#endif
} else if (name == "flush") {
Flush();
} else if (name == "crc32c") {
method = &Benchmark::Crc32c;
} else if (name == "xxhash") {
method = &Benchmark::xxHash;
} else if (name == "xxhash64") {
method = &Benchmark::xxHash64;
} else if (name == "xxh3") {
method = &Benchmark::xxh3;
} else if (name == "acquireload") {
method = &Benchmark::AcquireLoad;
} else if (name == "compress") {
method = &Benchmark::Compress;
} else if (name == "uncompress") {
method = &Benchmark::Uncompress;
#ifndef ROCKSDB_LITE
} else if (name == "randomtransaction") {
method = &Benchmark::RandomTransaction;
post_process_method = &Benchmark::RandomTransactionVerify;
#endif // ROCKSDB_LITE
} else if (name == "randomreplacekeys") {
fresh_db = true;
method = &Benchmark::RandomReplaceKeys;
} else if (name == "timeseries") {
timestamp_emulator_.reset(new TimestampEmulator());
if (FLAGS_expire_style == "compaction_filter") {
filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
fprintf(stdout, "Compaction filter is used to remove expired data");
open_options_.compaction_filter = filter.get();
}
fresh_db = true;
method = &Benchmark::TimeSeries;
} else if (name == "stats") {
PrintStats("rocksdb.stats");
} else if (name == "resetstats") {
ResetStats();
} else if (name == "verify") {
VerifyDBFromDB(FLAGS_truth_db);
} else if (name == "levelstats") {
PrintStats("rocksdb.levelstats");
} else if (name == "memstats") {
std::vector<std::string> keys{"rocksdb.num-immutable-mem-table",
"rocksdb.cur-size-active-mem-table",
"rocksdb.cur-size-all-mem-tables",
"rocksdb.size-all-mem-tables",
"rocksdb.num-entries-active-mem-table",
"rocksdb.num-entries-imm-mem-tables"};
PrintStats(keys);
} else if (name == "sstables") {
PrintStats("rocksdb.sstables");
} else if (name == "stats_history") {
PrintStatsHistory();
#ifndef ROCKSDB_LITE
} else if (name == "replay") {
if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n");
ErrorExit();
}
if (FLAGS_trace_file == "") {
fprintf(stderr, "Please set --trace_file to be replayed from\n");
ErrorExit();
}
method = &Benchmark::Replay;
#endif // ROCKSDB_LITE
} else if (name == "getmergeoperands") {
method = &Benchmark::GetMergeOperands;
#ifndef ROCKSDB_LITE
} else if (name == "verifychecksum") {
method = &Benchmark::VerifyChecksum;
} else if (name == "verifyfilechecksums") {
method = &Benchmark::VerifyFileChecksums;
#endif // ROCKSDB_LITE
} else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
ErrorExit();
}
if (fresh_db) {
if (FLAGS_use_existing_db) {
fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
name.c_str());
method = nullptr;
} else {
if (db_.db != nullptr) {
db_.DeleteDBs();
DestroyDB(FLAGS_db, open_options_);
}
Options options = open_options_;
for (size_t i = 0; i < multi_dbs_.size(); i++) {
delete multi_dbs_[i].db;
if (!open_options_.wal_dir.empty()) {
options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
}
DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
}
multi_dbs_.clear();
}
Open(&open_options_); // use open_options for the last accessed
}
if (method != nullptr) {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
#ifndef ROCKSDB_LITE
// A trace_file option can be provided both for trace and replay
// operations. But db_bench does not support tracing and replaying at
// the same time, for now. So, start tracing only when it is not a
// replay.
if (FLAGS_trace_file != "" && name != "replay") {
std::unique_ptr<TraceWriter> trace_writer;
Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
FLAGS_trace_file, &trace_writer);
if (!s.ok()) {
fprintf(stderr, "Encountered an error starting a trace, %s\n",
s.ToString().c_str());
ErrorExit();
}
s = db_.db->StartTrace(trace_options_, std::move(trace_writer));
if (!s.ok()) {
fprintf(stderr, "Encountered an error starting a trace, %s\n",
s.ToString().c_str());
ErrorExit();
}
fprintf(stdout, "Tracing the workload to: [%s]\n",
FLAGS_trace_file.c_str());
}
// Start block cache tracing.
if (!FLAGS_block_cache_trace_file.empty()) {
// Sanity checks.
if (FLAGS_block_cache_trace_sampling_frequency <= 0) {
fprintf(stderr,
"Block cache trace sampling frequency must be higher than "
"0.\n");
ErrorExit();
}
if (FLAGS_block_cache_trace_max_trace_file_size_in_bytes <= 0) {
fprintf(stderr,
"The maximum file size for block cache tracing must be "
"higher than 0.\n");
ErrorExit();
}
block_cache_trace_options_.max_trace_file_size =
FLAGS_block_cache_trace_max_trace_file_size_in_bytes;
block_cache_trace_options_.sampling_frequency =
FLAGS_block_cache_trace_sampling_frequency;
std::unique_ptr<TraceWriter> block_cache_trace_writer;
Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
FLAGS_block_cache_trace_file,
&block_cache_trace_writer);
if (!s.ok()) {
fprintf(stderr,
"Encountered an error when creating trace writer, %s\n",
s.ToString().c_str());
ErrorExit();
}
s = db_.db->StartBlockCacheTrace(block_cache_trace_options_,
std::move(block_cache_trace_writer));
if (!s.ok()) {
fprintf(
stderr,
"Encountered an error when starting block cache tracing, %s\n",
s.ToString().c_str());
ErrorExit();
}
fprintf(stdout, "Tracing block cache accesses to: [%s]\n",
FLAGS_block_cache_trace_file.c_str());
}
#endif // ROCKSDB_LITE
if (num_warmup > 0) {
printf("Warming up benchmark by running %d times\n", num_warmup);
}
for (int i = 0; i < num_warmup; i++) {
RunBenchmark(num_threads, name, method);
}
if (num_repeat > 1) {
printf("Running benchmark for %d times\n", num_repeat);
}
CombinedStats combined_stats;
for (int i = 0; i < num_repeat; i++) {
Stats stats = RunBenchmark(num_threads, name, method);
combined_stats.AddStats(stats);
}
if (num_repeat > 1) {
combined_stats.Report(name);
}
}
if (post_process_method != nullptr) {
(this->*post_process_method)();
}
}
if (secondary_update_thread_) {
secondary_update_stopped_.store(1, std::memory_order_relaxed);
secondary_update_thread_->join();
secondary_update_thread_.reset();
}
#ifndef ROCKSDB_LITE
if (name != "replay" && FLAGS_trace_file != "") {
Status s = db_.db->EndTrace();
if (!s.ok()) {
fprintf(stderr, "Encountered an error ending the trace, %s\n",
s.ToString().c_str());
}
}
if (!FLAGS_block_cache_trace_file.empty()) {
Status s = db_.db->EndBlockCacheTrace();
if (!s.ok()) {
fprintf(stderr,
"Encountered an error ending the block cache tracing, %s\n",
s.ToString().c_str());
}
}
#endif // ROCKSDB_LITE
if (FLAGS_statistics) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
}
if (FLAGS_simcache_size >= 0) {
fprintf(
stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
static_cast_with_check<SimCache>(cache_.get())->ToString().c_str());
}
#ifndef ROCKSDB_LITE
if (FLAGS_use_secondary_db) {
fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n",
secondary_db_updates_);
}
#endif // ROCKSDB_LITE
}