in db/version_set.cc [2137:2403]
// Looks up a batch of keys in the SST files of this version.
//
// For every key in `range` a GetContext is created and the candidate files
// returned by FilePickerMultiGet are probed, one file at a time, through the
// table cache. Per-key results are written through the KeyContext of each key
// (`iter->s`, `iter->value`, `iter->merge_context`, ...).
//
// Parameters:
//   read_options - forwarded to the table cache and to blob reads;
//                  `value_size_soft_limit` caps the accumulated value size,
//                  and exceeding it ends the lookup with Status::Aborted().
//   range        - the keys to look up. On entry each key's status must be
//                  OK or MergeInProgress (asserted below).
//   callback     - forwarded unchanged to each GetContext.
//
// Outcome per key:
//   - value found            -> key marked done; blob indexes are collected
//                               and resolved via MultiGetBlob() at the end.
//   - deleted                -> Status::NotFound().
//   - corrupt / unexpected
//     blob index             -> Status::Corruption() / Status::NotSupported().
//   - still kMerge after all
//     files were examined    -> final merge of the collected operands.
//   - anything else left     -> Status::NotFound().
//
// If the table cache returns an error for a file, that error is assigned to
// the keys of that file's range and the function returns immediately.
// NOTE(review): in that case keys outside the failing file's range that were
// not yet resolved are left untouched (see the TODO at the return site) and
// the trailing per-level histograms are not recorded -- confirm callers cope.
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback) {
  PinnedIteratorsManager pinned_iters_mgr;

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  // Use a fresh get id for block cache tracing when tracing is enabled,
  // otherwise fall back to the reserved id.
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
  }

  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
  // use autovector in order to avoid unnecessary construction of GetContext
  // objects, which is expensive
  autovector<GetContext, 16> get_ctx;
  BlobFetcher blob_fetcher(this, read_options);
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    assert(iter->s->ok() || iter->s->IsMergeInProgress());
    get_ctx.emplace_back(
        user_comparator(), merge_operator_, info_log_, db_statistics_,
        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
        iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
        &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
        nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
        &iter->is_blob_index, tracing_mget_id, &blob_fetcher);
    // MergeInProgress status, if set, has been transferred to the get_context
    // state, so we set status to ok here. From now on, the iter status will
    // be used for IO errors, and get_context state will be used for any
    // key level errors
    *(iter->s) = Status::OK();
  }

  // Attach the contexts in a second pass, once get_ctx has its final size
  // (presumably so that growth of the container cannot invalidate the
  // pointers handed out here -- TODO confirm against autovector).
  int get_ctx_index = 0;
  for (auto iter = range->begin(); iter != range->end();
       ++iter, ++get_ctx_index) {
    iter->get_context = &(get_ctx[get_ctx_index]);
  }

  MultiGetRange file_picker_range(*range, range->begin(), range->end());
  FilePickerMultiGet fp(
      &file_picker_range, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();
  Status s;

  // Block/file read counters accumulated for the level currently being
  // searched; flushed to the per-level histograms on every level change.
  uint64_t num_index_read = 0;
  uint64_t num_filter_read = 0;
  uint64_t num_data_read = 0;
  uint64_t num_sst_read = 0;

  MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
  // blob_file => [[blob_idx, it], ...]
  std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;

  // Level of the previously processed file, or -1 before the first file.
  int level = -1;

  while (f != nullptr) {
    MultiGetRange file_range = fp.CurrentFileRange();
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;

    // Report MultiGet stats per level.
    const int hit_level = static_cast<int>(fp.GetHitFileLevel());
    if (level >= 0 && level != hit_level) {
      // Dump the stats if the search has moved to the next level and
      // reset for next level.
      RecordInHistogram(db_statistics_,
                        NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
                        num_index_read + num_filter_read);
      RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
                        num_data_read);
      RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
      num_filter_read = 0;
      num_index_read = 0;
      num_data_read = 0;
      num_sst_read = 0;
    }
    // Track the level unconditionally. Updating it only inside the branch
    // above would leave it at -1 forever, so a level change would never be
    // detected and the counters would be summed over all levels.
    level = hit_level;

    StopWatchNano timer(clock_, timer_enabled /* auto_start */);
    s = table_cache_->MultiGet(
        read_options, *internal_comparator(), *f->file_metadata, &file_range,
        mutable_cf_options_.prefix_extractor,
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetHitFileLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetHitFileLevel());
    }
    if (!s.ok()) {
      // TODO: Set status for individual keys appropriately
      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
        *iter->s = s;
        file_range.MarkKeyDone(iter);
      }
      return;
    }

    // Number of keys of this file's range that were processed without an
    // IO error; reported as SST_BATCH_SIZE below.
    uint64_t batch_size = 0;
    for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
         ++iter) {
      GetContext& get_context = *iter->get_context;
      Status* status = iter->s;
      // The Status in the KeyContext takes precedence over GetContext state
      // Status may be an error if there were any IO errors in the table
      // reader. We never expect Status to be NotFound(), as that is
      // determined by get_context
      assert(!status->IsNotFound());
      if (!status->ok()) {
        file_range.MarkKeyDone(iter);
        continue;
      }

      if (get_context.sample()) {
        sample_file_read_inc(f->file_metadata);
      }
      batch_size++;
      num_index_read += get_context.get_context_stats_.num_index_read;
      num_filter_read += get_context.get_context_stats_.num_filter_read;
      num_data_read += get_context.get_context_stats_.num_data_read;
      num_sst_read += get_context.get_context_stats_.num_sst_read;
      // Reset these stats since they're specific to a level
      get_context.get_context_stats_.num_index_read = 0;
      get_context.get_context_stats_.num_filter_read = 0;
      get_context.get_context_stats_.num_data_read = 0;
      get_context.get_context_stats_.num_sst_read = 0;

      // report the counters before returning
      if (get_context.State() != GetContext::kNotFound &&
          get_context.State() != GetContext::kMerge &&
          db_statistics_ != nullptr) {
        get_context.ReportCounters();
      } else {
        if (iter->max_covering_tombstone_seq > 0) {
          // The remaining files we look at will only contain covered keys, so
          // we stop here for this key
          file_picker_range.SkipKey(iter);
        }
      }

      switch (get_context.State()) {
        case GetContext::kNotFound:
          // Keep searching in other files
          break;
        case GetContext::kMerge:
          // TODO: update per-level perfcontext user_key_return_count for kMerge
          break;
        case GetContext::kFound:
          if (fp.GetHitFileLevel() == 0) {
            RecordTick(db_statistics_, GET_HIT_L0);
          } else if (fp.GetHitFileLevel() == 1) {
            RecordTick(db_statistics_, GET_HIT_L1);
          } else if (fp.GetHitFileLevel() >= 2) {
            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
          }

          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                    fp.GetHitFileLevel());

          file_range.MarkKeyDone(iter);

          if (iter->is_blob_index) {
            // The value is a blob index: queue a read request keyed by blob
            // file number; the blob itself is fetched after the file loop.
            if (iter->value) {
              const Slice& blob_index_slice = *(iter->value);
              BlobIndex blob_index;
              Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
              if (tmp_s.ok()) {
                const uint64_t blob_file_num = blob_index.file_number();
                blob_rqs[blob_file_num].emplace_back(
                    std::make_pair(blob_index, std::cref(*iter)));
              } else {
                *(iter->s) = tmp_s;
              }
            }
          } else {
            file_range.AddValueSize(iter->value->size());
            if (file_range.GetValueSize() >
                read_options.value_size_soft_limit) {
              // Leaves the switch; the loop condition `s.ok()` then ends the
              // scan of this file's keys.
              s = Status::Aborted();
              break;
            }
          }
          continue;
        case GetContext::kDeleted:
          // Use empty error message for speed
          *status = Status::NotFound();
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kCorrupt:
          *status =
              Status::Corruption("corrupted key for ", iter->lkey->user_key());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kUnexpectedBlobIndex:
          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
          *status = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
          file_range.MarkKeyDone(iter);
          continue;
      }
    }

    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
    if (!s.ok() || file_picker_range.empty()) {
      break;
    }
    f = fp.GetNextFile();
  }

  // Dump stats for most recent level
  RecordInHistogram(db_statistics_, NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
                    num_index_read + num_filter_read);
  RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
                    num_data_read);
  RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);

  if (s.ok() && !blob_rqs.empty()) {
    MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
  }

  // Process any left over keys
  for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    Slice user_key = iter->lkey->user_key();

    if (db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    if (GetContext::kMerge == get_context.State()) {
      if (!merge_operator_) {
        *status = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        range->MarkKeyDone(iter);
        continue;
      }
      // merge_operands are in saver and we hit the beginning of the key history
      // do a final merge of nullptr and operands;
      std::string* str_value =
          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
      *status = MergeHelper::TimedFullMerge(
          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
          str_value, info_log_, db_statistics_, clock_,
          nullptr /* result_operand */, true);
      if (LIKELY(iter->value != nullptr)) {
        iter->value->PinSelf();
        range->AddValueSize(iter->value->size());
        range->MarkKeyDone(iter);
        if (range->GetValueSize() > read_options.value_size_soft_limit) {
          s = Status::Aborted();
          break;
        }
      }
    } else {
      range->MarkKeyDone(iter);
      *status = Status::NotFound();  // Use an empty error message for speed
    }
  }

  // Whatever is still pending in `range` (e.g. after the value size limit
  // aborted the lookup) is finished with the overall status.
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    range->MarkKeyDone(iter);
    *(iter->s) = s;
  }
}