in hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java [1271:1405]
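  /**
   * Reads the block at {@code dataBlockOffset}, probing the block cache first and falling back to
   * a filesystem read on a miss, optionally populating the cache with the result.
   */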
  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
    boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
    BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
    throws IOException {
    if (dataBlockIndexReader == null) {
      throw new IOException(path + " block index not loaded");
    }
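    // Sanity-check the offset: readable blocks live before the load-on-open section; anything at
    // or beyond it is index/meta/trailer data, not a block.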
    long trailerOffset = trailer.getLoadOnOpenDataOffset();
    if (dataBlockOffset < 0 || dataBlockOffset >= trailerOffset) {
      throw new IOException("Requested block is out of range: " + dataBlockOffset
        + ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset()
        + ", trailer.getLoadOnOpenDataOffset: " + trailerOffset + ", path=" + path);
    }
    // For any given block from any given file, synchronize reads for that block. Without a cache
    // this synchronization is needless overhead, but with one it stops concurrent readers from
    // duplicating the same read and decompression work.
    BlockCacheKey cacheKey =
      new BlockCacheKey(path, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
    boolean useLock = false;
    IdLock.Entry lockEntry = null;
    final Span span = Span.current();
    // BlockCacheKey#toString() is quite expensive to call, so if tracing isn't enabled, don't
    // record it.
    Attributes attributes = span.isRecording()
      ? Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString())
      : Attributes.empty();
    try {
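      // The loop below runs at most twice: an optimistic, lock-free cache probe first, then (on a
      // miss, if configured) a second probe under the per-offset IdLock before reading from disk.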
      while (true) {
        // Check cache for block. If found, return it.
        if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
          if (useLock) {
            lockEntry = offsetLock.getLockEntry(dataBlockOffset);
          }
          // Try to get the block from the block cache. If useLock is true, this is the second
          // pass through the loop and the lookup should not be counted as a block cache miss.
          HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
            expectedBlockType, expectedDataBlockEncoding);
          if (cachedBlock != null) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Block for file {} is coming from Cache {}",
                Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
            }
            span.addEvent("block cache hit", attributes);
            assert cachedBlock.isUnpacked() : "Packed block leak.";
            if (cachedBlock.getBlockType().isData()) {
              if (updateCacheMetrics) {
                HFile.DATABLOCK_READ_COUNT.increment();
              }
              // Validate the encoding type for data blocks. We include the encoding type in the
              // cache key, and we expect it to match on a cache hit.
              if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
                // Remember to release the block on this exceptional path.
                cacheConf.getBlockCache().ifPresent(cache -> {
                  returnAndEvictBlock(cache, cacheKey, cachedBlock);
                });
                throw new IOException("Cached block under key " + cacheKey + " "
                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                  + dataBlockEncoder.getDataBlockEncoding() + "), path=" + path);
              }
            }
            // Cache hit. Return!
            return cachedBlock;
          }
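          // Cache miss on the lock-free first pass. If configured, take the per-offset IdLock and
          // loop to probe the cache once more, so concurrent readers of the same block don't all
          // fall through to the filesystem read below.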
          if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
            // check cache again with lock
            useLock = true;
            continue;
          }
          // Carry on, please load.
        }
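        // The block was not served from cache; if useLock is set we hold the offset lock, so only
        // this thread performs the filesystem read for this block.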
span.addEvent("block cache miss", attributes);
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType, cacheBlock));
try {
validateBlockType(hfileBlock, expectedBlockType);
} catch (IOException e) {
hfileBlock.release();
throw e;
}
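        // Work out the caching policy for this block: the category (data, index, bloom, ...)
        // drives cache-on-read, and cacheCompressed decides whether the packed (on-disk) or
        // unpacked form goes into the cache.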
        BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
        final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
        final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
        // We don't need the unpacked block back, and we're storing the block in the cache
        // compressed.
        if (cacheOnly && cacheCompressed && cacheOnRead) {
          HFileBlock blockNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock);
          cacheConf.getBlockCache().ifPresent(cache -> {
            LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
            // Cache the block if necessary.
            if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
              cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly);
            }
          });
          if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
            HFile.DATABLOCK_READ_COUNT.increment();
          }
          return blockNoChecksum;
        }
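        // Decompress/decrypt for the caller. unpack() returns the same instance when the block is
        // already unpacked (e.g. no compression), in which case no extra release is needed below.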
        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
        HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
        // Cache the block if necessary.
        cacheConf.getBlockCache().ifPresent(cache -> {
          if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
            // Use the wait-on-cache behavior during compaction and prefetching.
            cache.cacheBlock(cacheKey,
              cacheCompressed
                ? BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock)
                : unpackedNoChecksum,
              cacheConf.isInMemory(), cacheOnly);
          }
        });
        if (unpacked != hfileBlock) {
          // End of life here if hfileBlock is an independent block.
          hfileBlock.release();
        }
        if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
          HFile.DATABLOCK_READ_COUNT.increment();
        }
        return unpackedNoChecksum;
      }
    } finally {
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
  }
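
A minimal caller sketch (an illustrative assumption, not code from the file: `reader` is an open
HFile.Reader, and `offset`/`onDiskSize` would come from the block index):

  HFileBlock block = reader.readBlock(offset, onDiskSize, /* cacheBlock */ true,
    /* pread */ true, /* isCompaction */ false, /* updateCacheMetrics */ true,
    BlockType.DATA, /* expectedDataBlockEncoding */ null, /* cacheOnly */ false);
  try {
    // ... read cells/bytes out of the block ...
  } finally {
    // Blocks are reference-counted; the caller releases its reference when done.
    block.release();
  }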