in hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java [215:412]
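/**
 * V1 scan: iterates the log blocks of all log files in order, queues data and
 * delete blocks per instant, applies rollback command blocks to undo blocks written
 * by failed commits, reconciles spurious duplicate blocks left behind by task retries
 * (using the block sequence numbers), and finally merges the surviving blocks.
 */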
private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
totalRollbacks = new AtomicLong(0);
totalCorruptBlocks = new AtomicLong(0);
totalLogBlocks = new AtomicLong(0);
totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(storage,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
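// track distinct log files so that totalLogFiles counts each file exactly once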
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
LOG.info("Scanning log file {}", logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, "");
int blockSeqNo = -1;
long attemptNo = -1L;
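// the BLOCK_IDENTIFIER header is encoded as "<attemptNumber>,<blockSequenceNumber>"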
if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
String[] parts = blockSequenceNumberStr.split(",");
attemptNo = Long.parseLong(parts[0]);
blockSeqNo = Integer.parseInt(parts[1]);
}
totalLogBlocks.incrementAndGet();
if (logBlock.isDataOrDeleteBlock()) {
if (compareTimestamps(instantTime, GREATER_THAN, this.latestInstantTime)) {
// Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
continue;
}
if (!allowInflightInstants
&& (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
// hit an uncommitted block, possibly from a failed write; skip it and move on to the next one
continue;
}
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
}
}
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
LOG.info("Reading a data block from file {} at instant {}", logFile.getPath(), instantTime);
// store the current block
currentInstantLogBlocks.push(logBlock);
validLogBlockInstants.add(logBlock);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo,
blockSequenceMapPerCommit);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file {}", logFile.getPath());
// store deletes so they can be rolled back
currentInstantLogBlocks.push(logBlock);
validLogBlockInstants.add(logBlock);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo,
blockSequenceMapPerCommit);
break;
case COMMAND_BLOCK:
// Consider the following scenario:
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (wrote either a corrupt block or a correct
// DataBlock (B1) with commitTime C1)
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct DataBlocks (B1 and B2) carrying the same data.
// Say commit C1 eventually failed and a rollback was triggered.
// The rollback writes only 1 rollback block (R1) since it assumes one block is
// written per ingestion batch for a file, but in reality both B1 and B2 need to be
// rolled back. The code below ensures the same rollback block (R1) is used to
// roll back both B1 and B2.
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
LOG.info("Reading a command block {} with targetInstantTime {} from file {}", commandBlock.getType(), targetInstantForCommandBlock,
logFile.getPath());
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_BLOCK:
// Roll back older read log block(s): fetch the commit time from each older record
// block and roll it back only if it equals targetCommitTime. This guards against
// invalid/extra rollback blocks written due to failures during the rollback
// operation itself, and ensures the same rollback block (R1) is used to roll back
// both B1 and B2 when they share the same instant time.
final int instantLogBlockSizeBeforeRollback = currentInstantLogBlocks.size();
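// removeIf drops a block from the replay queue whenever the predicate returns true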
currentInstantLogBlocks.removeIf(block -> {
// handle corrupt blocks separately since they may not have metadata
if (block.getBlockType() == CORRUPT_BLOCK) {
LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
return true;
}
if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback older data block or delete block
LOG.info("Rolling back an older log block read from {} with instantTime {}",
logFile.getPath(), targetInstantForCommandBlock);
return true;
}
return false;
});
// remove entire entry from blockSequenceTracker
blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
// remove all matching log blocks from the valid list tracked so far
validLogBlockInstants = validLogBlockInstants.stream().filter(block -> {
// corrupt blocks may not carry instant metadata to match against; drop them here
// too, mirroring their removal from the replay queue above
if (block.getBlockType() == CORRUPT_BLOCK) {
LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
return false;
}
if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback older data block or delete block
LOG.info("Rolling back an older log block read from {} with instantTime {}",
logFile.getPath(), targetInstantForCommandBlock);
return false;
}
return true;
}).collect(Collectors.toList());
final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
totalRollbacks.addAndGet(numBlocksRolledBack);
LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
if (numBlocksRolledBack == 0) {
LOG.warn("Rollback command block for targetInstantTime {} matched no blocks; invalid or extra rollback block in {}",
targetInstantForCommandBlock, logFile.getPath());
}
break;
default:
throw new UnsupportedOperationException("Command type not yet supported: " + commandBlock.getType());
}
break;
case CORRUPT_BLOCK:
LOG.info("Found a corrupt block in {}", logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// treat the corrupt block as the next data block so that a subsequent rollback
// command block can roll it back
currentInstantLogBlocks.push(logBlock);
validLogBlockInstants.add(logBlock);
// no need to update the block sequence tracker here: it exists only to remove
// additional/spurious valid log blocks, and the contents of corrupt blocks are
// never read anyway
break;
default:
throw new UnsupportedOperationException("Block type not supported yet: " + logBlock.getBlockType());
}
}
// merge the remaining queued blocks once all the log files have been read
if (!currentInstantLogBlocks.isEmpty()) {
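// the pair carries (duplicatesFound, dedupedListOfValidBlocks)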
Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
if (dedupedLogBlocksInfo.getKey()) {
// duplicates were found: re-create the queue of valid log blocks from the deduped list
currentInstantLogBlocks = new ArrayDeque<>();
dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
}
// if there were no dups, currentInstantLogBlocks is used as-is
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
totalLogRecords.set(recordBuffer.getTotalLogRecords());
} catch (IOException e) {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file", e);
} catch (Exception e) {
LOG.error("Got exception when reading log file", e);
throw new HoodieException("Exception when reading log file", e);
} finally {
try {
if (null != logFormatReaderWrapper) {
logFormatReaderWrapper.close();
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that can happen
LOG.error("Unable to close log format reader", ioe);
}
}
}