in table/arrow_scanner.go [388:454]
// recordsFromTask reads the data file referenced by task and sends its
// records, in order, on out.
//
// Each record read from the file passes through a processing pipeline whose
// steps are applied in this order:
//  1. positional deletes (only when positionalDeletes is non-empty),
//  2. the row filter returned by getRecordFilter (when it is non-nil),
//  3. projection to as.projectedSchema via ToRequestedSchema.
//
// When getRecordFilter reports that the whole file can be dropped, the file
// is not scanned. Instead, a single zero-row record with the projected schema
// is sent on out, marked as the last record for the task.
//
// Any error is returned to the caller. It is also sent on out as an
// enumeratedRecord tagged with task and carrying Err.
func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, positionalDeletes positionDeletes) (err error) {
	// Report failures to the consumer of out as well as to the caller.
	// NOTE(review): this is an unconditional blocking send. It assumes the
	// consumer keeps draining out even after ctx is cancelled. Confirm at
	// the call site.
	defer func() {
		if err != nil {
			out <- enumeratedRecord{Task: task, Err: err}
		}
	}()

	var (
		rdr        internal.FileReader
		iceSchema  *iceberg.Schema
		colIndices []int
		filterFunc recProcessFn
		dropFile   bool
	)

	iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
	if err != nil {
		return err
	}
	defer rdr.Close()

	// Resolve the row filter before doing any per-file work: if the file
	// can be dropped entirely, nothing below (including building the
	// positional-delete set) is needed.
	filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
	if err != nil {
		return err
	}

	if dropFile {
		var emptySchema *arrow.Schema
		emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
		if err != nil {
			return err
		}
		out <- enumeratedRecord{Task: task, Record: internal.Enumerated[arrow.Record]{
			Value: array.NewRecord(emptySchema, nil, 0), Index: 0, Last: true,
		}}
		return nil
	}

	// At most three steps: positional deletes, row filter, projection.
	pipeline := make([]recProcessFn, 0, 3)

	// Positional deletes are applied before the row filter. Presumably this
	// is because they address rows by their position in the file. Confirm
	// against processPositionalDeletes before reordering.
	if len(positionalDeletes) > 0 {
		// The total entry count is an upper bound on the set size, because
		// duplicate positions collapse into a single key. Pre-sizing
		// avoids rehashing while the set fills.
		total := 0
		for _, chunk := range positionalDeletes {
			for _, a := range chunk.Chunks() {
				total += a.Len()
			}
		}

		deletes := make(set[int64], total)
		for _, chunk := range positionalDeletes {
			for _, a := range chunk.Chunks() {
				// NOTE(review): unchecked assertion. It panics if a delete
				// file's position column is not int64.
				for _, v := range a.(*array.Int64).Int64Values() {
					deletes[v] = struct{}{}
				}
			}
		}
		pipeline = append(pipeline, processPositionalDeletes(ctx, deletes))
	}

	if filterFunc != nil {
		pipeline = append(pipeline, filterFunc)
	}

	// Final step: project each record onto the requested schema. The input
	// record is released once the projected record has been built.
	pipeline = append(pipeline, func(r arrow.Record) (arrow.Record, error) {
		defer r.Release()
		return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r, false, false, as.useLargeTypes)
	})

	return as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)
}