void StressTest::OperateDb()

in db_stress_tool/db_stress_test_base.cc [644:981]


// Per-thread driver of the stress workload.
//
// Builds the read/write options from command-line flags, optionally arms
// fault injection (debug builds only), and then runs FLAGS_reopen + 1 rounds
// of ops_per_open loop iterations each. Between rounds the thread releases
// its snapshots and votes for a DB reopen; the thread that casts the last
// vote performs the reopen and wakes the others.
//
// Every loop iteration:
//   1. may run a number of maintenance / verification actions, each gated by
//      its own FLAGS_*_one_in flag (set options, compaction, flush, backup,
//      checkpoint, snapshot handling, ...);
//   2. runs exactly one operation class (read, prefix scan, write, delete,
//      delete range, iterate or custom) selected by a uniform draw in
//      [0, 100) compared against the cumulative FLAGS_*percent bounds.
//
// The loops stop early once a verification failure has been recorded or the
// test has been asked to stop. Remaining snapshots are released and the
// thread statistics are stopped before returning.
//
// @param thread  state of the calling worker thread (random generator,
//                statistics, snapshot queue and the shared test state).
void StressTest::OperateDb(ThreadState* thread) {
  ReadOptions read_opts(FLAGS_verify_checksum, true);
  read_opts.rate_limiter_priority =
      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  read_opts.async_io = FLAGS_async_io;
  read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
  WriteOptions write_opts;
  if (FLAGS_rate_limit_auto_wal_flush) {
    write_opts.rate_limiter_priority = Env::IO_USER;
  }
  auto shared = thread->shared;
  // Scratch buffer handed to TestPut() for building values.
  char value[100];
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  write_opts.disableWAL = FLAGS_disable_wal;

  // Cumulative upper bounds (exclusive) used to map a draw in [0, 100) onto
  // an operation class. Anything at or above iterate_bound is a custom op.
  const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
                           static_cast<int>(FLAGS_prefixpercent);
  const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);
  const int del_bound = write_bound + static_cast<int>(FLAGS_delpercent);
  const int delrange_bound =
      del_bound + static_cast<int>(FLAGS_delrangepercent);
  const int iterate_bound =
      delrange_bound + static_cast<int>(FLAGS_iterpercent);

  // The per-thread op budget is split evenly across the reopen rounds.
  const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);

#ifndef NDEBUG
  if (FLAGS_read_fault_one_in) {
    fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
                                            FLAGS_read_fault_one_in);
  }
  if (FLAGS_write_fault_one_in) {
    IOStatus error_msg;
    // Severity 2 injects a data-loss error; every other value injects a
    // retryable error.
    if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
      error_msg = IOStatus::IOError("Retryable IO Error");
      error_msg.SetRetryable(true);
    } else if (FLAGS_injest_error_severity == 2) {
      // Ingest the fatal error
      error_msg = IOStatus::IOError("Fatal IO Error");
      error_msg.SetDataLoss(true);
    }
    std::vector<FileType> types = {FileType::kTableFile,
                                   FileType::kDescriptorFile,
                                   FileType::kCurrentFile};
    fault_fs_guard->SetRandomWriteError(
        thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
        /*inject_for_all_file_types=*/false, types);
  }
