in table/arrow_scanner.go [323:386]
// processRecords streams the records of a single file scan task to out.
//
// For Parquet files it first builds a row-group statistics evaluator from
// fileSchema and the scan's bound row filter and passes it to the reader; for
// any other file format the reader receives a nil evaluator. Each record read
// is passed through every function in pipeline, in order, and the result is
// sent on out tagged with a zero-based index and the originating task.
//
// Records are emitted one step behind the reader so that the final record of
// the task can be flagged with Last: true.
//
// It returns the first error from building the evaluator, opening the record
// reader, or a pipeline stage; ctx.Err() if ctx is cancelled while a send on
// out is pending; otherwise any non-EOF error reported by the reader.
func (as *arrowScan) processRecords(
	ctx context.Context,
	task internal.Enumerated[FileScanTask],
	fileSchema *iceberg.Schema,
	rdr internal.FileReader,
	columns []int,
	pipeline []recProcessFn,
	out chan<- enumeratedRecord,
) (err error) {
	var (
		testRowGroups any
		recRdr        array.RecordReader
	)

	// Only Parquet gets a row-group evaluator; other formats leave
	// testRowGroups as nil.
	switch task.Value.File.FileFormat() {
	case iceberg.ParquetFile:
		testRowGroups, err = newParquetRowGroupStatsEvaluator(fileSchema, as.boundRowFilter, false)
		if err != nil {
			return err
		}
	}

	recRdr, err = rdr.GetRecords(ctx, columns, testRowGroups)
	if err != nil {
		return err
	}
	defer recRdr.Release()

	// emit delivers rec on out, giving up if ctx is cancelled first so that a
	// consumer which has stopped receiving cannot block this goroutine
	// forever. On cancellation the record was never handed off, so the
	// reference held here is released before returning.
	emit := func(rec arrow.Record, index int, last bool) error {
		select {
		case out <- enumeratedRecord{Record: internal.Enumerated[arrow.Record]{
			Value: rec, Index: index, Last: last,
		}, Task: task}:
			return nil
		case <-ctx.Done():
			rec.Release()

			return ctx.Err()
		}
	}

	var (
		idx  int
		prev arrow.Record
	)

	for recRdr.Next() {
		// The reader produced another record, so the one held back from the
		// previous iteration is known not to be the last.
		if prev != nil {
			if err = emit(prev, idx, false); err != nil {
				return err
			}
			idx++
		}

		// Take our own reference on the reader's current record before it is
		// handed to the pipeline and, eventually, to the consumer of out.
		prev = recRdr.Record()
		prev.Retain()

		for _, f := range pipeline {
			// NOTE(review): on failure prev is whatever the stage returned;
			// presumably each stage owns (and releases) its input — confirm.
			if prev, err = f(prev); err != nil {
				return err
			}
		}
	}

	// Flush the held-back record, now known to be the final one.
	if prev != nil {
		if err = emit(prev, idx, true); err != nil {
			return err
		}
	}

	// io.EOF from the reader is treated as normal termination.
	if readErr := recRdr.Err(); readErr != nil && readErr != io.EOF {
		return readErr
	}

	return nil
}