fn next()

in parquet/src/arrow/arrow_reader/mod.rs [328:398]


    /// Reads up to `batch_size` rows from the underlying array reader and returns them as
    /// the next [`RecordBatch`].
    ///
    /// With a row selection present, selectors are taken from the front of the queue one at
    /// a time: skip selectors are forwarded to `skip_records`, select selectors to
    /// `read_records`. A select selector that does not fit into the current batch is split,
    /// and its remainder is pushed back for the following call. Without a selection, a
    /// single `read_records(batch_size)` call is made.
    ///
    /// Returns `None` once the consumed batch is empty, and `Some(Err(_))` if skipping,
    /// reading or consuming the batch fails, or if the consumed array is not a struct array.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(selection) = self.selection.as_mut() {
            // Number of rows accumulated for the batch being assembled by this call.
            let mut rows_read = 0;

            while rows_read < self.batch_size {
                let selector = match selection.pop_front() {
                    Some(selector) => selector,
                    // Selection exhausted: emit whatever has been gathered so far.
                    None => break,
                };

                if selector.skip {
                    match self.array_reader.skip_records(selector.row_count) {
                        Ok(skipped) if skipped == selector.row_count => continue,
                        // Skipping fewer rows than requested is reported as an error.
                        Ok(skipped) => {
                            return Some(Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                selector.row_count,
                                skipped
                            )
                            .into()));
                        }
                        Err(e) => return Some(Err(e.into())),
                    }
                }

                // An empty select selector must be ignored; otherwise the zero-row read
                // below would be mistaken for the end of the reader.
                // See https://github.com/apache/arrow-rs/issues/2669
                if selector.row_count == 0 {
                    continue;
                }

                // Rows that still fit into the current batch.
                let capacity = self.batch_size - rows_read;
                let to_read = if selector.row_count > capacity {
                    // The selector covers more rows than fit: read only what fits now and
                    // queue the remainder at the front for the next call.
                    selection.push_front(RowSelector::select(selector.row_count - capacity));
                    capacity
                } else {
                    selector.row_count
                };

                match self.array_reader.read_records(to_read) {
                    Err(error) => return Some(Err(error.into())),
                    // Nothing was read; stop filling this batch (presumably the underlying
                    // data is exhausted).
                    Ok(0) => break,
                    Ok(count) => rows_read += count,
                }
            }
        } else if let Err(error) = self.array_reader.read_records(self.batch_size) {
            return Some(Err(error.into()));
        }

        let array = match self.array_reader.consume_batch() {
            Ok(array) => array,
            Err(error) => return Some(Err(error.into())),
        };

        match array.as_struct_opt() {
            None => Some(Err(ArrowError::ParquetError(
                "Struct array reader should return struct array".to_string(),
            ))),
            Some(batch) if batch.len() > 0 => Some(Ok(RecordBatch::from(batch))),
            // A zero-length batch ends the iteration.
            Some(_) => None,
        }
    }