#endif // NDEBUG
  thread->stats.Start();
  for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
    if (thread->shared->HasVerificationFailedYet() ||
        thread->shared->ShouldStopTest()) {
      break;
    }
    if (open_cnt != 0) {
      // Reopen round: drop this thread's snapshots, then vote. The shared
      // mutex is held for the whole block.
      thread->stats.FinishedSingleOp();
      MutexLock l(thread->shared->GetMutex());
      while (!thread->snapshot_queue.empty()) {
        db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
        delete thread->snapshot_queue.front().second.key_vec;
        thread->snapshot_queue.pop();
      }
      thread->shared->IncVotedReopen();
      if (thread->shared->AllVotedReopen()) {
        // Last voter performs the reopen and releases the waiting threads.
        thread->shared->GetStressTest()->Reopen(thread);
        thread->shared->GetCondVar()->SignalAll();
      } else {
        thread->shared->GetCondVar()->Wait();
      }
      // Commenting this out as we don't want to reset stats on each open.
      // thread->stats.Start();
    }

    for (uint64_t i = 0; i < ops_per_open; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }

      // Change Options
      if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

      if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
        // NOTE(review): `x ^= x` always yields false, so this clears the
        // option rather than toggling it. Left unchanged on purpose because
        // a real toggle would alter the options used by subsequent reopens;
        // confirm the intended behavior before changing it.
        options_.inplace_update_support ^= options_.inplace_update_support;
      }

      // Only thread 0 runs the continuous DB verification.
      if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
          thread->rand.OneIn(FLAGS_verify_db_one_in)) {
        ContinuouslyVerifyDb(thread);
        if (thread->shared->ShouldStopTest()) {
          break;
        }
      }

      MaybeClearOneColumnFamily(thread);

      if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
        Status s = db_->SyncWAL();
        // A failure is only reported, it does not abort the test.
        if (!s.ok() && !s.IsNotSupported()) {
          fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
        }
      }

      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      ColumnFamilyHandle* column_family = column_families_[rand_column_family];

      if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
        TestCompactFiles(thread, column_family);
      }

      int64_t rand_key = GenerateOneKey(thread, i);
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      // When key-level locking is required, the lock is taken here and is
      // released when `lock` goes out of scope at the end of the iteration
      // (or earlier by a callee that receives it).
      std::unique_ptr<MutexLock> lock;
      if (ShouldAcquireMutexOnKey()) {
        lock.reset(new MutexLock(
            shared->GetMutexForKey(rand_column_family, rand_key)));
      }

      if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
        TestCompactRange(thread, rand_key, key, column_family);
        if (thread->shared->HasVerificationFailedYet()) {
          break;
        }
      }

      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);

      if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
        Status status = TestFlush(rand_column_families);
        // A failure is only reported, it does not abort the test.
        if (!status.ok()) {
          fprintf(stdout, "Unable to perform Flush(): %s\n",
                  status.ToString().c_str());
        }
      }

#ifndef ROCKSDB_LITE
      // Verify GetLiveFiles with a 1 in N chance.
      // Skipped entirely while write fault injection is enabled.
      if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
          !FLAGS_write_fault_one_in) {
        Status status = VerifyGetLiveFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
        }
      }

      // Verify GetSortedWalFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
        Status status = VerifyGetSortedWalFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
                            status);
        }
      }

      // Verify GetCurrentWalFile with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
        Status status = VerifyGetCurrentWalFile();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
                            status);
        }
      }
#endif  // !ROCKSDB_LITE

      if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
        Status status = TestPauseBackground(thread);
        if (!status.ok()) {
          VerificationAbort(
              shared, "Pause/ContinueBackgroundWork status not OK", status);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
        Status status = db_->VerifyChecksum();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyChecksum status not OK", status);
        }
      }

      if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
        TestGetProperty(thread);
      }
#endif

      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);

      if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
      }

      if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
        // Beyond a certain DB size threshold, this test becomes heavier than
        // it's worth.
        // The size is only measured when a limit is configured; with a limit
        // of 0 total_size stays 0 and the backup test always runs.
        uint64_t total_size = 0;
        if (FLAGS_backup_max_size > 0) {
          std::vector<FileAttributes> files;
          db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
          for (auto& file : files) {
            total_size += file.size_bytes;
          }
        }

        if (total_size <= FLAGS_backup_max_size) {
          Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
          if (!s.ok()) {
            VerificationAbort(shared, "Backup/restore gave inconsistent state",
                              s);
          }
        }
      }

      if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
        Status s =
            TestApproximateSize(thread, i, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "ApproximateSize Failed", s);
        }
      }
