in hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java [403:617]
private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
totalRollbacks = new AtomicLong(0);
totalCorruptBlocks = new AtomicLong(0);
totalLogBlocks = new AtomicLong(0);
totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(storage,
logFilePaths.stream()
.map(logFile -> new HoodieLogFile(new StoragePath(logFile)))
.collect(Collectors.toList()),
readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
/*
 * Scanning the log blocks and placing the compacted blocks in the right position requires two traversals.
 * The first traversal identifies the rollback blocks and the valid data and compacted blocks.
 *
 * Scanning blocks is easy in single-writer mode, where a rollback block immediately follows the data blocks
 * it affects. In multi-writer mode the blocks can be out of order. An example scenario:
 * B1, B2, B3, B4, R1(B3), B5
 * Here the rollback block R1 invalidates B3, which is not the block immediately preceding it.
 * This becomes more complicated with compacted blocks, which are data blocks created by log compaction.
 *
 * To solve this, run a single forward traversal and collect all the valid, non-corrupt blocks along with
 * their instant times and the target instant times of the rollback blocks.
 *
 * The second traversal iterates the block instant times in reverse order. While iterating in reverse, keep
 * track of the final compacted instant time for each block, so that when a data block is seen, the final
 * compacted block holding its merged contents can be included if it has not been added already.
 *
 * For example, suppose B1 and B2 are merged into a compacted block M1, and then M1, B3 and B4 are merged
 * into another compacted block M2. M2 is now the final block holding all the changes of B1, B2, B3 and B4,
 * so blockTimeToCompactionBlockTimeMap will look like
 * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
 * This map is updated while iterating and is used to place the compacted blocks in the correct position.
 * This way there can be multiple layers of merge blocks and the correct positions of merged blocks can
 * still be found.
 */
// Collect the target instant times of rollback blocks, which determine which blocks are invalid.
Set<String> targetRollbackInstants = new HashSet<>();
// Maps a block instant time to its list of log blocks. Note that the log blocks can be normal data blocks or compacted log blocks.
Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
// Instant times in the order they were first seen.
List<String> orderedInstantsList = new ArrayList<>();
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
/*
 * 1. First, traverse in the forward direction. While traversing the log blocks, collect the following:
 * a. instant times
 * b. instant-to-log-blocks map
 * c. targetRollbackInstants
 */
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
LOG.info("Scanning log file {}", logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
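// scannedLogFiles is a set, so the running count stays correct even though the reader returns
// multiple blocks from the same file across iterations.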
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
// Ignore the corrupt blocks. No further handling is required for them.
if (logBlock.getBlockType() == CORRUPT_BLOCK) {
LOG.info("Found a corrupt block in {}", logFile.getPath());
totalCorruptBlocks.incrementAndGet();
continue;
}
if (logBlock.isDataOrDeleteBlock()
&& compareTimestamps(instantTime, GREATER_THAN, this.latestInstantTime)) {
// Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
continue;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
|| getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
continue;
}
}
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
}
}
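// Blocks surviving the filters above are routed by type: data and delete blocks are collected,
// command blocks are applied.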
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
case DELETE_BLOCK:
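// instantToBlocksMap and orderedInstantsList are kept in sync: an instant is appended to the
// ordered list exactly once, when its first block arrives.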
List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
if (logBlocksList.isEmpty()) {
// Keep track of instant times in the order of arrival.
orderedInstantsList.add(instantTime);
}
logBlocksList.add(logBlock);
instantToBlocksMap.put(instantTime, logBlocksList);
break;
case COMMAND_BLOCK:
LOG.info("Reading a command block from file {}", logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
// Rollback blocks contain information about instants that have failed; collect their target instants in a set.
if (commandBlock.getType().equals(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK)) {
totalRollbacks.incrementAndGet();
String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
targetRollbackInstants.add(targetInstantForCommandBlock);
orderedInstantsList.remove(targetInstantForCommandBlock);
instantToBlocksMap.remove(targetInstantForCommandBlock);
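// The rollback is applied eagerly by dropping the target instant's blocks collected so far,
// so the second traversal only has to position data and compacted blocks.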
} else {
throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
default:
throw new UnsupportedOperationException("Block type not yet supported.");
}
}
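// First traversal complete: orderedInstantsList now holds the surviving instants in arrival
// order, and instantToBlocksMap holds each instant's blocks in scan order.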
if (LOG.isDebugEnabled()) {
LOG.debug("Ordered instant times seen {}", orderedInstantsList);
}
// Instant times of all blocks that have been added to the deque are collected in this set.
Set<String> instantTimesIncluded = new HashSet<>();
// Maps an original block's instant time to the instant time of the final compacted block that
// contains it; instants that were never compacted have no entry.
// Ex: given B1(i1), B2(i2), CB(i3, [i1, i2]), the entries will be i1 -> i3, i2 -> i3.
Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
/*
 * 2. Iterate the instants list in reverse order to get the latest instants first.
 * While iterating, update blockTimeToCompactionBlockTimeMap and place the compacted blocks in the
 * right position.
 */
for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
String instantTime = orderedInstantsList.get(i);
List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
if (instantsBlocks.isEmpty()) {
throw new HoodieException("Data corrupted while writing. Found zero blocks for instant " + instantTime);
}
HoodieLogBlock firstBlock = instantsBlocks.get(0);
// For compacted blocks, a COMPACTED_BLOCK_TIMES entry is present in the headers.
if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
// When a compacted block is seen, update blockTimeToCompactionBlockTimeMap.
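// getOrDefault chains multi-level compactions: if this compacted block was itself compacted
// into a later block (already visited in this reverse pass), its originals map to that final
// block. E.g. with M2 compacting (M1, B3, B4) and M1 compacting (B1, B2), visiting M2 first
// yields M1 -> M2, B3 -> M2 and B4 -> M2, and visiting M1 afterwards yields B1 -> M2 and B2 -> M2.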
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
.forEach(originalInstant -> {
String finalInstant = blockTimeToCompactionBlockTimeMap.getOrDefault(instantTime, instantTime);
blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
});
} else {
// When a data block is found, check whether it has already been compacted.
String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
if (compactedFinalInstantTime == null) {
// If it has not been compacted, add the blocks for this instant time at the end of the deque and continue.
List<HoodieLogBlock> logBlocks = instantToBlocksMap.get(instantTime);
Collections.reverse(logBlocks);
logBlocks.forEach(block -> currentInstantLogBlocks.addLast(block));
instantTimesIncluded.add(instantTime);
validBlockInstants.add(instantTime);
continue;
}
// If the compacted block exists and has already been included in the deque, ignore and continue.
if (instantTimesIncluded.contains(compactedFinalInstantTime)) {
continue;
}
// If the compacted block exists and has not been added yet, add all the blocks for that instant time.
List<HoodieLogBlock> logBlocks = instantToBlocksMap.get(compactedFinalInstantTime);
Collections.reverse(logBlocks);
logBlocks.forEach(block -> currentInstantLogBlocks.addLast(block));
instantTimesIncluded.add(compactedFinalInstantTime);
validBlockInstants.add(compactedFinalInstantTime);
}
}
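// At this point the deque holds blocks in reverse chronological order: instants were visited
// latest-first, and each instant's blocks were reversed before being appended via addLast.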
LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
if (LOG.isDebugEnabled()) {
LOG.debug("Final view of blockTimeToCompactionBlockTimeMap {}", blockTimeToCompactionBlockTimeMap);
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOption);
}
// Done
progress = 1.0f;
} catch (IOException e) {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file", e);
} catch (Exception e) {
LOG.error("Got exception when reading log file", e);
throw new HoodieException("Exception when reading log file", e);
} finally {
try {
if (null != logFormatReaderWrapper) {
logFormatReaderWrapper.close();
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that can happen
LOG.error("Unable to close log format reader", ioe);
}
}
}