in hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java [1713:1884]
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
boolean intoHeap) throws IOException {
final Span span = Span.current();
final AttributesBuilder attributesBuilder = Attributes.builder();
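// If the tracing Context carries an attributes consumer under CONTEXT_KEY, let it populate
// the attributes attached to the span events recorded below.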
Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
.ifPresent(c -> c.accept(attributesBuilder));
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
+ onDiskSizeWithHeaderL + ")");
}
if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) {
LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", onDiskSizeWithHeaderL);
onDiskSizeWithHeaderL = -1;
}
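// Narrow to int: at this point the value is either -1 or a caller-provided size that
// passed the check above.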
int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
// Try to use the cached header. It serves us in the rare case where onDiskSizeWithHeaderL == -1
// and saves us having to seek the stream backwards to reread the header we
// read the last time through here.
ByteBuff headerBuf = getCachedHeader(offset);
LOG.trace(
"Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, "
+ "onDiskSizeWithHeader={}",
this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf,
onDiskSizeWithHeader);
// This is NOT the same as verifyChecksum. The latter is whether to do HBase
// checksums and can change with circumstances. The below flag is whether the
// file has support for checksums (version 2+).
boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
long startTime = EnvironmentEdgeManager.currentTime();
if (onDiskSizeWithHeader == -1) {
// The caller does not know the block size, so we need to get it from the header. If the
// header was not cached (see getCachedHeader above), we need to seek to pull it in. This is
// costly and should happen very rarely; currently it happens on open of an hfile reader,
// where we read the trailer blocks to pull in the indices. Otherwise, we read block sizes
// out of the hfile index. To check, enable TRACE in this file and a stack trace will be
// logged every time we seek. See HBASE-17072 for more detail.
if (headerBuf == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra seek to get block size!", new RuntimeException());
}
span.addEvent("Extra seek to get block size!", attributesBuilder.build());
headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind();
}
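// Parse the on-disk size (header, data, and any checksums) out of the header bytes.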
onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
}
// Inspect the header's checksumType for known valid values. If we don't find such a value,
// assume that the bytes read are corrupted. We will clear the cached value and fall back
// to HDFS checksums.
if (!checkCheckSumTypeOnHeaderBuf(headerBuf)) {
if (verifyChecksum) {
invalidateNextBlockHeader();
span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
return null;
} else {
throw new IOException(
"Unknown checksum type code " + headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX)
+ " for file " + pathName + "; the headerBuf of HFileBlock may be corrupted.");
}
}
// The common case is that onDiskSizeWithHeader was produced by a read without checksum
// validation, so give it a sanity check before trying to use it.
if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) {
if (verifyChecksum) {
invalidateNextBlockHeader();
span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
return null;
} else {
throw new IOException("Invalid onDiskSizeWithHeader=" + onDiskSizeWithHeader);
}
}
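// Number of header bytes we already hold in memory; reading below starts after them.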
int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
// Allocate enough space to fit the next block's header too; this saves a seek next time
// through. onDiskBlock is the whole block (header, data, and any checksums) plus an extra
// hdrSize for the next block's header; onDiskSizeWithHeader is header, body, and any
// checksums if present. preReadHeaderSize says where to start reading. If we have the
// header cached, we don't need to read it again and can likely read from where we left
// off, without needing to back up and reread the header we read last time through here.
ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
boolean initHFileBlockSuccess = false;
try {
if (headerBuf != null) {
onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
}
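// Read the remainder of the block, opportunistically over-reading the next block's header
// into the extra hdrSize allocated above; readNextHeader says whether that succeeded.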
boolean readNextHeader = readAtOffset(is, onDiskBlock,
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
onDiskBlock.rewind(); // rewind in case the position moved when copying in a cached header
// The call to validateChecksum for this block excludes the over-read next block header, so
// there is no reason to delay extracting this value.
int nextBlockOnDiskSize = -1;
if (readNextHeader) {
int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader);
if (checkOnDiskSizeWithHeader(parsedVal)) {
nextBlockOnDiskSize = parsedVal;
}
}
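// No header was cached; view the header bytes we just read in place.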
if (headerBuf == null) {
headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
}
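// curBlock is a view spanning exactly this block (header, data, and any checksums),
// excluding any over-read bytes of the next block's header.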
ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
// Verify checksum of the data before using it for building HFileBlock.
if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
invalidateNextBlockHeader();
span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
return null;
}
// TODO: is this check necessary or can we proceed with a provided value regardless of
// what is in the header?
int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
if (onDiskSizeWithHeader != fromHeader) {
if (LOG.isTraceEnabled()) {
LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, fileContext={}",
onDiskSizeWithHeader, fromHeader, offset, this.fileContext);
}
if (checksumSupport && verifyChecksum) {
// This file supports HBase checksums and verification of those checksums was
// requested. The block size provided by the caller (presumably from the block index)
// does not match the block size written to the block header. Treat this as an
// HBase-checksum failure.
span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
invalidateNextBlockHeader();
return null;
}
throw new IOException("Passed in onDiskSizeWithHeader=" + onDiskSizeWithHeader + " != "
+ fromHeader + ", offset=" + offset + ", fileContext=" + this.fileContext);
}
// remove checksum from buffer now that it's verified
int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
curBlock.limit(sizeWithoutChecksum);
long duration = EnvironmentEdgeManager.currentTime() - startTime;
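// A negative readWarnTime disables the slow-read warning below.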
boolean tooSlow = this.readWarnTime >= 0 && duration > this.readWarnTime;
if (updateMetrics) {
HFile.updateReadLatency(duration, pread, tooSlow);
}
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the header
// of the next block, so there is no need to set the next block's header in it.
HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
// Run sanity checks on the uncompressed sizes.
if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed();
}
LOG.trace("Read {} in {} ms", hFileBlock, duration);
if (!LOG.isTraceEnabled() && tooSlow) {
LOG.warn("Read Block Slow: read {} cost {} ms, threshold = {} ms", hFileBlock, duration,
this.readWarnTime);
}
span.addEvent("Read block", attributesBuilder.build());
// Cache the next block's header, if we read it, for the next time through here.
if (nextBlockOnDiskSize != -1) {
cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
onDiskSizeWithHeader, hdrSize);
}
initHFileBlockSuccess = true;
return hFileBlock;
} finally {
if (!initHFileBlockSuccess) {
onDiskBlock.release();
}
}
}
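
// A minimal sketch, not HBase's actual caller, of how the null return above is consumed:
// attempt the read with HBase checksum verification first, and on failure retry once,
// relying on the HDFS-level checksums instead. The method name and wiring here are
// illustrative assumptions.
HFileBlock readBlockDataWithFallback(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean updateMetrics, boolean intoHeap)
throws IOException {
// Verify HBase checksums only when the file actually carries them (version 2+).
boolean verify = fileContext.isUseHBaseChecksum();
HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, verify,
updateMetrics, intoHeap);
if (blk == null && verify) {
// HBase checksum verification failed; re-read, trusting HDFS checksums this time.
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, false, updateMetrics,
intoHeap);
}
return blk;
}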