#endif  // !ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
        TestAcquireSnapshot(thread, rand_column_family, keystr, i);
      }

      /*always*/ {
        Status s = MaybeReleaseSnapshots(thread, i);
        if (!s.ok()) {
          VerificationAbort(shared, "Snapshot gave inconsistent state", s);
        }
      }

      // Assign timestamps if necessary.
      // read_opts.timestamp points at read_ts, which lives only for this
      // iteration; it is re-pointed on every iteration that takes this
      // branch.
      // NOTE(review): write_ts is assigned but not consumed by any operation
      // in this function; kept as-is to avoid dropping the NowNanosStr()
      // call without confirming it is side-effect free.
      std::string read_ts_str;
      std::string write_ts_str;
      Slice read_ts;
      Slice write_ts;
      if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
        read_ts_str = GenerateTimestampForRead();
        read_ts = read_ts_str;
        read_opts.timestamp = &read_ts;
        write_ts_str = NowNanosStr();
        write_ts = write_ts_str;
      }

      // Pick the operation class for this iteration: prob_op is compared
      // against the cumulative percentage bounds computed above.
      int prob_op = thread->rand.Uniform(100);
      if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
        assert(0 <= prob_op);
        // OPERATION read
        if (FLAGS_use_multiget) {
          // Leave room for one more iteration of the loop with a single key
          // batch. This is to ensure that each thread does exactly the same
          // number of ops
          int multiget_batch_size = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       FLAGS_ops_per_thread - i - 1));
          // If its the last iteration, ensure that multiget_batch_size is 1
          multiget_batch_size = std::max(multiget_batch_size, 1);
          rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
          // The batch counts as multiget_batch_size ops; the loop increment
          // accounts for the remaining one.
          i += multiget_batch_size - 1;
        } else {
          TestGet(thread, read_opts, rand_column_families, rand_keys);
        }
      } else if (prob_op < prefix_bound) {
        assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
        // OPERATION prefix scan
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prob_op < write_bound) {
        assert(prefix_bound <= prob_op);
        // OPERATION write
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value, lock);
      } else if (prob_op < del_bound) {
        assert(write_bound <= prob_op);
        // OPERATION delete
        TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
      } else if (prob_op < delrange_bound) {
        assert(del_bound <= prob_op);
        // OPERATION delete range
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
                        lock);
      } else if (prob_op < iterate_bound) {
        assert(delrange_bound <= prob_op);
        // OPERATION iterate
        int num_seeks = static_cast<int>(
            std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
                     FLAGS_ops_per_thread - i - 1));
        // Always perform at least one seek, mirroring the multiget branch.
        // With zero seeks `i += num_seeks - 1` would step the unsigned loop
        // counter backwards (wrapping around when i == 0), so the iteration
        // would not count towards the op budget and TestIterate() would be
        // handed an empty key batch.
        num_seeks = std::max(num_seeks, 1);
        rand_keys = GenerateNKeys(thread, num_seeks, i);
        i += num_seeks - 1;
        TestIterate(thread, read_opts, rand_column_families, rand_keys);
      } else {
        assert(iterate_bound <= prob_op);
        TestCustomOperations(thread, rand_column_families);
      }
      thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
      uint32_t tid = thread->tid;
      assert(secondaries_.empty() ||
             static_cast<size_t>(tid) < secondaries_.size());
      // NOTE(review): the assert above tolerates an empty secondaries_, yet
      // secondaries_[tid] is indexed whenever the flag fires; this presumably
      // relies on secondaries being created whenever
      // FLAGS_secondary_catch_up_one_in is enabled -- confirm.
      if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
        Status s = secondaries_[tid]->TryCatchUpWithPrimary();
        if (!s.ok()) {
          VerificationAbort(shared, "Secondary instance failed to catch up", s);
          break;
        }
      }
#endif
    }
  }
  // Release whatever snapshots this thread still holds before finishing.
  while (!thread->snapshot_queue.empty()) {
    db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
    delete thread->snapshot_queue.front().second.key_vec;
    thread->snapshot_queue.pop();
  }

  thread->stats.Stop();
}