void BlockBasedTable::RetrieveMultipleBlocks()

in table/block_based/block_based_table_reader.cc [1652:1924]
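
Reads the data blocks for a MultiGet batch. With mmap reads, each block is
retrieved individually via RetrieveBlock. Otherwise, reads of adjacent blocks
are coalesced into FSReadRequests served by a single MultiRead call, after
which each block is checksum-verified (if requested), optionally inserted
into the block cache, uncompressed if needed, and returned through the
parallel statuses/results arrays.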


void BlockBasedTable::RetrieveMultipleBlocks(
    const ReadOptions& options, const MultiGetRange* batch,
    const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
    autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
    autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
    char* scratch, const UncompressionDict& uncompression_dict) const {
  RandomAccessFileReader* file = rep_->file.get();
  const Footer& footer = rep_->footer;
  const ImmutableOptions& ioptions = rep_->ioptions;
  size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
  MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);

  if (ioptions.allow_mmap_reads) {
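    // With mmap reads there is no separate IO to batch, so just retrieve
    // each block individually through RetrieveBlock.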
    size_t idx_in_batch = 0;
    for (auto mget_iter = batch->begin(); mget_iter != batch->end();
         ++mget_iter, ++idx_in_batch) {
      BlockCacheLookupContext lookup_data_block_context(
          TableReaderCaller::kUserMultiGet);
      const BlockHandle& handle = (*handles)[idx_in_batch];
      if (handle.IsNull()) {
        continue;
      }

      (*statuses)[idx_in_batch] =
          RetrieveBlock(nullptr, options, handle, uncompression_dict,
                        &(*results)[idx_in_batch], BlockType::kData,
                        mget_iter->get_context, &lookup_data_block_context,
                        /* for_compaction */ false, /* use_cache */ true,
                        /* wait_for_cache */ true);
    }
    return;
  }

  // In direct IO mode, blocks share the direct IO buffer.
  // Otherwise, if the caller provided a scratch buffer, blocks share it;
  // failing that, each block gets its own heap-allocated buffer.
  const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;

  autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
  size_t buf_offset = 0;
  size_t idx_in_batch = 0;

  uint64_t prev_offset = 0;
  size_t prev_len = 0;
  autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
  autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
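  // Illustration: with a shared scratch buffer and buffered reads, three
  // blocks whose on-disk extents [100, 4196), [4196, 8292), [8292, 12388)
  // are contiguous coalesce into a single FSReadRequest {offset=100,
  // len=12288}, with req_offset_for_block = {0, 4096, 8192} and all three
  // blocks sharing one req_idx.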
  for (auto mget_iter = batch->begin(); mget_iter != batch->end();
       ++mget_iter, ++idx_in_batch) {
    const BlockHandle& handle = (*handles)[idx_in_batch];
    if (handle.IsNull()) {
      continue;
    }

    size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;

    // If the current block is adjacent to the previous one and the blocks
    // share a buffer (which is the case when compression is enabled and
    // there is no compressed block cache), combine the two block reads
    // into one request.
    // We don't combine block reads in direct IO mode, because direct IO
    // requests are realigned and merged when necessary anyway.
    if (use_shared_buffer && !file->use_direct_io() &&
        prev_end == handle.offset()) {
      req_offset_for_block.emplace_back(prev_len);
      prev_len += BlockSizeWithTrailer(handle);
    } else {
      // Not coalescing (no shared buffer, direct IO, or the current block
      // is not adjacent to the previous one):
      // Step 1: create a new request for the previous run of blocks
      if (prev_len != 0) {
        FSReadRequest req;
        req.offset = prev_offset;
        req.len = prev_len;
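        // With direct IO, MultiRead allocates one aligned buffer
        // (direct_io_buf below) and points each req.result into it, so no
        // per-request scratch is needed.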
        if (file->use_direct_io()) {
          req.scratch = nullptr;
        } else if (use_shared_buffer) {
          req.scratch = scratch + buf_offset;
          buf_offset += req.len;
        } else {
          req.scratch = new char[req.len];
        }
        read_reqs.emplace_back(req);
      }

      // Step 2: remember this block's info as the start of the next run
      prev_offset = handle.offset();
      prev_len = BlockSizeWithTrailer(handle);
      req_offset_for_block.emplace_back(0);
    }
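    // The request covering this block may not have been pushed yet (a run
    // is flushed only when it ends), so read_reqs.size() is exactly the
    // index that request will occupy in read_reqs.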
    req_idx_for_block.emplace_back(read_reqs.size());

    PERF_COUNTER_ADD(block_read_count, 1);
    PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
  }
  // Flush the pending request covering the last run of blocks
  if (prev_len != 0) {
    FSReadRequest req;
    req.offset = prev_offset;
    req.len = prev_len;
    if (file->use_direct_io()) {
      req.scratch = nullptr;
    } else if (use_shared_buffer) {
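      // This is the final request, so buf_offset need not advance.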
      req.scratch = scratch + buf_offset;
    } else {
      req.scratch = new char[req.len];
    }
    read_reqs.emplace_back(req);
  }

  AlignedBuf direct_io_buf;
  {
    IOOptions opts;
    IOStatus s = file->PrepareIOOptions(options, opts);
    if (s.ok()) {
      s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf,
                          options.rate_limiter_priority);
    }
    if (!s.ok()) {
      // Discard all results in this batch if there was a timeout or an
      // overall MultiRead error
      for (FSReadRequest& req : read_reqs) {
        req.status = s;
      }
    }
  }

  idx_in_batch = 0;
  size_t valid_batch_idx = 0;
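  // Second pass: map each block back to its read request and its byte
  // offset within that request, then verify, optionally cache, and
  // construct the Block.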
  for (auto mget_iter = batch->begin(); mget_iter != batch->end();
       ++mget_iter, ++idx_in_batch) {
    const BlockHandle& handle = (*handles)[idx_in_batch];

    if (handle.IsNull()) {
      continue;
    }

    assert(valid_batch_idx < req_idx_for_block.size());
    assert(valid_batch_idx < req_offset_for_block.size());
    assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
    size_t& req_idx = req_idx_for_block[valid_batch_idx];
    size_t& req_offset = req_offset_for_block[valid_batch_idx];
    valid_batch_idx++;
    if (mget_iter->get_context) {
      ++(mget_iter->get_context->get_context_stats_.num_data_read);
    }
    FSReadRequest& req = read_reqs[req_idx];
    Status s = req.status;
    if (s.ok()) {
      if ((req.result.size() != req.len) ||
          (req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
        s = Status::Corruption(
            "truncated block read from " + rep_->file->file_name() +
            " offset " + ToString(handle.offset()) + ", expected " +
            ToString(req.len) + " bytes, got " + ToString(req.result.size()));
      }
    }

    BlockContents raw_block_contents;
    if (s.ok()) {
      if (!use_shared_buffer) {
        // We allocated a buffer for this block. Give ownership of it to
        // BlockContents so it can free the memory
        assert(req.result.data() == req.scratch);
        assert(req.result.size() == BlockSizeWithTrailer(handle));
        assert(req_offset == 0);
        std::unique_ptr<char[]> raw_block(req.scratch);
        raw_block_contents = BlockContents(std::move(raw_block), handle.size());
      } else {
        // We used the scratch buffer or the direct IO buffer, which is
        // shared by the blocks, so raw_block_contents does not take
        // ownership of the memory.
        raw_block_contents =
            BlockContents(Slice(req.result.data() + req_offset, handle.size()));
      }
#ifndef NDEBUG
      raw_block_contents.is_raw_block = true;
#endif

      if (options.verify_checksums) {
        PERF_TIMER_GUARD(block_checksum_time);
        const char* data = req.result.data();
        // Since the scratch buffer might be shared, the offset of the data
        // block within the buffer might not be 0. req.result.data() only
        // points to the start of each read request, so we need to add the
        // block's offset within that request. The checksum is stored in
        // the block trailer, after the payload.
        s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset,
                                handle.size(), rep_->file->file_name(),
                                handle.offset());
        TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
      }
    } else if (!use_shared_buffer) {
      // Free the allocated scratch buffer.
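      // (A shared buffer is owned by the caller's scratch or by
      // direct_io_buf, so there is nothing to free in that case.)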
      delete[] req.scratch;
    }

    if (s.ok()) {
      // When the blocks share one underlying buffer (scratch or direct IO
      // buffer), we may need to copy a block onto the heap before its raw
      // form is inserted into a cache. That covers the following cases:
      // 1. The raw block is not compressed and needs to be inserted into
      //    the uncompressed block cache, if there is one
      // 2. The raw block is compressed and needs to be inserted into the
      //    compressed block cache, if there is one
      //
      // In all other cases, the raw block is either uncompressed into a
      // heap buffer or there is no cache at all.
      CompressionType compression_type =
          GetBlockCompressionType(raw_block_contents);
      if (use_shared_buffer && (compression_type == kNoCompression ||
                                rep_->table_options.block_cache_compressed)) {
        Slice raw =
            Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
        raw_block_contents = BlockContents(
            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
            handle.size());
#ifndef NDEBUG
        raw_block_contents.is_raw_block = true;
#endif
      }
    }

    if (s.ok()) {
      if (options.fill_cache) {
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet);
        CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
        // MaybeReadBlockAndLoadToCache will insert into the block caches if
        // necessary. Since we're passing the raw block contents, it will
        // avoid looking up the block cache
        s = MaybeReadBlockAndLoadToCache(
            nullptr, options, handle, uncompression_dict, /*wait=*/true,
            /*for_compaction=*/false, block_entry, BlockType::kData,
            mget_iter->get_context, &lookup_data_block_context,
            &raw_block_contents);

        // block_entry value could be null if no block cache is present,
        // i.e. BlockBasedTableOptions::no_block_cache is true and no
        // compressed block cache is configured. In that case, fall through
        // and set up the block explicitly.
        if (block_entry->GetValue() != nullptr) {
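          // We have a usable block from the cache path; the insertion
          // status is intentionally left unchecked, hence
          // PermitUncheckedError().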
          s.PermitUncheckedError();
          continue;
        }
      }

      CompressionType compression_type =
          GetBlockCompressionType(raw_block_contents);
      BlockContents contents;
      if (compression_type != kNoCompression) {
        UncompressionContext context(compression_type);
        UncompressionInfo info(context, uncompression_dict, compression_type);
        s = UncompressBlockContents(
            info, req.result.data() + req_offset, handle.size(), &contents,
            footer.format_version(), rep_->ioptions, memory_allocator);
      } else {
        // There are two cases here:
        // 1) the caller uses the shared buffer (scratch or direct IO
        //    buffer);
        // 2) we use the request's own buffer.
        // If the scratch or direct IO buffer is used, we already ensured
        // above that each raw block was copied to the heap as a standalone
        // block. If the scratch buffer is not used, there was no combined
        // read either, so the raw block can be used directly.
        contents = std::move(raw_block_contents);
      }
      if (s.ok()) {
        (*results)[idx_in_batch].SetOwnedValue(new Block(
            std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
      }
    }
    (*statuses)[idx_in_batch] = s;
  }
}
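
For orientation, a minimal sketch of the calling contract, assuming a
hypothetical call site inside BlockBasedTable (the real caller is
BlockBasedTable::MultiGet): the handles, statuses and results autovectors
are parallel arrays with one entry per key in the batch, a null handle
marks a key that needs no read, and scratch may be nullptr when no shared
buffer is in use.

// Hypothetical sketch, not the real call site.
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
for (auto iter = mget_range.begin(); iter != mget_range.end(); ++iter) {
  // Fill in the handle found via the index lookup; leave the slot's
  // status/result default-constructed.
  block_handles.emplace_back(/* data block handle for this key */);
  statuses.emplace_back();
  results.emplace_back();
}
RetrieveMultipleBlocks(read_options, &mget_range, &block_handles, &statuses,
                       &results, /*scratch=*/nullptr, uncompression_dict);
// statuses[i] / results[i] now hold the outcome for the i-th key.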