in crates/core/src/file_group/log_file/reader.rs [224:263]
fn read_next_block(&mut self, instant_range: &InstantRange) -> Result<Option<LogBlock>> {
if !self.read_magic()? {
return Ok(None);
}
let curr_pos = self
.reader
.stream_position()
.map_err(CoreError::ReadLogFileError)?;
let (block_length, _) = self.read_block_length_or_corrupted_block(curr_pos)?;
let format_version = self.read_log_format_version()?;
let block_type = self.read_block_type(&format_version)?;
let header = self.read_block_metadata(BlockMetadataType::Header, &format_version)?;
let mut skipped = false;
if self.should_skip_block(&header, instant_range)? {
skipped = true;
// TODO skip reading block
}
let decoder = Decoder::new(self.hudi_configs.clone());
let record_batches = decoder.decode_content(
self.reader.by_ref(),
&format_version,
block_length,
&block_type,
&header,
)?;
let footer = self.read_block_metadata(BlockMetadataType::Footer, &format_version)?;
let _ = self.read_total_block_length(&format_version)?;
Ok(Some(LogBlock {
format_version,
block_type,
header,
record_batches,
footer,
skipped,
}))
}