in db/version_set.cc [1965:2135]
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, std::string* timestamp, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
PinnedIteratorsManager* pinned_iters_mgr, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob, bool do_merge) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress());
if (key_exists != nullptr) {
// will falsify below if not found
*key_exists = true;
}
uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
if (vset_ && vset_->block_cache_tracer_ &&
vset_->block_cache_tracer_->is_tracing_enabled()) {
tracing_get_id = vset_->block_cache_tracer_->NextGetId();
}
// Note: the old StackableDB-based BlobDB passes in
// GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
// need to provide it here.
bool is_blob_index = false;
bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
BlobFetcher blob_fetcher(this, read_options);
assert(pinned_iters_mgr);
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
merge_operator_ ? pinned_iters_mgr : nullptr, callback, is_blob_to_use,
tracing_get_id, &blob_fetcher);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
pinned_iters_mgr->StartPinning();
}
FilePicker fp(user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(),
internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
if (*max_covering_tombstone_seq > 0) {
// The remaining files we look at will only contain covered keys, so we
// stop here.
break;
}
if (get_context.sample()) {
sample_file_read_inc(f->file_metadata);
}
bool timer_enabled =
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
fp.GetHitFileLevel());
}
if (!status->ok()) {
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
return;
}
// report the counters before returning
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge &&
db_statistics_ != nullptr) {
get_context.ReportCounters();
}
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kMerge:
// TODO: update per-level perfcontext user_key_return_count for kMerge
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
if (is_blob_index) {
if (do_merge && value) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
*status = GetBlob(read_options, user_key, *value, prefetch_buffer,
value, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();
}
return;
}
}
}
return;
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
return;
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case GetContext::kUnexpectedBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
return;
}
f = fp.GetNextFile();
}
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
if (!do_merge) {
*status = Status::OK();
return;
}
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
return;
}
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, info_log_, db_statistics_, clock_,
nullptr /* result_operand */, true);
if (LIKELY(value != nullptr)) {
value->PinSelf();
}
} else {
if (key_exists != nullptr) {
*key_exists = false;
}
*status = Status::NotFound(); // Use an empty error message for speed
}
}