in db_stress_tool/no_batched_ops_stress.cc [242:446]
std::vector<Status> TestMultiGet(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
size_t num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
key_str.reserve(num_keys);
keys.reserve(num_keys);
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
int error_count = 0;
// Do a consistency check between Get and MultiGet. Don't do it too
// often as it will slow db_stress down
bool do_consistency_check = thread->rand.OneIn(4);
ReadOptions readoptionscopy = read_opts;
if (do_consistency_check) {
readoptionscopy.snapshot = db_->GetSnapshot();
}
// To appease clang analyzer
const bool use_txn = FLAGS_use_txn;
// Create a transaction in order to write some data. The purpose is to
// exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
// will be rolled back once MultiGet returns.
#ifndef ROCKSDB_LITE
Transaction* txn = nullptr;
if (use_txn) {
WriteOptions wo;
if (FLAGS_rate_limit_auto_wal_flush) {
wo.rate_limiter_priority = Env::IO_USER;
}
Status s = NewTxn(wo, &txn);
if (!s.ok()) {
fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
std::terminate();
}
}
#endif
for (size_t i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
#ifndef ROCKSDB_LITE
if (use_txn) {
// With a 1 in 10 probability, insert the just added key in the batch
// into the transaction. This will create an overlap with the MultiGet
// keys and exercise some corner cases in the code
if (thread->rand.OneIn(10)) {
int op = thread->rand.Uniform(2);
Status s;
switch (op) {
case 0:
case 1: {
uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
char value[100];
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
if (op == 0) {
s = txn->Put(cfh, keys.back(), v);
} else {
s = txn->Merge(cfh, keys.back(), v);
}
break;
}
case 2:
s = txn->Delete(cfh, keys.back());
break;
default:
assert(false);
}
if (!s.ok()) {
fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
std::terminate();
}
}
}
#endif
}
if (!use_txn) {
#ifndef NDEBUG
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
#endif // NDEBUG
db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
statuses.data());
#ifndef NDEBUG
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
#endif // NDEBUG
} else {
#ifndef ROCKSDB_LITE
txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
statuses.data());
#endif
}
#ifndef NDEBUG
if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
int stat_nok = 0;
for (const auto& s : statuses) {
if (!s.ok() && !s.IsNotFound()) {
stat_nok++;
}
}
if (stat_nok < error_count) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from MultiGet. \n");
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
error_count, stat_nok);
fprintf(stderr, "Callstack that injected the fault\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
}
}
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
#endif // NDEBUG
for (size_t i = 0; i < statuses.size(); ++i) {
Status s = statuses[i];
bool is_consistent = true;
// Only do the consistency check if no error was injected and MultiGet
// didn't return an unexpected error
if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
Status tmp_s;
std::string value;
if (use_txn) {
#ifndef ROCKSDB_LITE
tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
#endif // ROCKSDB_LITE
} else {
tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
}
if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
is_consistent = false;
} else if (!s.ok() && tmp_s.ok()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
is_consistent = false;
} else if (s.ok() && tmp_s.IsNotFound()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
is_consistent = false;
} else if (s.ok() && value != values[i].ToString()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "MultiGet returned value %s\n",
values[i].ToString(true).c_str());
fprintf(stderr, "Get returned value %s\n", value.c_str());
is_consistent = false;
}
}
if (!is_consistent) {
fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
thread->stats.AddErrors(1);
// Fail fast to preserve the DB state
thread->shared->SetVerificationFailure();
break;
} else if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else if (s.IsMergeInProgress() && use_txn) {
// With txn this is sometimes expected.
thread->stats.AddGets(1, 1);
} else {
if (error_count == 0) {
// errors case
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
}
if (readoptionscopy.snapshot) {
db_->ReleaseSnapshot(readoptionscopy.snapshot);
}
if (use_txn) {
#ifndef ROCKSDB_LITE
RollbackTxn(txn);
#endif
}
return statuses;
}