in table/arrow_scanner.go [456:554]
// createIterator adapts the records channel into an iter.Seq2 that yields
// Arrow record batches in (task index, record index) order.
//
// The incoming channel is wrapped with internal.MakeSequencedChan, which is
// given an ordering predicate, an "is next" predicate and a sentinel whose
// task index is -1 (presumably the starting point of the sequence; confirm
// against MakeSequencedChan).
//
// The returned iterator:
//   - yields (nil, err) and stops when ctx is done or when an enumerated
//     record carries an error;
//   - skips empty batches;
//   - when rowLimit > 0, yields at most rowLimit rows in total, truncating the
//     final batch with NewSlice if necessary.
//
// A batch is owned by the iterator until it is passed to yield; any batch that
// is not yielded (skipped, truncated-away, or still queued at exit) is
// released here. On exit the iterator calls cancel(nil), drains and releases
// whatever is left in the sequenced channel, and releases every chunk held in
// deletesPerFile.
func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
	// isBeforeAny reports whether batch is the sentinel (negative task index).
	isBeforeAny := func(batch *enumeratedRecord) bool {
		return batch.Task.Index < 0
	}

	sequenced := internal.MakeSequencedChan(numWorkers, records,
		// Ordering predicate: the sentinel sorts first, anything carrying an
		// error is treated as "less" so it is not held back, and otherwise
		// batches order by task index, then record index.
		func(left, right *enumeratedRecord) bool {
			switch {
			case isBeforeAny(left):
				return true
			case isBeforeAny(right):
				return false
			case left.Err != nil || right.Err != nil:
				return true
			case left.Task.Index == right.Task.Index:
				return left.Record.Index < right.Record.Index
			default:
				return left.Task.Index < right.Task.Index
			}
		},
		// "Is next" predicate: after the sentinel comes record 0 of task 0;
		// an error is always accepted as next; within a task the record index
		// advances by one; across tasks the previous batch must be flagged
		// Last and the next one must be record 0 of the following task.
		func(prev, next *enumeratedRecord) bool {
			switch {
			case isBeforeAny(prev):
				return next.Task.Index == 0 && next.Record.Index == 0
			case next.Err != nil:
				return true
			case prev.Task.Index == next.Task.Index:
				return next.Record.Index == prev.Record.Index+1
			default:
				return next.Task.Index == prev.Task.Index+1 &&
					prev.Record.Last && next.Record.Index == 0
			}
		}, enumeratedRecord{Task: internal.Enumerated[FileScanTask]{Index: -1}})

	totalRowCount := int64(0)

	return func(yield func(arrow.Record, error) bool) {
		// Deferred calls run last-in-first-out: cancel(nil) below fires first,
		// then this cleanup drains whatever is still queued.
		defer func() {
			for rec := range sequenced {
				if rec.Record.Value != nil {
					rec.Record.Value.Release()
				}
			}

			for _, v := range deletesPerFile {
				for _, chunk := range v {
					chunk.Release()
				}
			}
		}()

		defer cancel(nil)

		for {
			select {
			case <-ctx.Done():
				if err := context.Cause(ctx); err != nil {
					yield(nil, err)
				}

				return
			case enum, ok := <-sequenced:
				if !ok {
					return
				}

				if enum.Err != nil {
					yield(nil, enum.Err)

					return
				}

				rec := enum.Record.Value
				if rowLimit > 0 {
					if totalRowCount >= rowLimit {
						rec.Release()

						return
					}

					// Written as a subtraction so the comparison cannot
					// overflow; remaining is always > 0 here.
					if remaining := rowLimit - totalRowCount; rec.NumRows() > remaining {
						// The slice keeps the underlying data alive; the
						// original batch is released on return. This defer is
						// registered at most once: the truncated batch is
						// non-empty and reaching the limit (or a false yield)
						// makes the loop return in this same iteration.
						defer rec.Release()
						rec = rec.NewSlice(0, remaining)
					}
				}

				// Capture the row count before yielding: the consumer may
				// release the record inside yield, after which it must not be
				// touched again.
				numRows := rec.NumRows()
				if numRows == 0 {
					// Skip empty records, but release them first: they are
					// never handed to the consumer, so nobody else will.
					rec.Release()

					continue
				}

				if !yield(rec, nil) {
					return
				}

				totalRowCount += numRows
				if rowLimit > 0 && totalRowCount >= rowLimit {
					return
				}
			}
		}
	}
}