in parquet/src/arrow/arrow_reader/mod.rs [328:398]
fn next(&mut self) -> Option<Self::Item> {
let mut read_records = 0;
match self.selection.as_mut() {
Some(selection) => {
while read_records < self.batch_size && !selection.is_empty() {
let front = selection.pop_front().unwrap();
if front.skip {
let skipped =
match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};
if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
}
continue;
}
//Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
//Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
if front.row_count == 0 {
continue;
}
// try to read record
let need_read = self.batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read) {
Some(remaining) if remaining != 0 => {
// if page row count less than batch_size we must set batch size to page row count.
// add check avoid dead loop
selection.push_front(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
};
match self.array_reader.read_records(to_read) {
Ok(0) => break,
Ok(rec) => read_records += rec,
Err(error) => return Some(Err(error.into())),
}
}
}
None => {
if let Err(error) = self.array_reader.read_records(self.batch_size) {
return Some(Err(error.into()));
}
}
};
match self.array_reader.consume_batch() {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
)
});
match struct_array {
Err(err) => Some(Err(err)),
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
}
}
}
}