in tools/db_bench_tool.cc [4662:5098]
// Core write loop used by the fill/overwrite style benchmarks.
//
// Writes batches of entries_per_batch_ keys until `duration` reports that the
// run is done. When db_.db is null the writes are spread over multi_dbs_, with
// one KeyGenerator per database. Optional behaviors, all selected by the
// flags/members read below:
//   - column-family stages: when FLAGS_num_column_families > 1 and
//     FLAGS_num_hot_column_families > 0, CreateNewCf() is called on every
//     database each time duration.GetStage() changes;
//   - overwrites (UNIQUE_RANDOM only): with probability
//     FLAGS_overwrite_probability (clamped to 1.0) a key is re-drawn from a
//     window holding the last FLAGS_overwrite_window_size unique keys;
//   - disposable/persistent entries (UNIQUE_RANDOM only): see the comment on
//     the simulation variables below;
//   - range tombstones: once more than writes_before_delete_range_ keys have
//     been written, every writes_per_range_tombstone_ writes (up to
//     max_num_range_tombstones_ times) a DeleteRange over
//     range_tombstone_width_ keys is issued, or the equivalent point deletes
//     when FLAGS_expand_range_tombstones is set.
// Invalid flag combinations and write errors that listener_->WaitForRecovery()
// does not clear terminate the process via ErrorExit().
//
// @param thread      per-thread state: RNG, stats, shared rate limiter.
// @param write_mode  key ordering; FLAGS_duration is only honored for RANDOM.
void DoWrite(ThreadState* thread, WriteMode write_mode) {
  const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
  const int64_t num_ops = writes_ == 0 ? num_ : writes_;

  // One key generator per target database: a single one for db_, otherwise
  // one per entry of multi_dbs_.
  size_t num_key_gens = 1;
  if (db_.db == nullptr) {
    num_key_gens = multi_dbs_.size();
  }
  std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
  int64_t max_ops = num_ops * num_key_gens;
  int64_t ops_per_stage = max_ops;
  if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
    // Number of stages = num_column_families / num_hot_column_families.
    // Clamp to at least one stage: when more hot column families than column
    // families are requested the integer quotient is 0, which previously
    // caused a division by zero below.
    int64_t num_stages =
        FLAGS_num_column_families / FLAGS_num_hot_column_families;
    if (num_stages < 1) {
      num_stages = 1;
    }
    // ceil(max_ops / num_stages) for max_ops >= 1.
    ops_per_stage = (max_ops - 1) / num_stages + 1;
  }
  Duration duration(test_duration, max_ops, ops_per_stage);
  // The extra max_num_range_tombstones_ keys presumably cover the begin keys
  // drawn from the generators for range tombstones further down.
  const uint64_t num_per_key_gen = num_ + max_num_range_tombstones_;
  for (size_t i = 0; i < num_key_gens; i++) {
    key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode,
                                       num_per_key_gen, ops_per_stage));
  }

  if (num_ != FLAGS_num) {
    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
    thread->stats.AddMessage(msg);
  }

  RandomGenerator gen;
  WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
                   /*protection_bytes_per_key=*/0, user_timestamp_size_);
  Status s;
  int64_t bytes = 0;

  std::unique_ptr<const char[]> key_guard;
  Slice key = AllocateKey(&key_guard);
  std::unique_ptr<const char[]> begin_key_guard;
  Slice begin_key = AllocateKey(&begin_key_guard);
  std::unique_ptr<const char[]> end_key_guard;
  Slice end_key = AllocateKey(&end_key_guard);

  // --- Overwrite simulation (UNIQUE_RANDOM with overwrite_probability > 0).
  double p = 0.0;
  uint64_t num_overwrites = 0, num_unique_keys = 0, num_selective_deletes = 0;
  // If the user set overwrite_probability, clamp it into [0.0, 1.0].
  if (FLAGS_overwrite_probability > 0.0) {
    p = FLAGS_overwrite_probability > 1.0 ? 1.0 : FLAGS_overwrite_probability;
    // With overwrites enabled in UNIQUE_RANDOM mode there must be a
    // non-empty window of previously inserted keys to draw from.
    if (write_mode == UNIQUE_RANDOM && FLAGS_overwrite_window_size == 0) {
      fprintf(stderr,
              "Overwrite_window_size must be strictly greater than 0.\n");
      ErrorExit();
    }
  }
  // default_random_engine provides slightly better throughput than mt19937.
  std::default_random_engine overwrite_gen{
      static_cast<unsigned int>(seed_base)};
  std::bernoulli_distribution overwrite_decider(p);
  // Window holding the last N keys inserted into the DB
  // (N = FLAGS_overwrite_window_size). A deque gives O(1) random access and
  // O(1) insertion/removal at both ends.
  std::deque<int64_t> inserted_key_window;
  Random64 reservoir_id_gen(seed_base);

  // --- Variables used in the disposable/persistent keys simulation.
  // They are used when disposable/persistent batch sizes are > 0. The
  // simulated workload repeats the following sequence:
  // "A set of keys S1 is inserted ('disposable entries'), then after
  // some delay another set of keys S2 is inserted ('persistent entries')
  // and the first set of keys S1 is deleted. S2 artificially represents
  // the insertion of hypothetical results from some undefined computation
  // done on the first set of keys S1. The next sequence can start as soon
  // as the last disposable entry in the set S1 of this sequence is
  // inserted, if the delay is non negligible."
  // Key-generator indices are grouped in blocks of kNumDispAndPersEntries
  // slots: the first FLAGS_disposable_entries_batch_size slots of a block are
  // the disposable keys, the remaining ones are the persistent keys.
  bool skip_for_loop = false, is_disposable_entry = true;
  std::vector<uint64_t> disposable_entries_index(num_key_gens, 0);
  std::vector<uint64_t> persistent_ent_and_del_index(num_key_gens, 0);
  const uint64_t kNumDispAndPersEntries =
      FLAGS_disposable_entries_batch_size +
      FLAGS_persistent_entries_batch_size;
  if (kNumDispAndPersEntries > 0) {
    if ((write_mode != UNIQUE_RANDOM) || (writes_per_range_tombstone_ > 0) ||
        (p > 0.0)) {
      fprintf(
          stderr,
          "Disposable/persistent deletes are not compatible with overwrites "
          "and DeleteRanges; and are only supported in filluniquerandom.\n");
      ErrorExit();
    }
    // Without disposable entries the simulation is meaningless, and the
    // block arithmetic below would otherwise divide by zero.
    if (FLAGS_disposable_entries_batch_size == 0) {
      fprintf(stderr,
              "disposable_entries_batch_size must be strictly greater than 0 "
              "when persistent_entries_batch_size is set.\n");
      ErrorExit();
    }
    if (FLAGS_disposable_entries_value_size < 0 ||
        FLAGS_persistent_entries_value_size < 0) {
      fprintf(
          stderr,
          "disposable_entries_value_size and persistent_entries_value_size "
          "have to be positive.\n");
      ErrorExit();
    }
  }
  Random rnd_disposable_entry(static_cast<uint32_t>(seed_base));
  std::string random_value;
  // Per key generator: queue of [scheduled timestamp of the disposable entries
  // deletes, starting index of the disposable entry keys to delete].
  std::vector<std::queue<std::pair<uint64_t, uint64_t>>> disposable_entries_q(
      num_key_gens);
  // --- End of variables used in disposable/persistent keys simulation.

  // Pre-allocated keys for range tombstones expanded into point deletes.
  std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
  std::vector<Slice> expanded_keys;
  if (FLAGS_expand_range_tombstones) {
    expanded_key_guards.resize(range_tombstone_width_);
    for (auto& expanded_key_guard : expanded_key_guards) {
      expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
    }
  }

  std::unique_ptr<char[]> ts_guard;
  if (user_timestamp_size_ > 0) {
    ts_guard.reset(new char[user_timestamp_size_]);
  }

  int64_t stage = 0;
  int64_t num_written = 0;
  int64_t next_seq_db_at = num_ops;
  size_t id = 0;
  while ((num_per_key_gen != 0) && !duration.Done(entries_per_batch_)) {
    if (duration.GetStage() != stage) {
      // Stage changed: call CreateNewCf() on every target database.
      stage = duration.GetStage();
      if (db_.db != nullptr) {
        db_.CreateNewCf(open_options_, stage);
      } else {
        for (auto& db : multi_dbs_) {
          db.CreateNewCf(open_options_, stage);
        }
      }
    }

    if (write_mode != SEQUENTIAL) {
      id = thread->rand.Next() % num_key_gens;
    } else {
      // When doing a sequential load with multiple databases, load them in
      // order rather than all at the same time to avoid:
      // 1) long delays between flushing memtables
      // 2) flushing memtables for all of them at the same point in time
      // 3) not putting the same number of keys in each database
      if (num_written >= next_seq_db_at) {
        next_seq_db_at += num_ops;
        id++;
        if (id >= num_key_gens) {
          fprintf(stderr, "Logic error. Filled all databases\n");
          ErrorExit();
        }
      }
    }
    DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);

    batch.Clear();
    int64_t batch_bytes = 0;

    for (int64_t j = 0; j < entries_per_batch_; j++) {
      int64_t rand_num = 0;
      if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
        // Overwrite simulation: either re-use a recently inserted key or
        // draw a new unique key and remember it in the window.
        if ((inserted_key_window.size() > 0) &&
            overwrite_decider(overwrite_gen)) {
          num_overwrites++;
          rand_num = inserted_key_window[reservoir_id_gen.Next() %
                                         inserted_key_window.size()];
        } else {
          num_unique_keys++;
          rand_num = key_gens[id]->Next();
          if (inserted_key_window.size() < FLAGS_overwrite_window_size) {
            inserted_key_window.push_back(rand_num);
          } else {
            inserted_key_window.pop_front();
            inserted_key_window.push_back(rand_num);
          }
        }
      } else if (kNumDispAndPersEntries > 0) {
        // Check if the queue is non-empty and whether it is time to insert
        // 'persistent' KV entries (KV entries that are never deleted)
        // and delete the disposable entries previously inserted.
        if (!disposable_entries_q[id].empty() &&
            (disposable_entries_q[id].front().first <
             FLAGS_env->NowMicros())) {
          // For this "merge op" pattern we first write all the persistent
          // KV entries of the block, then the disposable entries deletes.
          if (persistent_ent_and_del_index[id] <
              FLAGS_persistent_entries_batch_size) {
            // Persistent keys live right after the disposable keys of the
            // block starting at front().second.
            rand_num =
                key_gens[id]->Fetch(disposable_entries_q[id].front().second +
                                    FLAGS_disposable_entries_batch_size +
                                    persistent_ent_and_del_index[id]);
            persistent_ent_and_del_index[id]++;
            is_disposable_entry = false;
            skip_for_loop = false;
          } else if (persistent_ent_and_del_index[id] <
                     kNumDispAndPersEntries) {
            // Find the key of the disposable entry to delete.
            rand_num =
                key_gens[id]->Fetch(disposable_entries_q[id].front().second +
                                    (persistent_ent_and_del_index[id] -
                                     FLAGS_persistent_entries_batch_size));
            persistent_ent_and_del_index[id]++;
            GenerateKeyFromInt(rand_num, FLAGS_num, &key);
            // For the delete operation everything happens here and the rest
            // of the for-loop, which is designed for inserts, is skipped.
            if (FLAGS_num_column_families <= 1) {
              batch.Delete(key);
            } else {
              // We use same rand_num as seed for key and column family so
              // that we can deterministically find the cfh corresponding to a
              // particular key while reading the key.
              batch.Delete(db_with_cfh->GetCfh(rand_num), key);
            }
            // A delete only includes Key+Timestamp (no value).
            batch_bytes += key_size_ + user_timestamp_size_;
            bytes += key_size_ + user_timestamp_size_;
            num_selective_deletes++;
            // Skip rest of the for-loop (j=0, j<entries_per_batch_,j++).
            skip_for_loop = true;
          } else {
            assert(false);  // should never reach this point.
          }
          // Once every persistent insert and disposable delete of the job has
          // been issued, pop the job out of the queue.
          if (!disposable_entries_q[id].empty() &&
              (disposable_entries_q[id].front().first <
               FLAGS_env->NowMicros()) &&
              persistent_ent_and_del_index[id] == kNumDispAndPersEntries) {
            disposable_entries_q[id].pop();
            persistent_ent_and_del_index[id] = 0;
          }
          // When a disposable entry was deleted there is no key-value insert
          // at this moment in time, so skip the rest of the for-loop.
          if (skip_for_loop) {
            continue;
          }
        } else {
          // No job is due: keep inserting disposable KV entries that will be
          // deleted later by a series of deletes.
          rand_num = key_gens[id]->Fetch(disposable_entries_index[id]);
          disposable_entries_index[id]++;
          is_disposable_entry = true;
          // The disposable part of the current block is exhausted once the
          // offset inside the block reaches the disposable batch size; jump
          // over the slots reserved for the persistent keys. (Testing
          // `index % disposable_batch_size == 0` instead only works when the
          // persistent batch size is a multiple of the disposable one.)
          if ((disposable_entries_index[id] % kNumDispAndPersEntries) ==
              static_cast<uint64_t>(FLAGS_disposable_entries_batch_size)) {
            disposable_entries_index[id] +=
                FLAGS_persistent_entries_batch_size;
          }
        }
      } else {
        rand_num = key_gens[id]->Next();
      }
      GenerateKeyFromInt(rand_num, FLAGS_num, &key);

      Slice val;
      if (kNumDispAndPersEntries > 0) {
        random_value = rnd_disposable_entry.RandomString(
            is_disposable_entry ? FLAGS_disposable_entries_value_size
                                : FLAGS_persistent_entries_value_size);
        val = Slice(random_value);
        num_unique_keys++;
      } else {
        val = gen.Generate();
      }

      if (use_blob_db_) {
#ifndef ROCKSDB_LITE
        // Stacked BlobDB
        blob_db::BlobDB* blobdb =
            static_cast<blob_db::BlobDB*>(db_with_cfh->db);
        if (FLAGS_blob_db_max_ttl_range > 0) {
          int ttl = rand() % FLAGS_blob_db_max_ttl_range;
          s = blobdb->PutWithTTL(write_options_, key, val, ttl);
        } else {
          s = blobdb->Put(write_options_, key, val);
        }
#endif  // ROCKSDB_LITE
      } else if (FLAGS_num_column_families <= 1) {
        batch.Put(key, val);
      } else {
        // We use same rand_num as seed for key and column family so that we
        // can deterministically find the cfh corresponding to a particular
        // key while reading the key.
        batch.Put(db_with_cfh->GetCfh(rand_num), key, val);
      }
      batch_bytes += val.size() + key_size_ + user_timestamp_size_;
      bytes += val.size() + key_size_ + user_timestamp_size_;
      ++num_written;

      // If all disposable entries of a block have been inserted, add to the
      // job queue a call for 'persistent entry insertions + disposable entry
      // deletions'.
      if (kNumDispAndPersEntries > 0 && is_disposable_entry &&
          ((disposable_entries_index[id] % kNumDispAndPersEntries) == 0)) {
        // Queue contains [timestamp, starting_idx]:
        // timestamp = current_time + delay (minimum absolute time at which to
        // start inserting the selective deletes); starting_idx = index in the
        // keygen of the rand_num used to generate the key of the first KV
        // entry to delete (= key of the first selective delete).
        disposable_entries_q[id].push(std::make_pair(
            FLAGS_env->NowMicros() +
                FLAGS_disposable_entries_delete_delay /* timestamp */,
            disposable_entries_index[id] - kNumDispAndPersEntries
            /*starting idx*/));
      }

      if (writes_per_range_tombstone_ > 0 &&
          num_written > writes_before_delete_range_ &&
          (num_written - writes_before_delete_range_) /
                  writes_per_range_tombstone_ <=
              max_num_range_tombstones_ &&
          (num_written - writes_before_delete_range_) %
                  writes_per_range_tombstone_ ==
              0) {
        int64_t begin_num = key_gens[id]->Next();
        if (FLAGS_expand_range_tombstones) {
          // Emulate the range tombstone with one point delete per key.
          for (int64_t offset = 0; offset < range_tombstone_width_;
               ++offset) {
            GenerateKeyFromInt(begin_num + offset, FLAGS_num,
                               &expanded_keys[offset]);
            if (use_blob_db_) {
#ifndef ROCKSDB_LITE
              // Stacked BlobDB
              s = db_with_cfh->db->Delete(write_options_,
                                          expanded_keys[offset]);
#endif  // ROCKSDB_LITE
            } else if (FLAGS_num_column_families <= 1) {
              batch.Delete(expanded_keys[offset]);
            } else {
              // Same column family as the non-expanded DeleteRange below.
              batch.Delete(db_with_cfh->GetCfh(rand_num),
                           expanded_keys[offset]);
            }
          }
        } else {
          GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
          GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
                             &end_key);
          if (use_blob_db_) {
#ifndef ROCKSDB_LITE
            // Stacked BlobDB
            s = db_with_cfh->db->DeleteRange(
                write_options_, db_with_cfh->db->DefaultColumnFamily(),
                begin_key, end_key);
#endif  // ROCKSDB_LITE
          } else if (FLAGS_num_column_families <= 1) {
            batch.DeleteRange(begin_key, end_key);
          } else {
            batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
                              end_key);
          }
        }
      }
    }

    if (thread->shared->write_rate_limiter.get() != nullptr) {
      thread->shared->write_rate_limiter->Request(
          batch_bytes, Env::IO_HIGH, nullptr /* stats */,
          RateLimiter::OpType::kWrite);
      // Set time at which last op finished to Now() to hide latency and
      // sleep from rate limiter. Also, do the check once per batch, not
      // once per write.
      thread->stats.ResetLastOpTime();
    }
    if (user_timestamp_size_ > 0) {
      Slice user_ts = mock_app_clock_->Allocate(ts_guard.get());
      s = batch.UpdateTimestamps(
          user_ts, [this](uint32_t) { return user_timestamp_size_; });
      if (!s.ok()) {
        fprintf(stderr, "assign timestamp to write batch: %s\n",
                s.ToString().c_str());
        ErrorExit();
      }
    }
    if (!use_blob_db_) {
      // Not stacked BlobDB: the whole batch is written at once. (Stacked
      // BlobDB operations were already applied directly above.)
      s = db_with_cfh->db->Write(write_options_, &batch);
    }
    thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
                              entries_per_batch_, kWrite);

    if (FLAGS_sine_write_rate) {
      // Periodically replace the shared rate limiter with one whose rate
      // follows SineRate() of the elapsed time.
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t usecs_since_last;
      if (now > thread->stats.GetSineInterval()) {
        usecs_since_last = now - thread->stats.GetSineInterval();
      } else {
        usecs_since_last = 0;
      }

      if (usecs_since_last >
          (FLAGS_sine_write_rate_interval_milliseconds * uint64_t{1000})) {
        double usecs_since_start =
            static_cast<double>(now - thread->stats.GetStart());
        thread->stats.ResetSineInterval();
        uint64_t write_rate =
            static_cast<uint64_t>(SineRate(usecs_since_start / 1000000.0));
        thread->shared->write_rate_limiter.reset(
            NewGenericRateLimiter(write_rate));
      }
    }

    // On error, give the listener a chance to report recovery before
    // giving up.
    if (!s.ok()) {
      s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
    }
    if (!s.ok()) {
      fprintf(stderr, "put error: %s\n", s.ToString().c_str());
      ErrorExit();
    }
  }

  if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
    fprintf(stdout,
            "Number of unique keys inserted: %" PRIu64
            ".\nNumber of overwrites: %" PRIu64 "\n",
            num_unique_keys, num_overwrites);
  } else if (kNumDispAndPersEntries > 0) {
    fprintf(stdout,
            "Number of unique keys inserted (disposable+persistent): %" PRIu64
            ".\nNumber of 'disposable entry delete': %" PRIu64 "\n",
            num_written, num_selective_deletes);
  }
  thread->stats.AddBytes(bytes);
}