private void scanInternalV1(Option<KeySpec> keySpecOpt)

in hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java [215:412]


  private void scanInternalV1(Option<KeySpec> keySpecOpt) {
    currentInstantLogBlocks = new ArrayDeque<>();
    List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
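    // Tracks log blocks per commit: instantTime -> attemptNo -> list of (blockSeqNo, block);
    // used at the end of the scan to reconcile spurious blocks written by failed task retries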
    Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();

    progress = 0.0f;
    totalLogFiles = new AtomicLong(0);
    totalRollbacks = new AtomicLong(0);
    totalCorruptBlocks = new AtomicLong(0);
    totalLogBlocks = new AtomicLong(0);
    totalLogRecords = new AtomicLong(0);
    HoodieLogFormatReader logFormatReaderWrapper = null;
    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
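    // The completed and inflight timelines are used below to skip blocks from uncommitted writes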
    try {
      // Build a single log format reader over all the log file paths
      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
          logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
      while (logFormatReaderWrapper.hasNext()) {
        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
        LOG.info("Scanning log file {}", logFile);
        scannedLogFiles.add(logFile);
        totalLogFiles.set(scannedLogFiles.size());
        // Use the HoodieLogFileReader to iterate through the blocks in the log file
        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
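        // The BLOCK_IDENTIFIER header, when present, encodes "<attemptNo>,<blockSeqNo>"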
        final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, "");
        int blockSeqNo = -1;
        long attemptNo = -1L;
        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
          String[] parts = blockSequenceNumberStr.split(",");
          attemptNo = Long.parseLong(parts[0]);
          blockSeqNo = Integer.parseInt(parts[1]);
        }
        totalLogBlocks.incrementAndGet();
        if (logBlock.isDataOrDeleteBlock()) {
          if (compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THAN, this.latestInstantTime)) {
            // Skip data or delete blocks whose instant time is greater than the latest instant time used by this log record reader
            continue;
          }
          if (!allowInflightInstants
              && (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
            // Hit an uncommitted block, possibly from a failed write; skip it and move to the next one
            continue;
          }
          if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
            // filter the log block by instant range
            continue;
          }
        }

        switch (logBlock.getBlockType()) {
          case HFILE_DATA_BLOCK:
          case AVRO_DATA_BLOCK:
          case PARQUET_DATA_BLOCK:
            LOG.info("Reading a data block from file {} at instant {}", logFile.getPath(), instantTime);
            // store the current block
            currentInstantLogBlocks.push(logBlock);
            validLogBlockInstants.add(logBlock);
            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo,
                blockSequenceMapPerCommit);
            break;
          case DELETE_BLOCK:
            LOG.info("Reading a delete block from file {}", logFile.getPath());
            // store deletes so they can be rolled back
            currentInstantLogBlocks.push(logBlock);
            validLogBlockInstants.add(logBlock);
            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo,
                blockSequenceMapPerCommit);
            break;
          case COMMAND_BLOCK:
            // Consider the following scenario:
            // (Time 0, C1, Task T1) -> Running
            // (Time 1, C1, Task T1) -> Failed (wrote either a corrupt block or a correct
            // data block B1 with commit time C1)
            // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried; the attempt number is 2)
            // (Time 3, C1, Task T1.2) -> Finished (wrote a correct data block B2)
            // Now a log file L1 can have 2 correct data blocks (B1 and B2) with the same content.
            // Say commit C1 eventually fails and a rollback is triggered.
            // The rollback writes only 1 rollback block (R1), since it assumes one block is
            // written per ingestion batch for a file, but in reality both B1 and B2 need to be
            // rolled back. The following code ensures the same rollback block (R1) is used to
            // roll back both B1 and B2.
            // This is a command block - take appropriate action based on the command
            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
            String targetInstantForCommandBlock =
                logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
            LOG.info("Reading a command block {} with targetInstantTime {} from file {}", commandBlock.getType(), targetInstantForCommandBlock,
                logFile.getPath());
            switch (commandBlock.getType()) { // there can be different types of command blocks
              case ROLLBACK_BLOCK:
                // Roll back older log block(s) that were already read.
                // Get the commit time from each older record block and compare it with the
                // targetInstantTime; roll back only on an exact match. This is required when
                // invalid/extra rollback blocks are written due to failures during the rollback
                // operation itself, and it ensures the same rollback block (R1) is used to roll
                // back both B1 and B2 when they carry the same instant time.
                final int instantLogBlockSizeBeforeRollback = currentInstantLogBlocks.size();
                currentInstantLogBlocks.removeIf(block -> {
                  // handle corrupt blocks separately since they may not have metadata
                  if (block.getBlockType() == CORRUPT_BLOCK) {
                    LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
                    return true;
                  }
                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
                    // rollback older data block or delete block
                    LOG.info("Rolling back an older log block read from {} with instantTime {}",
                        logFile.getPath(), targetInstantForCommandBlock);
                    return true;
                  }
                  return false;
                });

                // remove entire entry from blockSequenceTracker
                blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);

                // Remove all matching log blocks from the list of valid blocks tracked so far
                validLogBlockInstants = validLogBlockInstants.stream().filter(block -> {
                  // handle corrupt blocks separately since they may not have metadata;
                  // drop them here too so that a queue rebuilt from this list cannot resurrect them
                  if (block.getBlockType() == CORRUPT_BLOCK) {
                    LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
                    return false;
                  }
                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
                    // rollback older data block or delete block
                    LOG.info("Rolling back an older log block read from {} with instantTime {}",
                        logFile.getPath(), targetInstantForCommandBlock);
                    return false;
                  }
                  return true;
                }).collect(Collectors.toList());

                final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                totalRollbacks.addAndGet(numBlocksRolledBack);
                LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
                if (numBlocksRolledBack == 0) {
                  LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}",
                      targetInstantForCommandBlock, logFile.getPath());
                }
                break;
              default:
                throw new UnsupportedOperationException("Command type not yet supported.");
            }
            break;
          case CORRUPT_BLOCK:
            LOG.info("Found a corrupt block in {}", logFile.getPath());
            totalCorruptBlocks.incrementAndGet();
            // If there is a corrupt block, assume it was meant to be the next data block and
            // queue it, so that a subsequent rollback block can remove it
            currentInstantLogBlocks.push(logBlock);
            validLogBlockInstants.add(logBlock);
            // No need to update the block sequence tracker here: the tracker only exists to
            // remove additional/spurious valid log blocks, and the contents of corrupt blocks are never read.
            break;
          default:
            throw new UnsupportedOperationException("Block type not supported yet");
        }
      }
      // Merge the remaining queued blocks once all the blocks have been read
      if (!currentInstantLogBlocks.isEmpty()) {
        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
        if (dedupedLogBlocksInfo.getKey()) {
          // duplicate log blocks were detected: rebuild the queue from the deduped list of
          // valid log blocks; otherwise currentInstantLogBlocks can be used as-is
          currentInstantLogBlocks = new ArrayDeque<>();
          dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
        }
        LOG.info("Merging the final data blocks");
        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
      }

      // Done
      progress = 1.0f;
      totalLogRecords.set(recordBuffer.getTotalLogRecords());
    } catch (IOException e) {
      LOG.error("Got IOException when reading log file", e);
      throw new HoodieIOException("IOException when reading log file", e);
    } catch (Exception e) {
      LOG.error("Got exception when reading log file", e);
      throw new HoodieException("Exception when reading log file", e);
    } finally {
      try {
        if (null != logFormatReaderWrapper) {
          logFormatReaderWrapper.close();
        }
      } catch (IOException ioe) {
        // Swallow the exception, as we do not want to mask the original exception, if any
        LOG.error("Unable to close log format reader", ioe);
      }
    }
  }
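
For context on the dedup step above, here is a minimal, self-contained sketch of the block sequence reconciliation idea, assuming that when multiple task attempts wrote blocks for the same commit, only the blocks of the highest attempt number should survive. The Block record and reconcile helper below are hypothetical stand-ins for illustration, not Hudi's actual reconcileSpuriousBlocksAndGetValidOnes:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BlockReconcileSketch {

  // Hypothetical stand-in for a log block tagged with its origin (not a Hudi type)
  record Block(String instantTime, long attemptNo, int blockSeqNo) {
  }

  // For each commit (instant time), keep only the blocks written by the highest
  // attempt number; blocks from earlier, failed attempts are treated as spurious
  static List<Block> reconcile(Map<String, Map<Long, List<Block>>> blocksPerCommit) {
    List<Block> valid = new ArrayList<>();
    for (Map<Long, List<Block>> attempts : blocksPerCommit.values()) {
      long lastAttempt = Collections.max(attempts.keySet());
      valid.addAll(attempts.get(lastAttempt));
    }
    return valid;
  }

  public static void main(String[] args) {
    Map<String, Map<Long, List<Block>>> blocksPerCommit = new HashMap<>();
    Map<Long, List<Block>> c1Attempts = blocksPerCommit.computeIfAbsent("C1", k -> new HashMap<>());
    // Attempt 1 failed after writing one block; attempt 2 rewrote it and added another
    c1Attempts.computeIfAbsent(1L, k -> new ArrayList<>()).add(new Block("C1", 1L, 0));
    c1Attempts.computeIfAbsent(2L, k -> new ArrayList<>()).add(new Block("C1", 2L, 0));
    c1Attempts.get(2L).add(new Block("C1", 2L, 1));
    // Only attempt 2's two blocks survive; attempt 1's duplicate block is dropped
    System.out.println(reconcile(blocksPerCommit));
  }
}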