table/arrow_scanner.go (497 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"context"
"io"
"iter"
"strconv"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/compute/exprs"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table/internal"
"github.com/apache/iceberg-go/table/substrait"
"github.com/substrait-io/substrait-go/v3/expr"
"golang.org/x/sync/errgroup"
)
const (
// ScanOptionArrowUseLargeTypes is the scan option key read by GetRecords.
// Its value is parsed as a boolean (defaulting to "false" when absent) and
// is forwarded as the useLargeTypes argument when converting schemas and
// records to Arrow.
ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
)
type (
// positionDeletes is the list of chunked position arrays collected for a
// single data file; each entry comes from one positional-delete file.
positionDeletes = []*arrow.Chunked
// perFilePosDeletes maps a data file path (as stored in the "file_path"
// column of the delete files) to the positionDeletes that apply to it.
perFilePosDeletes = map[string]positionDeletes
)
// readAllDeleteFiles reads every distinct positional-delete file referenced by
// tasks and groups the deleted positions by the data file path they apply to.
//
// Delete files whose content type is not iceberg.EntryContentPosDeletes are
// ignored, and a delete file referenced by several tasks is read only once
// (deduplicated by its file path). At most concurrency files are read at the
// same time; a non-positive concurrency is treated as 1.
//
// On success the returned map owns the chunked arrays and the caller is
// responsible for releasing them. On failure everything collected so far is
// released and a nil map is returned together with the first error.
func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask, concurrency int) (perFilePosDeletes, error) {
	uniqueDeletes := make(map[string]iceberg.DataFile)
	for _, t := range tasks {
		for _, d := range t.DeleteFiles {
			if d.ContentType() != iceberg.EntryContentPosDeletes {
				continue
			}

			if _, ok := uniqueDeletes[d.FilePath()]; !ok {
				uniqueDeletes[d.FilePath()] = d
			}
		}
	}

	deletesPerFile := make(perFilePosDeletes)
	if len(uniqueDeletes) == 0 {
		return deletesPerFile, nil
	}

	// errgroup's SetLimit(0) never lets a goroutine start (which would block
	// forever below) and a negative channel capacity panics, so fall back to
	// reading one file at a time.
	if concurrency <= 0 {
		concurrency = 1
	}

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency)

	// err is written by the producer goroutine before it closes perFileChan
	// and is only read after the range loop below has observed that close.
	var err error

	perFileChan := make(chan map[string]*arrow.Chunked, concurrency)
	go func() {
		defer close(perFileChan)

		for _, v := range uniqueDeletes {
			g.Go(func() error {
				deletes, err := readDeletes(ctx, fs, v)
				if deletes != nil {
					perFileChan <- deletes
				}

				return err
			})
		}

		err = g.Wait()
	}()

	for deletes := range perFileChan {
		for file, arr := range deletes {
			deletesPerFile[file] = append(deletesPerFile[file], arr)
		}
	}

	if err != nil {
		// Nobody will consume the partial result, so drop the references
		// gathered from the files that were read successfully.
		for _, chunks := range deletesPerFile {
			for _, chunk := range chunks {
				chunk.Release()
			}
		}

		return nil, err
	}

	return deletesPerFile, nil
}
// readDeletes loads one positional-delete file and splits its "pos" column by
// the data file each row refers to.
//
// The whole file is read into a table and its dictionaries are unified so that
// chunk 0's dictionary of the "file_path" column can be used to enumerate the
// referenced data files. For every dictionary value the "pos" column is
// filtered down to the rows whose "file_path" equals that value.
//
// The file must contain "file_path" and "pos" columns, and "file_path" must be
// a dictionary of strings; the index and type assertions below panic
// otherwise. A file without any chunks yields an empty map.
//
// On success the caller owns the returned chunked arrays. On failure nil is
// returned and any arrays produced before the failure are released.
func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) (map[string]*arrow.Chunked, error) {
	src, err := internal.GetFile(ctx, fs, dataFile, true)
	if err != nil {
		return nil, err
	}

	rdr, err := src.GetReader(ctx)
	if err != nil {
		return nil, err
	}
	defer rdr.Close()

	tbl, err := rdr.ReadTable(ctx)
	if err != nil {
		return nil, err
	}
	defer tbl.Release()

	unified, err := array.UnifyTableDicts(compute.GetAllocator(ctx), tbl)
	if err != nil {
		return nil, err
	}
	defer unified.Release()

	filePathCol := unified.Column(unified.Schema().FieldIndices("file_path")[0]).Data()
	posCol := unified.Column(unified.Schema().FieldIndices("pos")[0]).Data()

	results := make(map[string]*arrow.Chunked)
	if len(filePathCol.Chunks()) == 0 {
		// Nothing was read, so there is no dictionary to enumerate.
		return results, nil
	}

	dict := filePathCol.Chunk(0).(*array.Dictionary).Dictionary().(*array.String)

	// positionsFor returns the positions recorded for a single data file. The
	// comparison mask is released before returning so that masks do not pile
	// up until the whole delete file has been processed.
	positionsFor := func(path string) (*arrow.Chunked, error) {
		mask, err := compute.CallFunction(ctx, "equal", nil,
			compute.NewDatumWithoutOwning(filePathCol), compute.NewDatum(path))
		if err != nil {
			return nil, err
		}
		defer mask.Release()

		filtered, err := compute.Filter(ctx, compute.NewDatumWithoutOwning(posCol),
			mask, *compute.DefaultFilterOptions())
		if err != nil {
			return nil, err
		}

		return filtered.(*compute.ChunkedDatum).Value, nil
	}

	for i := 0; i < dict.Len(); i++ {
		path := dict.Value(i)

		positions, err := positionsFor(path)
		if err != nil {
			// The caller only gets nil on failure, so release what was
			// already built to avoid leaking it.
			for _, chunk := range results {
				chunk.Release()
			}

			return nil, err
		}

		results[path] = positions
	}

	return results, nil
}
// set is a minimal generic set: membership is the presence of a key in the
// map, and the empty-struct values carry no data.
type set[T comparable] map[T]struct{}
// combinePositionalDeletes returns an Int64 array containing, in ascending
// order, every position in the half-open range [start, end) that is not a
// member of deletes. The array is freshly built from mem on every call.
func combinePositionalDeletes(mem memory.Allocator, deletes set[int64], start, end int64) arrow.Array {
	kept := array.NewInt64Builder(mem)
	defer kept.Release()

	for pos := start; pos < end; pos++ {
		if _, deleted := deletes[pos]; deleted {
			continue
		}

		kept.Append(pos)
	}

	return kept.NewArray()
}
// recProcessFn is one stage of the per-record processing pipeline: it receives
// a record and returns the record to pass to the next stage, or an error. The
// stages built in this file release the record they are given.
type recProcessFn func(arrow.Record) (arrow.Record, error)
// processPositionalDeletes returns a pipeline stage that drops the rows whose
// position is contained in deletes.
//
// The stage is stateful: it keeps a running row offset that advances by the
// row count of every record it sees, so records must be passed to it in order.
// Each input record is released once the surviving rows have been taken.
func processPositionalDeletes(ctx context.Context, deletes set[int64]) recProcessFn {
	mem := compute.GetAllocator(ctx)

	// offset is the position of the first row of the next record to arrive.
	var offset int64

	return func(r arrow.Record) (arrow.Record, error) {
		defer r.Release()

		first, limit := offset, offset+r.NumRows()
		offset = limit

		keep := combinePositionalDeletes(mem, deletes, first, limit)
		defer keep.Release()

		taken, err := compute.Take(ctx, *compute.DefaultTakeOptions(),
			compute.NewDatumWithoutOwning(r), compute.NewDatumWithoutOwning(keep))
		if err != nil {
			return nil, err
		}

		return taken.(*compute.RecordDatum).Value, nil
	}
}
// filterRecords returns a pipeline stage that evaluates recordFilter against
// each record and keeps only the rows selected by the resulting mask. The
// input record is released after it has been filtered.
func filterRecords(ctx context.Context, recordFilter expr.Expression) recProcessFn {
	return func(rec arrow.Record) (arrow.Record, error) {
		defer rec.Release()

		batch := compute.NewDatumWithoutOwning(rec)

		selection, err := exprs.ExecuteScalarExpression(ctx, rec.Schema(), recordFilter, batch)
		if err != nil {
			return nil, err
		}
		defer selection.Release()

		kept, err := compute.Filter(ctx, batch, selection, *compute.DefaultFilterOptions())
		if err != nil {
			return nil, err
		}

		return kept.(*compute.RecordDatum).Value, nil
	}
}
// arrowScan holds the state needed to turn a set of file scan tasks into a
// stream of Arrow records.
type arrowScan struct {
// fs is the IO implementation used to open data files and delete files.
fs iceio.IO
// metadata supplies the name mapping used when reading files.
metadata Metadata
// projectedSchema is the schema every emitted record is converted to.
projectedSchema *iceberg.Schema
// boundRowFilter is the row filter; nil or AlwaysTrue disables filtering.
boundRowFilter iceberg.BooleanExpression
// caseSensitive is passed on when binding and converting the row filter.
caseSensitive bool
// rowLimit caps the total number of rows yielded when positive. Zero makes
// GetRecords return an empty iterator; other values apply no cap.
rowLimit int64
// options carries scan properties such as ScanOptionArrowUseLargeTypes.
options iceberg.Properties
// useLargeTypes is set by GetRecords from options.
useLargeTypes bool
// concurrency bounds concurrent delete-file reads and the number of
// worker goroutines processing tasks.
concurrency int
// nameMapping is populated from metadata by recordBatchesFromTasksAndDeletes.
nameMapping iceberg.NameMapping
}
// projectedFieldIDs returns the set of field IDs that have to be read from a
// file: every field ID of the projected schema whose type is neither a map nor
// a list, plus every field ID referenced by the bound row filter (if any).
func (as *arrowScan) projectedFieldIDs() (set[int], error) {
	ids := set[int]{}
	for _, fieldID := range as.projectedSchema.FieldIDs() {
		// The "found" flag is deliberately ignored: an ID without a resolvable
		// type simply fails both assertions and is included.
		fieldType, _ := as.projectedSchema.FindTypeByID(fieldID)

		_, isMap := fieldType.(*iceberg.MapType)
		_, isList := fieldType.(*iceberg.ListType)
		if isMap || isList {
			continue
		}

		ids[fieldID] = struct{}{}
	}

	if as.boundRowFilter == nil {
		return ids, nil
	}

	filterIDs, err := iceberg.ExtractFieldIDs(as.boundRowFilter)
	if err != nil {
		return nil, err
	}

	for _, fieldID := range filterIDs {
		ids[fieldID] = struct{}{}
	}

	return ids, nil
}
// enumeratedRecord is the unit sent from the task workers to the iterator. It
// carries either a record or an error, tagged with the task it came from.
type enumeratedRecord struct {
// Record is the record together with its index within the task and a flag
// marking the task's last record.
Record internal.Enumerated[arrow.Record]
// Task is the scan task, with its index, that produced this entry.
Task internal.Enumerated[FileScanTask]
// Err, when non-nil, reports a failure while processing Task.
Err error
}
// prepareToRead opens file and works out what has to be read from it.
//
// It returns the iceberg schema derived from the file's pruned Arrow schema,
// the column indices to read, and the open reader. On success the caller owns
// the reader and must close it; on failure the reader (if it was opened) is
// closed here.
func (as *arrowScan) prepareToRead(ctx context.Context, file iceberg.DataFile) (*iceberg.Schema, []int, internal.FileReader, error) {
	ids, err := as.projectedFieldIDs()
	if err != nil {
		return nil, nil, nil, err
	}

	src, err := internal.GetFile(ctx, as.fs, file, false)
	if err != nil {
		return nil, nil, nil, err
	}

	rdr, err := src.GetReader(ctx)
	if err != nil {
		return nil, nil, nil, err
	}

	// abort closes the reader before reporting a failure that happened after
	// it was opened.
	abort := func(cause error) (*iceberg.Schema, []int, internal.FileReader, error) {
		rdr.Close()

		return nil, nil, nil, cause
	}

	prunedSchema, colIndices, err := rdr.PrunedSchema(ids, as.nameMapping)
	if err != nil {
		return abort(err)
	}

	iceSchema, err := ArrowSchemaToIceberg(prunedSchema, false, as.nameMapping)
	if err != nil {
		return abort(err)
	}

	return iceSchema, colIndices, rdr, nil
}
// getRecordFilter builds the row-filter stage for a file with the given schema.
//
// The results are (stage, dropFile, err):
//   - a nil stage with dropFile == false means no filtering is needed (there
//     is no row filter, or it is AlwaysTrue before or after binding);
//   - dropFile == true means the translated filter is AlwaysFalse, so the
//     file cannot contribute any rows;
//   - otherwise the stage evaluates the filter, converted to a substrait
//     expression, against each record.
func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Schema) (recProcessFn, bool, error) {
	if as.boundRowFilter == nil || as.boundRowFilter.Equals(iceberg.AlwaysTrue{}) {
		return nil, false, nil
	}

	translated, err := iceberg.TranslateColumnNames(as.boundRowFilter, fileSchema)
	if err != nil {
		return nil, false, err
	}

	if translated.Equals(iceberg.AlwaysFalse{}) {
		return nil, true, nil
	}

	bound, err := iceberg.BindExpr(fileSchema, translated, as.caseSensitive)
	if err != nil {
		return nil, false, err
	}

	if bound.Equals(iceberg.AlwaysTrue{}) {
		return nil, false, nil
	}

	extSet, recordFilter, err := substrait.ConvertExpr(fileSchema, bound, as.caseSensitive)
	if err != nil {
		return nil, false, err
	}

	// The stage evaluates the expression with the extension set produced by
	// the conversion, so attach it to the context the stage captures.
	filterCtx := exprs.WithExtensionIDSet(ctx, exprs.NewExtensionSetDefault(*extSet))

	return filterRecords(filterCtx, recordFilter), false, nil
}
// processRecords reads the selected columns of a task's file, runs every
// record through pipeline and sends the results on out.
//
// For parquet files a row-group statistics evaluator built from the bound row
// filter is handed to the reader. Each processed record is held back until
// the next one has been read so that the final record of the task can be sent
// with Last set to true. A pipeline error stops processing immediately; a
// reader error other than io.EOF is returned after the final record was sent.
func (as *arrowScan) processRecords(
	ctx context.Context,
	task internal.Enumerated[FileScanTask],
	fileSchema *iceberg.Schema,
	rdr internal.FileReader,
	columns []int,
	pipeline []recProcessFn,
	out chan<- enumeratedRecord,
) (err error) {
	var testRowGroups any
	if task.Value.File.FileFormat() == iceberg.ParquetFile {
		testRowGroups, err = newParquetRowGroupStatsEvaluator(fileSchema, as.boundRowFilter, false)
		if err != nil {
			return err
		}
	}

	var recRdr array.RecordReader
	recRdr, err = rdr.GetRecords(ctx, columns, testRowGroups)
	if err != nil {
		return err
	}
	defer recRdr.Release()

	// send publishes one processed record of this task.
	send := func(rec arrow.Record, index int, last bool) {
		out <- enumeratedRecord{
			Record: internal.Enumerated[arrow.Record]{Value: rec, Index: index, Last: last},
			Task:   task,
		}
	}

	var (
		pending arrow.Record // processed but not yet sent
		sent    int          // number of records already sent
	)

	for recRdr.Next() {
		if pending != nil {
			send(pending, sent, false)
			sent++
		}

		// Retain the reader's record before handing it to the pipeline,
		// whose stages release their input.
		pending = recRdr.Record()
		pending.Retain()

		for _, stage := range pipeline {
			if pending, err = stage(pending); err != nil {
				return err
			}
		}
	}

	if pending != nil {
		send(pending, sent, true)
	}

	if readErr := recRdr.Err(); readErr != nil && readErr != io.EOF {
		err = readErr
	}

	return err
}
// recordsFromTask processes a single scan task and sends its records on out.
//
// The per-record pipeline is assembled in this order: positional deletes (when
// any were supplied), the row filter (when one is needed), and finally the
// conversion to the projected schema. If the row filter shows that the file
// cannot match, a single empty record flagged Last is sent instead of reading
// the file. Any error is both returned and sent on out as an entry with Err
// set.
func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, positionalDeletes positionDeletes) (err error) {
	// Report whatever error this function ends up returning to the consumer.
	defer func() {
		if err != nil {
			out <- enumeratedRecord{Task: task, Err: err}
		}
	}()

	iceSchema, colIndices, rdr, err := as.prepareToRead(ctx, task.Value.File)
	if err != nil {
		return err
	}
	defer rdr.Close()

	stages := make([]recProcessFn, 0, 2)

	if len(positionalDeletes) > 0 {
		// Flatten all delete chunks into one lookup set of row positions.
		deleted := set[int64]{}
		for _, chunked := range positionalDeletes {
			for _, arr := range chunked.Chunks() {
				for _, pos := range arr.(*array.Int64).Int64Values() {
					deleted[pos] = struct{}{}
				}
			}
		}

		stages = append(stages, processPositionalDeletes(ctx, deleted))
	}

	filterFunc, dropFile, err := as.getRecordFilter(ctx, iceSchema)
	if err != nil {
		return err
	}

	if dropFile {
		emptySchema, schemaErr := SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
		if schemaErr != nil {
			return schemaErr
		}

		out <- enumeratedRecord{
			Task: task,
			Record: internal.Enumerated[arrow.Record]{
				Value: array.NewRecord(emptySchema, nil, 0), Index: 0, Last: true,
			},
		}

		return nil
	}

	if filterFunc != nil {
		stages = append(stages, filterFunc)
	}

	stages = append(stages, func(r arrow.Record) (arrow.Record, error) {
		defer r.Release()

		return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r, false, false, as.useLargeTypes)
	})

	return as.processRecords(ctx, task, iceSchema, rdr, colIndices, stages, out)
}
// createIterator turns the stream of worker results into an iterator over
// records.
//
// records is passed through internal.MakeSequencedChan together with an
// ordering function, a "directly follows" predicate and a sentinel start value
// whose task index is -1; the result is expected to deliver entries by task
// index and, within a task, by record index (see MakeSequencedChan for the
// exact contract).
//
// The iterator stops at the first error, when ctx is done, when the consumer
// stops, or once rowLimit rows were yielded (only enforced when rowLimit is
// positive; the record crossing the limit is sliced). Empty records are not
// yielded. Ownership of every yielded record passes to the consumer. When the
// iterator finishes it cancels the context, releases every record still in
// flight and releases all arrays in deletesPerFile.
func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
	isBeforeAny := func(batch enumeratedRecord) bool {
		return batch.Task.Index < 0
	}

	sequenced := internal.MakeSequencedChan(uint(numWorkers), records,
		func(left, right *enumeratedRecord) bool {
			switch {
			case isBeforeAny(*left):
				return true
			case isBeforeAny(*right):
				return false
			case left.Err != nil || right.Err != nil:
				return true
			case left.Task.Index == right.Task.Index:
				return left.Record.Index < right.Record.Index
			default:
				return left.Task.Index < right.Task.Index
			}
		}, func(prev, next *enumeratedRecord) bool {
			switch {
			case isBeforeAny(*prev):
				return next.Task.Index == 0 && next.Record.Index == 0
			case next.Err != nil:
				return true
			case prev.Task.Index == next.Task.Index:
				return next.Record.Index == prev.Record.Index+1
			default:
				return next.Task.Index == prev.Task.Index+1 &&
					prev.Record.Last && next.Record.Index == 0
			}
		}, enumeratedRecord{Task: internal.Enumerated[FileScanTask]{Index: -1}})

	totalRowCount := int64(0)

	return func(yield func(arrow.Record, error) bool) {
		// Runs after cancel (deferred below): drain whatever the workers
		// still had in flight, then drop the positional deletes.
		defer func() {
			for rec := range sequenced {
				if rec.Record.Value != nil {
					rec.Record.Value.Release()
				}
			}

			for _, v := range deletesPerFile {
				for _, chunk := range v {
					chunk.Release()
				}
			}
		}()

		defer cancel(nil)

		for {
			select {
			case <-ctx.Done():
				if err := context.Cause(ctx); err != nil {
					yield(nil, err)
				}

				return
			case enum, ok := <-sequenced:
				if !ok {
					return
				}

				if enum.Err != nil {
					yield(nil, enum.Err)

					return
				}

				rec := enum.Record.Value
				if rowLimit > 0 {
					if totalRowCount >= rowLimit {
						rec.Release()

						return
					}

					if totalRowCount+rec.NumRows() > rowLimit {
						// Only the slice is handed out; the full record is
						// released when the iterator returns, which happens
						// right after the slice is yielded.
						defer rec.Release()
						rec = rec.NewSlice(0, rowLimit-totalRowCount)
					}
				}

				// Read the row count before yielding: afterwards the record
				// belongs to the consumer, who may already have released it.
				numRows := rec.NumRows()
				if numRows == 0 {
					// Empty records are never handed to the consumer, so they
					// have to be released here or they would leak.
					rec.Release()

					continue
				}

				if !yield(rec, nil) {
					return
				}

				totalRowCount += numRows
				if rowLimit > 0 && totalRowCount >= rowLimit {
					return
				}
			}
		}
	}
}
// recordBatchesFromTasksAndDeletes starts the worker goroutines that process
// tasks and returns an iterator over the records they produce.
//
// The receiver's nameMapping is refreshed from the table metadata first. Tasks
// are enumerated in the order given and distributed over
// min(as.concurrency, len(tasks)) workers, but never fewer than one: with a
// non-positive concurrency the scan would otherwise start no workers and
// silently yield nothing, or panic on a negative channel capacity. A worker
// whose task fails cancels the shared context with that error, which stops
// the remaining workers. deletesPerFile is handed to the iterator, which
// releases it when iteration ends.
func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, error] {
	extSet := substrait.NewExtensionSet()
	as.nameMapping = as.metadata.NameMapping()

	ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, extSet))

	// Buffered for every task so the feeder below never blocks on a send.
	taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))

	numWorkers := max(1, min(as.concurrency, len(tasks)))
	records := make(chan enumeratedRecord, numWorkers)

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()

			for {
				select {
				case <-ctx.Done():
					return
				case task, ok := <-taskChan:
					if !ok {
						return
					}

					if err := as.recordsFromTask(ctx, task, records,
						deletesPerFile[task.Value.File.FilePath()]); err != nil {
						cancel(err)

						return
					}
				}
			}
		}()
	}

	go func() {
		for i, t := range tasks {
			taskChan <- internal.Enumerated[FileScanTask]{
				Value: t, Index: i, Last: i == len(tasks)-1,
			}
		}
		close(taskChan)

		// records may only be closed once no worker can send on it any more.
		wg.Wait()
		close(records)
	}()

	return createIterator(ctx, uint(numWorkers), records, deletesPerFile,
		cancel, as.rowLimit)
}
// GetRecords returns the Arrow schema of the projection and an iterator over
// the records produced by scanning tasks.
//
// The ScanOptionArrowUseLargeTypes option is read from the scan options first;
// a value that cannot be parsed as a boolean is treated as false. With a row
// limit of zero an iterator that yields nothing is returned and no file is
// touched. Otherwise all positional-delete files referenced by tasks are read
// before the iterator is created.
func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
	useLarge, parseErr := strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
	if parseErr != nil {
		useLarge = false
	}
	as.useLargeTypes = useLarge

	resultSchema, err := SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
	if err != nil {
		return nil, nil, err
	}

	if as.rowLimit == 0 {
		nothing := func(yield func(arrow.Record, error) bool) {}

		return resultSchema, nothing, nil
	}

	deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks, as.concurrency)
	if err != nil {
		return nil, nil, err
	}

	batches := as.recordBatchesFromTasksAndDeletes(ctx, tasks, deletesPerFile)

	return resultSchema, batches, nil
}