void Version::MultiGet()

in db/version_set.cc [2137:2403]


// Batched point lookup of every key in `range` against the table files of
// this Version.
//
// Outline:
//   1. Build one GetContext per key and reset each key's Status to OK (a
//      pre-existing MergeInProgress status is carried over as GetContext
//      state kMerge).
//   2. Walk the candidate files produced by FilePickerMultiGet, calling
//      table_cache_->MultiGet() once per file for the keys that overlap it,
//      and act on the resulting GetContext state of each key.
//   3. Hand the blob references collected in step 2 to MultiGetBlob().
//   4. For keys still pending: run a final full merge for keys in kMerge
//      state, and mark everything else NotFound.
//   5. If the lookup was aborted (value_size_soft_limit exceeded), assign the
//      abort status to the keys the range still yields.
//
// Results are reported through the per-key fields reachable from `range`
// (`s`, `value`, `timestamp`, ...); the function itself returns nothing.
//
// @param read_options Forwarded to the blob fetcher, the table cache and
//                     MultiGetBlob(); `value_size_soft_limit` caps the
//                     accumulated size of returned values.
// @param range        The keys to look up; keys are marked done as they are
//                     resolved.
// @param callback     Forwarded unchanged to every GetContext.
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback) {
  PinnedIteratorsManager pinned_iters_mgr;

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;

  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
  }
  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
  // use autovector in order to avoid unnecessary construction of GetContext
  // objects, which is expensive
  autovector<GetContext, 16> get_ctx;
  BlobFetcher blob_fetcher(this, read_options);
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    assert(iter->s->ok() || iter->s->IsMergeInProgress());
    get_ctx.emplace_back(
        user_comparator(), merge_operator_, info_log_, db_statistics_,
        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
        iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
        &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
        nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
        &iter->is_blob_index, tracing_mget_id, &blob_fetcher);
    // MergeInProgress status, if set, has been transferred to the get_context
    // state, so we set status to ok here. From now on, the iter status will
    // be used for IO errors, and get_context state will be used for any
    // key level errors
    *(iter->s) = Status::OK();
  }
  // Attach the contexts to their keys in a second pass, after every context
  // has been constructed (presumably so that growth of `get_ctx` cannot
  // invalidate the stored pointers -- TODO confirm against autovector).
  int get_ctx_index = 0;
  for (auto iter = range->begin(); iter != range->end();
       ++iter, get_ctx_index++) {
    iter->get_context = &(get_ctx[get_ctx_index]);
  }

  MultiGetRange file_picker_range(*range, range->begin(), range->end());
  FilePickerMultiGet fp(
      &file_picker_range,
      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();
  // Batch-wide status: stays OK unless a table read fails or the value size
  // soft limit is exceeded.
  Status s;
  // Block/file read counters accumulated for the level currently being
  // searched; flushed to the *_PER_LEVEL histograms on every level change
  // and once more after the loop.
  uint64_t num_index_read = 0;
  uint64_t num_filter_read = 0;
  uint64_t num_data_read = 0;
  uint64_t num_sst_read = 0;

  MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
  // blob_file => [[blob_idx, it], ...]
  std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
  // Level of the file handled by the previous loop iteration; -1 until the
  // first file has been seen.
  int level = -1;

  while (f != nullptr) {
    MultiGetRange file_range = fp.CurrentFileRange();
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;

    // Report MultiGet stats per level.
    const int hit_level = static_cast<int>(fp.GetHitFileLevel());
    if (level >= 0 && level != hit_level) {
      // Dump the stats if the search has moved to the next level and
      // reset for next level.
      RecordInHistogram(db_statistics_,
                        NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
                        num_index_read + num_filter_read);
      RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
                        num_data_read);
      RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
      num_filter_read = 0;
      num_index_read = 0;
      num_data_read = 0;
      num_sst_read = 0;
    }
    // Must be updated unconditionally: `level` starts at -1, so assigning it
    // only inside the branch above would leave that branch unreachable and
    // the per-level histograms would aggregate all levels into one sample.
    level = hit_level;

    StopWatchNano timer(clock_, timer_enabled /* auto_start */);
    s = table_cache_->MultiGet(
        read_options, *internal_comparator(), *f->file_metadata, &file_range,
        mutable_cf_options_.prefix_extractor,
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetHitFileLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetHitFileLevel());
    }
    if (!s.ok()) {
      // TODO: Set status for individual keys appropriately
      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
        *iter->s = s;
        file_range.MarkKeyDone(iter);
      }
      // NOTE(review): this early return skips the final histogram dump below
      // and only touches keys in the current file range; keys outside it keep
      // whatever status they already had -- confirm callers expect this.
      return;
    }
    // Number of keys actually served by this file, for SST_BATCH_SIZE.
    uint64_t batch_size = 0;
    for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
         ++iter) {
      GetContext& get_context = *iter->get_context;
      Status* status = iter->s;
      // The Status in the KeyContext takes precedence over GetContext state
      // Status may be an error if there were any IO errors in the table
      // reader. We never expect Status to be NotFound(), as that is
      // determined by get_context
      assert(!status->IsNotFound());
      if (!status->ok()) {
        file_range.MarkKeyDone(iter);
        continue;
      }

      if (get_context.sample()) {
        sample_file_read_inc(f->file_metadata);
      }
      batch_size++;
      num_index_read += get_context.get_context_stats_.num_index_read;
      num_filter_read += get_context.get_context_stats_.num_filter_read;
      num_data_read += get_context.get_context_stats_.num_data_read;
      num_sst_read += get_context.get_context_stats_.num_sst_read;
      // Reset these stats since they're specific to a level
      get_context.get_context_stats_.num_index_read = 0;
      get_context.get_context_stats_.num_filter_read = 0;
      get_context.get_context_stats_.num_data_read = 0;
      get_context.get_context_stats_.num_sst_read = 0;

      // report the counters before returning
      if (get_context.State() != GetContext::kNotFound &&
          get_context.State() != GetContext::kMerge &&
          db_statistics_ != nullptr) {
        get_context.ReportCounters();
      } else {
        if (iter->max_covering_tombstone_seq > 0) {
          // The remaining files we look at will only contain covered keys, so
          // we stop here for this key
          file_picker_range.SkipKey(iter);
        }
      }
      switch (get_context.State()) {
        case GetContext::kNotFound:
          // Keep searching in other files
          break;
        case GetContext::kMerge:
          // TODO: update per-level perfcontext user_key_return_count for kMerge
          break;
        case GetContext::kFound:
          if (fp.GetHitFileLevel() == 0) {
            RecordTick(db_statistics_, GET_HIT_L0);
          } else if (fp.GetHitFileLevel() == 1) {
            RecordTick(db_statistics_, GET_HIT_L1);
          } else if (fp.GetHitFileLevel() >= 2) {
            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
          }

          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                    fp.GetHitFileLevel());

          file_range.MarkKeyDone(iter);

          if (iter->is_blob_index) {
            // The stored value is a blob reference: decode it and queue the
            // key, grouped by blob file, for the MultiGetBlob() call below.
            if (iter->value) {
              const Slice& blob_index_slice = *(iter->value);
              BlobIndex blob_index;
              Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
              if (tmp_s.ok()) {
                const uint64_t blob_file_num = blob_index.file_number();
                blob_rqs[blob_file_num].emplace_back(
                    std::make_pair(blob_index, std::cref(*iter)));
              } else {
                *(iter->s) = tmp_s;
              }
            }
          } else {
            file_range.AddValueSize(iter->value->size());
            if (file_range.GetValueSize() >
                read_options.value_size_soft_limit) {
              s = Status::Aborted();
              // This `break` only leaves the switch; the enclosing for loop
              // then terminates through its `s.ok()` condition.
              break;
            }
          }
          continue;
        case GetContext::kDeleted:
          // Use empty error message for speed
          *status = Status::NotFound();
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kCorrupt:
          *status =
              Status::Corruption("corrupted key for ", iter->lkey->user_key());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kUnexpectedBlobIndex:
          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
          *status = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
          file_range.MarkKeyDone(iter);
          continue;
      }
    }

    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
    if (!s.ok() || file_picker_range.empty()) {
      break;
    }
    f = fp.GetNextFile();
  }

  // Dump stats for most recent level
  RecordInHistogram(db_statistics_, NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
                    num_index_read + num_filter_read);
  RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
                    num_data_read);
  RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);

  // Resolve the blob references gathered while scanning the table files.
  if (s.ok() && !blob_rqs.empty()) {
    MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
  }

  // Process any left over keys
  for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    Slice user_key = iter->lkey->user_key();

    if (db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    if (GetContext::kMerge == get_context.State()) {
      if (!merge_operator_) {
        *status = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        range->MarkKeyDone(iter);
        continue;
      }
      // merge_operands are in saver and we hit the beginning of the key history
      // do a final merge of nullptr and operands;
      std::string* str_value =
          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
      *status = MergeHelper::TimedFullMerge(
          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
          str_value, info_log_, db_statistics_, clock_,
          nullptr /* result_operand */, true);
      // NOTE(review): when iter->value is null the key is not marked done
      // here, so the final loop below will assign `s` to its status --
      // confirm that overwriting the merge result is intended.
      if (LIKELY(iter->value != nullptr)) {
        iter->value->PinSelf();
        range->AddValueSize(iter->value->size());
        range->MarkKeyDone(iter);
        if (range->GetValueSize() > read_options.value_size_soft_limit) {
          s = Status::Aborted();
          break;
        }
      }
    } else {
      range->MarkKeyDone(iter);
      *status = Status::NotFound();  // Use an empty error message for speed
    }
  }

  // Whatever keys the range still yields at this point (e.g. after an abort
  // above) receive the batch-wide status.
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    range->MarkKeyDone(iter);
    *(iter->s) = s;
  }
}