in table/block_based/block_based_table_reader.cc [1652:1924]
void BlockBasedTable::RetrieveMultipleBlocks(
const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch, const UncompressionDict& uncompression_dict) const {
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableOptions& ioptions = rep_->ioptions;
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
if (ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
(*statuses)[idx_in_batch] =
RetrieveBlock(nullptr, options, handle, uncompression_dict,
&(*results)[idx_in_batch], BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ true);
}
return;
}
// In direct IO mode, blocks share the direct io buffer.
// Otherwise, blocks share the scratch buffer.
const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
uint64_t prev_offset = 0;
size_t prev_len = 0;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
// If current block is adjacent to the previous one, at the same time,
// compression is enabled and there is no compressed cache, we combine
// the two block read as one.
// We don't combine block reads here in direct IO mode, because when doing
// direct IO read, the block requests will be realigned and merged when
// necessary.
if (use_shared_buffer && !file->use_direct_io() &&
prev_end == handle.offset()) {
req_offset_for_block.emplace_back(prev_len);
prev_len += BlockSizeWithTrailer(handle);
} else {
// No compression or current block and previous one is not adjacent:
// Step 1, create a new request for previous blocks
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}
// Step 2, remeber the previous block info
prev_offset = handle.offset();
prev_len = BlockSizeWithTrailer(handle);
req_offset_for_block.emplace_back(0);
}
req_idx_for_block.emplace_back(read_reqs.size());
PERF_COUNTER_ADD(block_read_count, 1);
PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
}
// Handle the last block and process the pending last request
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}
AlignedBuf direct_io_buf;
{
IOOptions opts;
IOStatus s = file->PrepareIOOptions(options, opts);
if (s.ok()) {
s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf,
options.rate_limiter_priority);
}
if (!s.ok()) {
// Discard all the results in this batch if there is any time out
// or overall MultiRead error
for (FSReadRequest& req : read_reqs) {
req.status = s;
}
}
}
idx_in_batch = 0;
size_t valid_batch_idx = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
assert(valid_batch_idx < req_idx_for_block.size());
assert(valid_batch_idx < req_offset_for_block.size());
assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
size_t& req_idx = req_idx_for_block[valid_batch_idx];
size_t& req_offset = req_offset_for_block[valid_batch_idx];
valid_batch_idx++;
if (mget_iter->get_context) {
++(mget_iter->get_context->get_context_stats_.num_data_read);
}
FSReadRequest& req = read_reqs[req_idx];
Status s = req.status;
if (s.ok()) {
if ((req.result.size() != req.len) ||
(req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
s = Status::Corruption(
"truncated block read from " + rep_->file->file_name() +
" offset " + ToString(handle.offset()) + ", expected " +
ToString(req.len) + " bytes, got " + ToString(req.result.size()));
}
}
BlockContents raw_block_contents;
if (s.ok()) {
if (!use_shared_buffer) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
assert(req.result.size() == BlockSizeWithTrailer(handle));
assert(req_offset == 0);
std::unique_ptr<char[]> raw_block(req.scratch);
raw_block_contents = BlockContents(std::move(raw_block), handle.size());
} else {
// We used the scratch buffer or direct io buffer
// which are shared by the blocks.
// raw_block_contents does not have the ownership.
raw_block_contents =
BlockContents(Slice(req.result.data() + req_offset, handle.size()));
}
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. req.result.data() only point to the
// begin address of each read request, we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// beyond the payload size.
s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset,
handle.size(), rep_->file->file_name(),
handle.offset());
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
}
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.
delete[] req.scratch;
}
if (s.ok()) {
// When the blocks share the same underlying buffer (scratch or direct io
// buffer), we may need to manually copy the block into heap if the raw
// block has to be inserted into a cache. That falls into th following
// cases -
// 1. Raw block is not compressed, it needs to be inserted into the
// uncompressed block cache if there is one
// 2. If the raw block is compressed, it needs to be inserted into the
// compressed block cache if there is one
//
// In all other cases, the raw block is either uncompressed into a heap
// buffer or there is no cache at all.
CompressionType compression_type =
GetBlockCompressionType(raw_block_contents);
if (use_shared_buffer && (compression_type == kNoCompression ||
(compression_type != kNoCompression &&
rep_->table_options.block_cache_compressed))) {
Slice raw =
Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
raw_block_contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
handle.size());
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
}
}
if (s.ok()) {
if (options.fill_cache) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
// MaybeReadBlockAndLoadToCache will insert into the block caches if
// necessary. Since we're passing the raw block contents, it will
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(
nullptr, options, handle, uncompression_dict, /*wait=*/true,
/*for_compaction=*/false, block_entry, BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
&raw_block_contents);
// block_entry value could be null if no block cache is present, i.e
// BlockBasedTableOptions::no_block_cache is true and no compressed
// block cache is configured. In that case, fall
// through and set up the block explicitly
if (block_entry->GetValue() != nullptr) {
s.PermitUncheckedError();
continue;
}
}
CompressionType compression_type =
GetBlockCompressionType(raw_block_contents);
BlockContents contents;
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(
info, req.result.data() + req_offset, handle.size(), &contents,
footer.format_version(), rep_->ioptions, memory_allocator);
} else {
// There are two cases here:
// 1) caller uses the shared buffer (scratch or direct io buffer);
// 2) we use the requst buffer.
// If scratch buffer or direct io buffer is used, we ensure that
// all raw blocks are copyed to the heap as single blocks. If scratch
// buffer is not used, we also have no combined read, so the raw
// block can be used directly.
contents = std::move(raw_block_contents);
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(new Block(
std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
}
}
(*statuses)[idx_in_batch] = s;
}
}