void BlockBasedTable::MultiGet()

in table/block_based/block_based_table_reader.cc [2507:2897]


void BlockBasedTable::MultiGet(const ReadOptions& read_options,
                               const MultiGetRange* mget_range,
                               const SliceTransform* prefix_extractor,
                               bool skip_filters) {
  if (mget_range->empty()) {
    // Caller should ensure the batch is non-empty; reaching this point with
    // an empty batch is a performance bug.
    assert(false);
    return;  // Nothing to do
  }

  FilterBlockReader* const filter =
      !skip_filters ? rep_->filter.get() : nullptr;
  MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
                               mget_range->end());

  // First check the full filter. If the full filter is not useful, fall
  // through and check each block individually.
  const bool no_io = read_options.read_tier == kBlockCacheTier;
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
  if (sst_file_range.begin()->get_context) {
    tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
  }
  BlockCacheLookupContext lookup_context{
      TableReaderCaller::kUserMultiGet, tracing_mget_id,
      /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
  FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor,
                         &lookup_context);
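  // Any key that the filter reports as definitely absent has been removed
  // from sst_file_range; only keys that may exist in this file remain.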

  if (!sst_file_range.empty()) {
    IndexBlockIter iiter_on_stack;
    // If the prefix_extractor recorded in the table differs from the one in
    // the options, disable the BlockPrefixIndex. Only do this check when
    // index_type is kHashSearch.
    bool need_upper_bound_check = false;
    if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
      need_upper_bound_check = PrefixExtractorChanged(prefix_extractor);
    }
    auto iiter =
        NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
                         sst_file_range.begin()->get_context, &lookup_context);
    std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
    if (iiter != &iiter_on_stack) {
      iiter_unique_ptr.reset(iiter);
    }

    uint64_t offset = std::numeric_limits<uint64_t>::max();
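    // Per-key bookkeeping for the first (cache-only) pass, all indexed in
    // batch order: the handle of a block still to be read (or null), the
    // block itself, and the lookup status. stack_buf/block_buf below provide
    // scratch space for the batched file read.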
    autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
    autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
    autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
    char stack_buf[kMultiGetReadStackBufSize];
    std::unique_ptr<char[]> block_buf;
    {
      MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
                                     sst_file_range.end());
      std::vector<Cache::Handle*> cache_handles;
      bool wait_for_cache_results = false;

      CachableEntry<UncompressionDict> uncompression_dict;
      Status uncompression_dict_status;
      uncompression_dict_status.PermitUncheckedError();
      bool uncompression_dict_inited = false;
      size_t total_len = 0;
      ReadOptions ro = read_options;
      ro.read_tier = kBlockCacheTier;
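      // First pass: with the read tier forced to kBlockCacheTier, the loop
      // below only probes the block cache; every miss is recorded in
      // block_handles so that the file reads can be issued together later.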

      for (auto miter = data_block_range.begin();
           miter != data_block_range.end(); ++miter) {
        const Slice& key = miter->ikey;
        iiter->Seek(miter->ikey);

        IndexValue v;
        if (iiter->Valid()) {
          v = iiter->value();
        }
        if (!iiter->Valid() ||
            (!v.first_internal_key.empty() && !skip_filters &&
             UserComparatorWrapper(rep_->internal_comparator.user_comparator())
                     .CompareWithoutTimestamp(
                         ExtractUserKey(key),
                         ExtractUserKey(v.first_internal_key)) < 0)) {
          // The requested key falls between the highest key in the previous
          // block and the lowest key in the current block.
          if (!iiter->status().IsNotFound()) {
            *(miter->s) = iiter->status();
          }
          data_block_range.SkipKey(miter);
          sst_file_range.SkipKey(miter);
          continue;
        }

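        // Lazily fetch the uncompression dictionary, if the table has one,
        // the first time any key in the batch gets this far.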
        if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
          uncompression_dict_status =
              rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
                  nullptr /* prefetch_buffer */, no_io,
                  read_options.verify_checksums,
                  sst_file_range.begin()->get_context, &lookup_context,
                  &uncompression_dict);
          uncompression_dict_inited = true;
        }

        if (!uncompression_dict_status.ok()) {
          assert(!uncompression_dict_status.IsNotFound());
          *(miter->s) = uncompression_dict_status;
          data_block_range.SkipKey(miter);
          sst_file_range.SkipKey(miter);
          continue;
        }

        statuses.emplace_back();
        results.emplace_back();
        if (v.handle.offset() == offset) {
          // We're going to reuse the block for this key later on, so there is
          // no need to look it up now. Place a null handle.
          block_handles.emplace_back(BlockHandle::NullBlockHandle());
          continue;
        }
        // Look up the cache for the data block referenced by the index
        // iterator value (i.e. BlockHandle). If it exists in the cache,
        // initialize the result to the contents of the data block.
        offset = v.handle.offset();
        BlockHandle handle = v.handle;
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet);
        const UncompressionDict& dict = uncompression_dict.GetValue()
                                            ? *uncompression_dict.GetValue()
                                            : UncompressionDict::GetEmptyDict();
        Status s = RetrieveBlock(
            nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
            miter->get_context, &lookup_data_block_context,
            /* for_compaction */ false, /* use_cache */ true,
            /* wait_for_cache */ false);
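        // Incomplete only means a cache miss under kBlockCacheTier; clear it
        // so this key falls through to the batched file read below.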
        if (s.IsIncomplete()) {
          s = Status::OK();
        }
        if (s.ok() && !results.back().IsEmpty()) {
          // Since we have a valid handle, check the value. If it's nullptr,
          // the cache is still waiting on the final result and we're
          // supposed to call WaitAll() to wait for it.
          if (results.back().GetValue() != nullptr) {
            // Found it in the cache. Add NULL handle to indicate there is
            // nothing to read from disk.
            if (results.back().GetCacheHandle()) {
              results.back().UpdateCachedValue();
            }
            block_handles.emplace_back(BlockHandle::NullBlockHandle());
          } else {
            // We have to wait for the cache lookup to finish in the
            // background, and then we may have to read the block from disk
            // anyway
            assert(results.back().GetCacheHandle());
            wait_for_cache_results = true;
            block_handles.emplace_back(handle);
            cache_handles.emplace_back(results.back().GetCacheHandle());
          }
        } else {
          block_handles.emplace_back(handle);
          total_len += BlockSizeWithTrailer(handle);
        }
      }

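      // Resolve any still-pending (asynchronous) cache lookups before
      // deciding which blocks must be read from the file.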
      if (wait_for_cache_results) {
        Cache* block_cache = rep_->table_options.block_cache.get();
        block_cache->WaitAll(cache_handles);
        for (size_t i = 0; i < block_handles.size(); ++i) {
          // Skip this block if its lookup already succeeded or failed, or if
          // it isn't needed because the corresponding key is in the same
          // block as a prior key.
          if (block_handles[i] == BlockHandle::NullBlockHandle() ||
              results[i].IsEmpty()) {
            continue;
          }
          results[i].UpdateCachedValue();
          void* val = results[i].GetValue();
          if (!val) {
            // The async cache lookup failed - could be due to an error
            // or a false positive. We need to read the data block from
            // the SST file
            results[i].Reset();
            total_len += BlockSizeWithTrailer(block_handles[i]);
          } else {
            block_handles[i] = BlockHandle::NullBlockHandle();
          }
        }
      }
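      // At this point, every handle left non-null names a block that has to
      // be read from the file; total_len covers those blocks plus trailers.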

      if (total_len) {
        char* scratch = nullptr;
        const UncompressionDict& dict = uncompression_dict.GetValue()
                                            ? *uncompression_dict.GetValue()
                                            : UncompressionDict::GetEmptyDict();
        assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
        assert(uncompression_dict_status.ok());
        // If using direct IO, then scratch is not used, so keep it nullptr.
        // If the blocks need to be uncompressed and the compressed blocks are
        // not needed afterwards, a contiguous block of memory can serve as
        // temporary storage for reading them all in:
        // 1. If blocks are compressed and a compressed block cache is
        //    configured, allocate heap buffers (scratch stays nullptr)
        // 2. If blocks are uncompressed, allocate heap buffers (scratch
        //    stays nullptr)
        // 3. If blocks are compressed and there is no compressed block cache,
        //    use the stack buffer (or one heap buffer if it doesn't fit)
        if (!rep_->file->use_direct_io() &&
            rep_->table_options.block_cache_compressed == nullptr &&
            rep_->blocks_maybe_compressed) {
          if (total_len <= kMultiGetReadStackBufSize) {
            scratch = stack_buf;
          } else {
            scratch = new char[total_len];
            block_buf.reset(scratch);
          }
        }
        RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
                               &statuses, &results, scratch, dict);
        if (sst_file_range.begin()->get_context) {
          ++(sst_file_range.begin()
                 ->get_context->get_context_stats_.num_sst_read);
        }
      }
    }

    DataBlockIter first_biter;
    DataBlockIter next_biter;
    size_t idx_in_batch = 0;
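    // Second pass: walk the surviving keys again, searching the block(s)
    // retrieved above and any later blocks a key's entries may extend into.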
    for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
         ++miter) {
      Status s;
      GetContext* get_context = miter->get_context;
      const Slice& key = miter->ikey;
      bool matched = false;  // if such user key matched a key in SST
      bool done = false;
      bool first_block = true;
      do {
        DataBlockIter* biter = nullptr;
        bool reusing_block = true;
        uint64_t referenced_data_size = 0;
        bool does_referenced_key_exist = false;
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet, tracing_mget_id,
            /*get_from_user_specified_snapshot=*/read_options.snapshot !=
                nullptr);
        if (first_block) {
          if (!block_handles[idx_in_batch].IsNull() ||
              !results[idx_in_batch].IsEmpty()) {
            first_biter.Invalidate(Status::OK());
            NewDataBlockIterator<DataBlockIter>(
                read_options, results[idx_in_batch], &first_biter,
                statuses[idx_in_batch]);
            reusing_block = false;
          } else {
            // If the handle is null and the result is empty, the status was
            // never set, so it should still be the initial value: ok().
            assert(statuses[idx_in_batch].ok());
          }
          biter = &first_biter;
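          // statuses/results/block_handles gained exactly one entry per key
          // that survived the first pass, so idx_in_batch advances in
          // lockstep with miter.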
          idx_in_batch++;
        } else {
          IndexValue v = iiter->value();
          if (!v.first_internal_key.empty() && !skip_filters &&
              UserComparatorWrapper(rep_->internal_comparator.user_comparator())
                      .CompareWithoutTimestamp(
                          ExtractUserKey(key),
                          ExtractUserKey(v.first_internal_key)) < 0) {
            // The requested key falls between the highest key in the previous
            // block and the lowest key in the current block.
            break;
          }

          next_biter.Invalidate(Status::OK());
          NewDataBlockIterator<DataBlockIter>(
              read_options, iiter->value().handle, &next_biter,
              BlockType::kData, get_context, &lookup_data_block_context,
              Status(), nullptr);
          biter = &next_biter;
          reusing_block = false;
        }

        if (read_options.read_tier == kBlockCacheTier &&
            biter->status().IsIncomplete()) {
          // Couldn't get the block from the block cache. Mark the key as
          // possibly existing, because when "no_io" is set we can only
          // report whether the key is guaranteed not to be there.
          get_context->MarkKeyMayExist();
          break;
        }
        if (!biter->status().ok()) {
          s = biter->status();
          break;
        }

        bool may_exist = biter->SeekForGet(key);
        if (!may_exist) {
          // HashSeek cannot find the key in this block, and the iterator is
          // not at the end of the block, i.e. the key cannot be in the
          // following blocks either. In this case the seek_key cannot be
          // found, so break out of the top level for-loop.
          break;
        }

        // Call the saver function (SaveValue) on each entry in the block
        // until it returns false
        for (; biter->Valid(); biter->Next()) {
          ParsedInternalKey parsed_key;
          Cleanable dummy;
          Cleanable* value_pinner = nullptr;
          Status pik_status = ParseInternalKey(
              biter->key(), &parsed_key, false /* log_err_key */);  // TODO
          if (!pik_status.ok()) {
            s = pik_status;
          }
          if (biter->IsValuePinned()) {
            if (reusing_block) {
              Cache* block_cache = rep_->table_options.block_cache.get();
              assert(biter->cache_handle() != nullptr);
              block_cache->Ref(biter->cache_handle());
              dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
                                    biter->cache_handle());
              value_pinner = &dummy;
            } else {
              value_pinner = biter;
            }
          }
          if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
                                      value_pinner)) {
            if (get_context->State() == GetContext::GetState::kFound) {
              does_referenced_key_exist = true;
              referenced_data_size =
                  biter->key().size() + biter->value().size();
            }
            done = true;
            break;
          }
          s = biter->status();
        }
        // Write the block cache access.
        if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
          // Avoid making copy of block_key, cf_name, and referenced_key when
          // constructing the access record.
          Slice referenced_key;
          if (does_referenced_key_exist) {
            referenced_key = biter->key();
          } else {
            referenced_key = key;
          }
          BlockCacheTraceRecord access_record(
              rep_->ioptions.clock->NowMicros(),
              /*block_key=*/"", lookup_data_block_context.block_type,
              lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
              /*cf_name=*/"", rep_->level_for_tracing(),
              rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
              lookup_data_block_context.is_cache_hit,
              lookup_data_block_context.no_insert,
              lookup_data_block_context.get_id,
              lookup_data_block_context.get_from_user_specified_snapshot,
              /*referenced_key=*/"", referenced_data_size,
              lookup_data_block_context.num_keys_in_block,
              does_referenced_key_exist);
          // TODO: Should handle status here?
          block_cache_tracer_
              ->WriteBlockAccess(access_record,
                                 lookup_data_block_context.block_key,
                                 rep_->cf_name_for_tracing(), referenced_key)
              .PermitUncheckedError();
        }
        s = biter->status();
        if (done) {
          // Avoid the extra Next which is expensive in two-level indexes
          break;
        }
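        // The entries for this key may continue into the next data block
        // (e.g. merge operands), so advance the index iterator and keep
        // scanning until SaveValue reports that we are done.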
        if (first_block) {
          iiter->Seek(key);
        }
        first_block = false;
        iiter->Next();
      } while (iiter->Valid());

      if (matched && filter != nullptr && !filter->IsBlockBased()) {
        RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
        PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
                                  rep_->level);
      }
      if (s.ok() && !iiter->status().IsNotFound()) {
        s = iiter->status();
      }
      *(miter->s) = s;
    }
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
    // Not sure why we need to do this; should investigate further.
    for (auto& st : statuses) {
      st.PermitUncheckedError();
    }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
  }
}
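
The first pass relies on the batch being sorted, so consecutive keys that land in the same data block collapse into a single lookup (the v.handle.offset() == offset check above). The sketch below isolates just that deduplication pattern as a minimal standalone illustration; KeyReq, kNoRead, and DedupBlockFetches are hypothetical names for this sketch, not RocksDB APIs.

#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical request type for illustration only (not a RocksDB type).
struct KeyReq {
  uint64_t block_offset;  // offset of the data block that holds this key
};

// Sentinel playing the role of BlockHandle::NullBlockHandle() above.
constexpr uint64_t kNoRead = std::numeric_limits<uint64_t>::max();

// For each request, return the offset to fetch, or kNoRead when the request
// can reuse the block fetched for the previous request. Assumes reqs is
// sorted by key, so equal offsets appear consecutively.
std::vector<uint64_t> DedupBlockFetches(const std::vector<KeyReq>& reqs) {
  std::vector<uint64_t> fetches;
  fetches.reserve(reqs.size());
  uint64_t prev = kNoRead;
  for (const KeyReq& r : reqs) {
    if (r.block_offset == prev) {
      fetches.push_back(kNoRead);  // same block as the prior key: reuse it
    } else {
      prev = r.block_offset;
      fetches.push_back(prev);
    }
  }
  return fetches;
}

For requests with block offsets {100, 100, 250}, the result is {100, kNoRead, 250}: the second key reuses the block fetched for the first, just as a repeated offset above places a null handle instead of issuing another lookup.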