void DoWrite()

in tools/db_bench_tool.cc [4662:5098]


  // Main write loop: repeatedly builds a WriteBatch of `entries_per_batch_`
  // operations and applies it to the selected DB until `duration` is done.
  //
  // Depending on flags and `write_mode`, each batch entry is one of:
  //  - a Put of the next key produced by the per-DB KeyGenerator,
  //  - (UNIQUE_RANDOM with FLAGS_overwrite_probability > 0) an overwrite of a
  //    key picked from a sliding window of recently inserted keys,
  //  - (disposable/persistent simulation) a Put of a "disposable" or
  //    "persistent" entry, or a Delete of a previously inserted disposable
  //    entry,
  // optionally followed by a range tombstone (or its point-delete expansion).
  //
  // thread:     per-thread state; its RNG, stats and shared rate limiter are
  //             used here.
  // write_mode: key ordering mode forwarded to KeyGenerator. FLAGS_duration is
  //             only honored for RANDOM; SEQUENTIAL fills multiple DBs one
  //             after the other instead of picking one at random per batch.
  //
  // Calls ErrorExit() on incompatible flag combinations and on a write error
  // for which listener_->WaitForRecovery() does not report success.
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int64_t num_ops = writes_ == 0 ? num_ : writes_;

    // One key generator per target DB: a single one when db_ is open,
    // otherwise one for each entry of multi_dbs_.
    size_t num_key_gens = 1;
    if (db_.db == nullptr) {
      num_key_gens = multi_dbs_.size();
    }
    std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
    int64_t max_ops = num_ops * num_key_gens;
    int64_t ops_per_stage = max_ops;
    if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
      // Ceiling division of max_ops by the number of stages
      // (num_column_families / num_hot_column_families).
      ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
                                       FLAGS_num_hot_column_families) +
                      1;
    }

    Duration duration(test_duration, max_ops, ops_per_stage);
    // Range tombstones consume extra keys from the generator (one Next() per
    // tombstone), hence the additional max_num_range_tombstones_.
    const uint64_t num_per_key_gen = num_ + max_num_range_tombstones_;
    for (size_t i = 0; i < num_key_gens; i++) {
      key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode,
                                         num_per_key_gen, ops_per_stage));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
                     /*protection_bytes_per_key=*/0, user_timestamp_size_);
    Status s;
    int64_t bytes = 0;

    // Reusable key buffers: `key` for point operations, `begin_key`/`end_key`
    // for DeleteRange bounds.
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::unique_ptr<const char[]> begin_key_guard;
    Slice begin_key = AllocateKey(&begin_key_guard);
    std::unique_ptr<const char[]> end_key_guard;
    Slice end_key = AllocateKey(&end_key_guard);
    double p = 0.0;
    uint64_t num_overwrites = 0, num_unique_keys = 0, num_selective_deletes = 0;
    // If user set overwrite_probability flag,
    // clamp the value to at most 1.0.
    if (FLAGS_overwrite_probability > 0.0) {
      p = FLAGS_overwrite_probability > 1.0 ? 1.0 : FLAGS_overwrite_probability;
      // If overwrite set by user, and UNIQUE_RANDOM mode on,
      // the overwrite_window_size must be > 0.
      if (write_mode == UNIQUE_RANDOM && FLAGS_overwrite_window_size == 0) {
        fprintf(stderr,
                "Overwrite_window_size must be  strictly greater than 0.\n");
        ErrorExit();
      }
    }

    // Default_random_engine provides slightly
    // improved throughput over mt19937.
    std::default_random_engine overwrite_gen{
        static_cast<unsigned int>(seed_base)};
    std::bernoulli_distribution overwrite_decider(p);

    // Inserted key window is filled with the last N
    // keys previously inserted into the DB (with
    // N=FLAGS_overwrite_window_size).
    // We use a deque struct because:
    // - random access is O(1)
    // - insertion/removal at beginning/end is also O(1).
    std::deque<int64_t> inserted_key_window;
    Random64 reservoir_id_gen(seed_base);

    // --- Variables used in disposable/persistent keys simulation:
    // The following variables are used when
    // disposable_entries_batch_size is >0. We simulate a workload
    // where the following sequence is repeated multiple times:
    // "A set of keys S1 is inserted ('disposable entries'), then after
    // some delay another set of keys S2 is inserted ('persistent entries')
    // and the first set of keys S1 is deleted. S2 artificially represents
    // the insertion of hypothetical results from some undefined computation
    // done on the first set of keys S1. The next sequence can start as soon
    // as the last disposable entry in the set S1 of this sequence is
    // inserted, if the delay is non negligible"
    bool skip_for_loop = false, is_disposable_entry = true;
    std::vector<uint64_t> disposable_entries_index(num_key_gens, 0);
    std::vector<uint64_t> persistent_ent_and_del_index(num_key_gens, 0);
    const uint64_t kNumDispAndPersEntries =
        FLAGS_disposable_entries_batch_size +
        FLAGS_persistent_entries_batch_size;
    if (kNumDispAndPersEntries > 0) {
      if ((write_mode != UNIQUE_RANDOM) || (writes_per_range_tombstone_ > 0) ||
          (p > 0.0)) {
        fprintf(
            stderr,
            "Disposable/persistent deletes are not compatible with overwrites "
            "and DeleteRanges; and are only supported in filluniquerandom.\n");
        ErrorExit();
      }
      // The disposable-entry insertion path below computes
      // `index % FLAGS_disposable_entries_batch_size`. With only persistent
      // entries requested the divisor would be zero, so reject that
      // configuration up front instead of dividing by zero.
      if (FLAGS_disposable_entries_batch_size == 0) {
        fprintf(stderr,
                "disposable_entries_batch_size must be strictly greater than "
                "0 when persistent_entries_batch_size is set.\n");
        ErrorExit();
      }
      // Only negative sizes are rejected here; a size of 0 is accepted.
      if (FLAGS_disposable_entries_value_size < 0 ||
          FLAGS_persistent_entries_value_size < 0) {
        fprintf(
            stderr,
            "disposable_entries_value_size and persistent_entries_value_size "
            "have to be positive.\n");
        ErrorExit();
      }
    }
    Random rnd_disposable_entry(static_cast<uint32_t>(seed_base));
    std::string random_value;
    // Queue that stores scheduled timestamp of disposable entries deletes,
    // along with starting index of disposable entry keys to delete.
    std::vector<std::queue<std::pair<uint64_t, uint64_t>>> disposable_entries_q(
        num_key_gens);
    // --- End of variables used in disposable/persistent keys simulation.

    // When range tombstones are expanded into point deletes, pre-allocate one
    // key buffer per key covered by a tombstone.
    std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
    std::vector<Slice> expanded_keys;
    if (FLAGS_expand_range_tombstones) {
      expanded_key_guards.resize(range_tombstone_width_);
      for (auto& expanded_key_guard : expanded_key_guards) {
        expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
      }
    }

    // Scratch buffer for the user-defined timestamp stamped onto each batch.
    std::unique_ptr<char[]> ts_guard;
    if (user_timestamp_size_ > 0) {
      ts_guard.reset(new char[user_timestamp_size_]);
    }

    int64_t stage = 0;
    int64_t num_written = 0;
    // In SEQUENTIAL mode: value of num_written at which we move to next DB.
    int64_t next_seq_db_at = num_ops;
    // Index of the DB / key generator targeted by the current batch.
    size_t id = 0;

    while ((num_per_key_gen != 0) && !duration.Done(entries_per_batch_)) {
      // On stage change, create the column family for the new stage in every
      // DB being written.
      if (duration.GetStage() != stage) {
        stage = duration.GetStage();
        if (db_.db != nullptr) {
          db_.CreateNewCf(open_options_, stage);
        } else {
          for (auto& db : multi_dbs_) {
            db.CreateNewCf(open_options_, stage);
          }
        }
      }

      if (write_mode != SEQUENTIAL) {
        id = thread->rand.Next() % num_key_gens;
      } else {
        // When doing a sequential load with multiple databases, load them in
        // order rather than all at the same time to avoid:
        // 1) long delays between flushing memtables
        // 2) flushing memtables for all of them at the same point in time
        // 3) not putting the same number of keys in each database
        if (num_written >= next_seq_db_at) {
          next_seq_db_at += num_ops;
          id++;
          if (id >= num_key_gens) {
            fprintf(stderr, "Logic error. Filled all databases\n");
            ErrorExit();
          }
        }
      }
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);

      batch.Clear();
      int64_t batch_bytes = 0;

      for (int64_t j = 0; j < entries_per_batch_; j++) {
        int64_t rand_num = 0;
        if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
          // Overwrite simulation: with probability p re-use a key from the
          // window of recently inserted keys, otherwise insert a new key and
          // slide the window.
          if ((inserted_key_window.size() > 0) &&
              overwrite_decider(overwrite_gen)) {
            num_overwrites++;
            rand_num = inserted_key_window[reservoir_id_gen.Next() %
                                           inserted_key_window.size()];
          } else {
            num_unique_keys++;
            rand_num = key_gens[id]->Next();
            if (inserted_key_window.size() < FLAGS_overwrite_window_size) {
              inserted_key_window.push_back(rand_num);
            } else {
              inserted_key_window.pop_front();
              inserted_key_window.push_back(rand_num);
            }
          }
        } else if (kNumDispAndPersEntries > 0) {
          // Check if queue is non-empty and if we need to insert
          // 'persistent' KV entries (KV entries that are never deleted)
          // and delete disposable entries previously inserted.
          if (!disposable_entries_q[id].empty() &&
              (disposable_entries_q[id].front().first <
               FLAGS_env->NowMicros())) {
            // If we need to perform a "merge op" pattern,
            // we first write all the persistent KV entries not targeted
            // by deletes, and then we write the disposable entries deletes.
            if (persistent_ent_and_del_index[id] <
                FLAGS_persistent_entries_batch_size) {
              // Generate key to insert.
              rand_num =
                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
                                      FLAGS_disposable_entries_batch_size +
                                      persistent_ent_and_del_index[id]);
              persistent_ent_and_del_index[id]++;
              is_disposable_entry = false;
              skip_for_loop = false;
            } else if (persistent_ent_and_del_index[id] <
                       kNumDispAndPersEntries) {
              // Find key of the entry to delete.
              rand_num =
                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
                                      (persistent_ent_and_del_index[id] -
                                       FLAGS_persistent_entries_batch_size));
              persistent_ent_and_del_index[id]++;
              GenerateKeyFromInt(rand_num, FLAGS_num, &key);
              // For the delete operation, everything happens here and we
              // skip the rest of the for-loop, which is designed for
              // inserts.
              if (FLAGS_num_column_families <= 1) {
                batch.Delete(key);
              } else {
                // We use same rand_num as seed for key and column family so
                // that we can deterministically find the cfh corresponding to a
                // particular key while reading the key.
                batch.Delete(db_with_cfh->GetCfh(rand_num), key);
              }
              // A delete only includes Key+Timestamp (no value).
              batch_bytes += key_size_ + user_timestamp_size_;
              bytes += key_size_ + user_timestamp_size_;
              num_selective_deletes++;
              // Skip rest of the for-loop (j=0, j<entries_per_batch_,j++).
              skip_for_loop = true;
            } else {
              assert(false);  // should never reach this point.
            }
            // If disposable_entries_q needs to be updated (ie: when a selective
            // insert+delete was successfully completed, pop the job out of the
            // queue).
            if (!disposable_entries_q[id].empty() &&
                (disposable_entries_q[id].front().first <
                 FLAGS_env->NowMicros()) &&
                persistent_ent_and_del_index[id] == kNumDispAndPersEntries) {
              disposable_entries_q[id].pop();
              persistent_ent_and_del_index[id] = 0;
            }

            // If we are deleting disposable entries, skip the rest of the
            // for-loop since there is no key-value inserts at this moment in
            // time.
            if (skip_for_loop) {
              continue;
            }

          }
          // If no job is in the queue, then we keep inserting disposable KV
          // entries that will be deleted later by a series of deletes.
          else {
            rand_num = key_gens[id]->Fetch(disposable_entries_index[id]);
            disposable_entries_index[id]++;
            is_disposable_entry = true;
            // FLAGS_disposable_entries_batch_size is validated to be non-zero
            // before the loop, so this modulo is well defined.
            if ((disposable_entries_index[id] %
                 FLAGS_disposable_entries_batch_size) == 0) {
              // Skip the persistent KV entries inserts for now
              disposable_entries_index[id] +=
                  FLAGS_persistent_entries_batch_size;
            }
          }
        } else {
          rand_num = key_gens[id]->Next();
        }
        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
        Slice val;
        if (kNumDispAndPersEntries > 0) {
          random_value = rnd_disposable_entry.RandomString(
              is_disposable_entry ? FLAGS_disposable_entries_value_size
                                  : FLAGS_persistent_entries_value_size);
          val = Slice(random_value);
          num_unique_keys++;
        } else {
          val = gen.Generate();
        }
        // NOTE(review): in stacked BlobDB mode every operation is issued
        // directly against the DB and assigns `s`; `s` is only inspected
        // after the whole batch loop, so an earlier failure can be
        // overwritten by a later success. Confirm whether that is intended.
        if (use_blob_db_) {
#ifndef ROCKSDB_LITE
          // Stacked BlobDB
          blob_db::BlobDB* blobdb =
              static_cast<blob_db::BlobDB*>(db_with_cfh->db);
          if (FLAGS_blob_db_max_ttl_range > 0) {
            int ttl = rand() % FLAGS_blob_db_max_ttl_range;
            s = blobdb->PutWithTTL(write_options_, key, val, ttl);
          } else {
            s = blobdb->Put(write_options_, key, val);
          }
#endif  //  ROCKSDB_LITE
        } else if (FLAGS_num_column_families <= 1) {
          batch.Put(key, val);
        } else {
          // We use same rand_num as seed for key and column family so that we
          // can deterministically find the cfh corresponding to a particular
          // key while reading the key.
          batch.Put(db_with_cfh->GetCfh(rand_num), key, val);
        }
        batch_bytes += val.size() + key_size_ + user_timestamp_size_;
        bytes += val.size() + key_size_ + user_timestamp_size_;
        ++num_written;

        // If all disposable entries have been inserted, then we need to
        // add in the job queue a call for 'persistent entry insertions +
        // disposable entry deletions'.
        if (kNumDispAndPersEntries > 0 && is_disposable_entry &&
            ((disposable_entries_index[id] % kNumDispAndPersEntries) == 0)) {
          // Queue contains [timestamp, starting_idx],
          // timestamp = current_time + delay (minimum absolute time when to
          // start inserting the selective deletes) starting_idx = index in the
          // keygen of the rand_num to generate the key of the first KV entry to
          // delete (= key of the first selective delete).
          disposable_entries_q[id].push(std::make_pair(
              FLAGS_env->NowMicros() +
                  FLAGS_disposable_entries_delete_delay /* timestamp */,
              disposable_entries_index[id] - kNumDispAndPersEntries
              /*starting idx*/));
        }
        // Emit a range tombstone every writes_per_range_tombstone_ writes,
        // starting after writes_before_delete_range_ writes and capped at
        // max_num_range_tombstones_ tombstones.
        if (writes_per_range_tombstone_ > 0 &&
            num_written > writes_before_delete_range_ &&
            (num_written - writes_before_delete_range_) /
                    writes_per_range_tombstone_ <=
                max_num_range_tombstones_ &&
            (num_written - writes_before_delete_range_) %
                    writes_per_range_tombstone_ ==
                0) {
          int64_t begin_num = key_gens[id]->Next();
          if (FLAGS_expand_range_tombstones) {
            // Replace the range tombstone by one point delete per covered key.
            for (int64_t offset = 0; offset < range_tombstone_width_;
                 ++offset) {
              GenerateKeyFromInt(begin_num + offset, FLAGS_num,
                                 &expanded_keys[offset]);
              if (use_blob_db_) {
#ifndef ROCKSDB_LITE
                // Stacked BlobDB
                s = db_with_cfh->db->Delete(write_options_,
                                            expanded_keys[offset]);
#endif  //  ROCKSDB_LITE
              } else if (FLAGS_num_column_families <= 1) {
                batch.Delete(expanded_keys[offset]);
              } else {
                batch.Delete(db_with_cfh->GetCfh(rand_num),
                             expanded_keys[offset]);
              }
            }
          } else {
            GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
            GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
                               &end_key);
            if (use_blob_db_) {
#ifndef ROCKSDB_LITE
              // Stacked BlobDB
              s = db_with_cfh->db->DeleteRange(
                  write_options_, db_with_cfh->db->DefaultColumnFamily(),
                  begin_key, end_key);
#endif  //  ROCKSDB_LITE
            } else if (FLAGS_num_column_families <= 1) {
              batch.DeleteRange(begin_key, end_key);
            } else {
              batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
                                end_key);
            }
          }
        }
      }
      if (thread->shared->write_rate_limiter.get() != nullptr) {
        thread->shared->write_rate_limiter->Request(
            batch_bytes, Env::IO_HIGH,
            nullptr /* stats */, RateLimiter::OpType::kWrite);
        // Set time at which last op finished to Now() to hide latency and
        // sleep from rate limiter. Also, do the check once per batch, not
        // once per write.
        thread->stats.ResetLastOpTime();
      }
      if (user_timestamp_size_ > 0) {
        // Stamp every entry of the batch with a freshly allocated timestamp.
        Slice user_ts = mock_app_clock_->Allocate(ts_guard.get());
        s = batch.UpdateTimestamps(
            user_ts, [this](uint32_t) { return user_timestamp_size_; });
        if (!s.ok()) {
          fprintf(stderr, "assign timestamp to write batch: %s\n",
                  s.ToString().c_str());
          ErrorExit();
        }
      }
      if (!use_blob_db_) {
        // Not stacked BlobDB
        s = db_with_cfh->db->Write(write_options_, &batch);
      }
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
                                entries_per_batch_, kWrite);
      if (FLAGS_sine_write_rate) {
        uint64_t now = FLAGS_env->NowMicros();

        uint64_t usecs_since_last;
        if (now > thread->stats.GetSineInterval()) {
          usecs_since_last = now - thread->stats.GetSineInterval();
        } else {
          usecs_since_last = 0;
        }

        // Once per sine interval, recompute the target write rate and swap
        // in a new rate limiter configured with it.
        if (usecs_since_last >
            (FLAGS_sine_write_rate_interval_milliseconds * uint64_t{1000})) {
          double usecs_since_start =
                  static_cast<double>(now - thread->stats.GetStart());
          thread->stats.ResetSineInterval();
          uint64_t write_rate =
                  static_cast<uint64_t>(SineRate(usecs_since_start / 1000000.0));
          thread->shared->write_rate_limiter.reset(
                  NewGenericRateLimiter(write_rate));
        }
      }
      // On failure, wait for the listener to report recovery; if it does,
      // treat the write as successful and keep going.
      if (!s.ok()) {
        s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
      }

      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        ErrorExit();
      }
    }
    if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
      fprintf(stdout,
              "Number of unique keys inserted: %" PRIu64
              ".\nNumber of overwrites: %" PRIu64 "\n",
              num_unique_keys, num_overwrites);
    } else if (kNumDispAndPersEntries > 0) {
      fprintf(stdout,
              "Number of unique keys inserted (disposable+persistent): %" PRIu64
              ".\nNumber of 'disposable entry delete': %" PRIu64 "\n",
              num_written, num_selective_deletes);
    }
    thread->stats.AddBytes(bytes);
  }