in tools/db_bench_tool.cc [6075:6276]
void MixGraph(ThreadState* thread) {
int64_t gets = 0;
int64_t puts = 0;
int64_t get_found = 0;
int64_t seek = 0;
int64_t seek_found = 0;
int64_t bytes = 0;
double total_scan_length = 0;
double total_val_size = 0;
const int64_t default_value_max = 1 * 1024 * 1024;
int64_t value_max = default_value_max;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
bool use_prefix_modeling = false;
bool use_random_modeling = false;
GenerateTwoTermExpKeys gen_exp;
std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
FLAGS_mix_seek_ratio};
char value_buffer[default_value_max];
QueryDecider query;
RandomGenerator gen;
Status s;
if (value_max > FLAGS_mix_max_value_size) {
value_max = FLAGS_mix_max_value_size;
}
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
PinnableSlice pinnable_val;
query.Initiate(ratio);
// the limit of qps initiation
if (FLAGS_sine_mix_rate) {
thread->shared->read_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(read_rate)));
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
}
// Decide if user wants to use prefix based key generation
if (FLAGS_keyrange_dist_a != 0.0 || FLAGS_keyrange_dist_b != 0.0 ||
FLAGS_keyrange_dist_c != 0.0 || FLAGS_keyrange_dist_d != 0.0) {
use_prefix_modeling = true;
gen_exp.InitiateExpDistribution(
FLAGS_num, FLAGS_keyrange_dist_a, FLAGS_keyrange_dist_b,
FLAGS_keyrange_dist_c, FLAGS_keyrange_dist_d);
}
if (FLAGS_key_dist_a == 0 || FLAGS_key_dist_b == 0) {
use_random_modeling = true;
}
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
int64_t ini_rand, rand_v, key_rand, key_seed;
ini_rand = GetRandomKey(&thread->rand);
rand_v = ini_rand % FLAGS_num;
double u = static_cast<double>(rand_v) / FLAGS_num;
// Generate the keyID based on the key hotness and prefix hotness
if (use_random_modeling) {
key_rand = ini_rand;
} else if (use_prefix_modeling) {
key_rand =
gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);
} else {
key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
Random64 rand(key_seed);
key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
}
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
int query_type = query.GetType(rand_v);
// change the qps
uint64_t now = FLAGS_env->NowMicros();
uint64_t usecs_since_last;
if (now > thread->stats.GetSineInterval()) {
usecs_since_last = now - thread->stats.GetSineInterval();
} else {
usecs_since_last = 0;
}
if (FLAGS_sine_mix_rate &&
usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
double usecs_since_start =
static_cast<double>(now - thread->stats.GetStart());
thread->stats.ResetSineInterval();
double mix_rate_with_noise = AddNoise(
SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
write_rate = mix_rate_with_noise * query.ratio_[1];
if (read_rate > 0) {
thread->shared->read_rate_limiter->SetBytesPerSecond(
static_cast<int64_t>(read_rate));
}
if (write_rate > 0) {
thread->shared->write_rate_limiter->SetBytesPerSecond(
static_cast<int64_t>(write_rate));
}
}
// Start the query
if (query_type == 0) {
// the Get query
gets++;
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(read_options_, db_with_cfh->GetCfh(key_rand),
key, &pinnable_val);
} else {
pinnable_val.Reset();
s = db_with_cfh->db->Get(read_options_,
db_with_cfh->db->DefaultColumnFamily(), key,
&pinnable_val);
}
if (s.ok()) {
get_found++;
bytes += key.size() + pinnable_val.size();
} else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}
if (thread->shared->read_rate_limiter && (gets + seek) % 100 == 0) {
thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH,
nullptr /*stats*/);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
} else if (query_type == 1) {
// the Put query
puts++;
int64_t val_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
if (val_size < 10) {
val_size = 10;
} else if (val_size > value_max) {
val_size = val_size % value_max;
}
total_val_size += val_size;
s = db_with_cfh->db->Put(
write_options_, key,
gen.Generate(static_cast<unsigned int>(val_size)));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
ErrorExit();
}
if (thread->shared->write_rate_limiter && puts % 100 == 0) {
thread->shared->write_rate_limiter->Request(100, Env::IO_HIGH,
nullptr /*stats*/);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
} else if (query_type == 2) {
// Seek query
if (db_with_cfh->db != nullptr) {
Iterator* single_iter = nullptr;
single_iter = db_with_cfh->db->NewIterator(read_options_);
if (single_iter != nullptr) {
single_iter->Seek(key);
seek++;
if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
seek_found++;
}
int64_t scan_length =
ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
FLAGS_iter_sigma) %
scan_len_max;
for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
Slice value = single_iter->value();
memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer)));
bytes += single_iter->key().size() + single_iter->value().size();
single_iter->Next();
assert(single_iter->status().ok());
total_scan_length++;
}
}
delete single_iter;
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
}
}
char msg[256];
snprintf(msg, sizeof(msg),
"( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64
", reads %" PRIu64 " in %" PRIu64
" found, "
"avg size: %.1f value, %.1f scan)\n",
gets, puts, seek, get_found + seek_found, gets + seek,
total_val_size / puts, total_scan_length / seek);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}