table/scanner.go:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"cmp"
"context"
"errors"
"fmt"
"iter"
"slices"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"golang.org/x/sync/errgroup"
)
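// ScanNoLimit is passed to UseRowLimit to indicate that a scan should not
// limit the number of rows it returns.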
const ScanNoLimit = -1
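// keyDefaultMap is a concurrency-safe map, guarded by an RWMutex, that lazily
// constructs missing values with defaultFactory on first access.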
type keyDefaultMap[K comparable, V any] struct {
defaultFactory func(K) V
data map[K]V
mx sync.RWMutex
}
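// Get returns the value for key, invoking the default factory to create and
// store one if the key has not been seen before.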
func (k *keyDefaultMap[K, V]) Get(key K) V {
	k.mx.RLock()
	if v, ok := k.data[key]; ok {
		k.mx.RUnlock()
		return v
	}
	k.mx.RUnlock()
	k.mx.Lock()
	defer k.mx.Unlock()
	// Re-check after upgrading to the write lock: another goroutine may have
	// stored a value between the RUnlock and Lock calls above.
	if v, ok := k.data[key]; ok {
		return v
	}
	v := k.defaultFactory(key)
	k.data[key] = v
	return v
}
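// newKeyDefaultMap builds a keyDefaultMap whose missing entries are produced
// by factory.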
func newKeyDefaultMap[K comparable, V any](factory func(K) V) *keyDefaultMap[K, V] {
return &keyDefaultMap[K, V]{
data: make(map[K]V),
defaultFactory: factory,
}
}
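// newKeyDefaultMapWrapErr adapts an error-returning factory for use as a
// default factory. Note that a factory error panics rather than being
// returned, so lookups through the resulting map should only use inputs for
// which the factory is expected to succeed.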
func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error)) *keyDefaultMap[K, V] {
return &keyDefaultMap[K, V]{
data: make(map[K]V),
defaultFactory: func(k K) V {
v, err := factory(k)
if err != nil {
panic(err)
}
return v
},
}
}
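// partitionRecord exposes a slice of partition values through the positional
// accessor shape expected by the expression evaluators.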
type partitionRecord []any
func (p partitionRecord) Size() int { return len(p) }
func (p partitionRecord) Get(pos int) any { return p[pos] }
func (p partitionRecord) Set(pos int, val any) { p[pos] = val }
// manifestEntries holds the data and positional delete entries read from manifests.
type manifestEntries struct {
dataEntries []iceberg.ManifestEntry
positionalDeleteEntries []iceberg.ManifestEntry
mu sync.Mutex
}
func newManifestEntries() *manifestEntries {
return &manifestEntries{
dataEntries: make([]iceberg.ManifestEntry, 0),
positionalDeleteEntries: make([]iceberg.ManifestEntry, 0),
}
}
func (m *manifestEntries) addDataEntry(e iceberg.ManifestEntry) {
m.mu.Lock()
defer m.mu.Unlock()
m.dataEntries = append(m.dataEntries, e)
}
func (m *manifestEntries) addPositionalDeleteEntry(e iceberg.ManifestEntry) {
m.mu.Lock()
defer m.mu.Unlock()
m.positionalDeleteEntries = append(m.positionalDeleteEntries, e)
}
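// getPartitionRecord extracts a data file's partition values in the field
// order defined by partitionType.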
func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) partitionRecord {
partitionData := dataFile.Partition()
out := make(partitionRecord, len(partitionType.FieldList))
for i, f := range partitionType.FieldList {
out[i] = partitionData[f.Name]
}
return out
}
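// openManifest fetches the entries of a single manifest file and returns the
// ones whose data files pass both the partition filter and the metrics
// evaluator.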
func openManifest(io io.IO, manifest iceberg.ManifestFile,
partitionFilter, metricsEval func(iceberg.DataFile) (bool, error),
) ([]iceberg.ManifestEntry, error) {
entries, err := manifest.FetchEntries(io, true)
if err != nil {
return nil, err
}
out := make([]iceberg.ManifestEntry, 0, len(entries))
for _, entry := range entries {
p, err := partitionFilter(entry.DataFile())
if err != nil {
return nil, err
}
m, err := metricsEval(entry.DataFile())
if err != nil {
return nil, err
}
if p && m {
out = append(out, entry)
}
}
return out, nil
}
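// Scan is an immutable description of a table read: the snapshot to use, the
// row filter, the field projection, and related options. Builder-style
// methods such as UseRef and UseRowLimit return modified copies rather than
// mutating the receiver.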
type Scan struct {
metadata Metadata
io io.IO
rowFilter iceberg.BooleanExpression
selectedFields []string
caseSensitive bool
snapshotID *int64
options iceberg.Properties
limit int64
partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
concurrency int
}
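// UseRowLimit returns a copy of this scan that returns at most n rows; pass
// ScanNoLimit to remove the limit.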
func (scan *Scan) UseRowLimit(n int64) *Scan {
out := *scan
out.limit = n
return &out
}
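// UseRef returns a copy of this scan pinned to the snapshot referenced by the
// named branch or tag. It is an error to call this when a snapshot ID has
// already been set.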
func (scan *Scan) UseRef(name string) (*Scan, error) {
if scan.snapshotID != nil {
return nil, fmt.Errorf("%w: cannot override ref, already set snapshot id %d",
iceberg.ErrInvalidArgument, *scan.snapshotID)
}
if snap := scan.metadata.SnapshotByName(name); snap != nil {
out := *scan
out.snapshotID = &snap.SnapshotID
out.partitionFilters = newKeyDefaultMapWrapErr(out.buildPartitionProjection)
return &out, nil
}
return nil, fmt.Errorf("%w: cannot scan unknown ref=%s", iceberg.ErrInvalidArgument, name)
}
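// Snapshot returns the snapshot this scan is pinned to, falling back to the
// table's current snapshot if none was set.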
func (scan *Scan) Snapshot() *Snapshot {
if scan.snapshotID != nil {
return scan.metadata.SnapshotByID(*scan.snapshotID)
}
return scan.metadata.CurrentSnapshot()
}
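// Projection resolves the output schema for this scan: the schema of the
// pinned snapshot (or the current schema) restricted to the selected fields.
// Selecting "*" yields the full schema.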
func (scan *Scan) Projection() (*iceberg.Schema, error) {
curSchema := scan.metadata.CurrentSchema()
if scan.snapshotID != nil {
snap := scan.metadata.SnapshotByID(*scan.snapshotID)
if snap == nil {
return nil, fmt.Errorf("%w: snapshot not found: %d", ErrInvalidOperation, *scan.snapshotID)
}
if snap.SchemaID != nil {
for _, schema := range scan.metadata.Schemas() {
if schema.ID == *snap.SchemaID {
curSchema = schema
break
}
}
}
}
if slices.Contains(scan.selectedFields, "*") {
return curSchema, nil
}
return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
}
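// buildPartitionProjection projects the scan's row filter into an inclusive
// predicate on the partition values of the given partition spec.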
func (scan *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) {
project := newInclusiveProjection(scan.metadata.CurrentSchema(),
scan.metadata.PartitionSpecs()[specID], true)
return project(scan.rowFilter)
}
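// buildManifestEvaluator returns a predicate that decides, from a manifest
// file's partition field summaries, whether the manifest may contain matching
// rows.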
func (scan *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) {
spec := scan.metadata.PartitionSpecs()[specID]
return newManifestEvaluator(spec, scan.metadata.CurrentSchema(),
scan.partitionFilters.Get(specID), scan.caseSensitive)
}
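// buildPartitionEvaluator returns a predicate that applies the projected
// partition filter for specID to a data file's partition record.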
func (scan *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) (bool, error) {
spec := scan.metadata.PartitionSpecs()[specID]
partType := spec.PartitionType(scan.metadata.CurrentSchema())
partSchema := iceberg.NewSchema(0, partType.FieldList...)
partExpr := scan.partitionFilters.Get(specID)
return func(d iceberg.DataFile) (bool, error) {
fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr, scan.caseSensitive)
if err != nil {
return false, err
}
return fn(getPartitionRecord(d, partType))
}
}
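// checkSequenceNumber reports whether a manifest should be read: data
// manifests always qualify, while delete manifests qualify only when their
// sequence number (>= minSeqNum) means they could still apply to live data.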
func (scan *Scan) checkSequenceNumber(minSeqNum int64, manifest iceberg.ManifestFile) bool {
return manifest.ManifestContent() == iceberg.ManifestContentData ||
(manifest.ManifestContent() == iceberg.ManifestContentDeletes &&
manifest.SequenceNum() >= minSeqNum)
}
// minSequenceNum returns the smallest minimum data sequence number across the
// snapshot's data manifests; delete manifests older than this cannot apply to
// any data file and can be skipped.
func minSequenceNum(manifests []iceberg.ManifestFile) int64 {
	n, seeded := int64(0), false
	for _, m := range manifests {
		if m.ManifestContent() == iceberg.ManifestContentData {
			// Seed from the first data manifest: seeding with 0 would make
			// min a no-op, since sequence numbers are never negative.
			if !seeded {
				n, seeded = m.MinSequenceNum(), true
				continue
			}
			n = min(n, m.MinSequenceNum())
		}
	}
	return n
}
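// matchDeletesToData returns the positional-delete files that may apply to
// the given data entry: those with a sequence number at or above the entry's
// whose file_path bounds may include the entry's path. positionalDeletes must
// already be sorted by sequence number.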
func matchDeletesToData(entry iceberg.ManifestEntry, positionalDeletes []iceberg.ManifestEntry) ([]iceberg.DataFile, error) {
idx, _ := slices.BinarySearchFunc(positionalDeletes, entry, func(me1, me2 iceberg.ManifestEntry) int {
return cmp.Compare(me1.SequenceNum(), me2.SequenceNum())
})
evaluator, err := newInclusiveMetricsEvaluator(iceberg.PositionalDeleteSchema,
iceberg.EqualTo(iceberg.Reference("file_path"), entry.DataFile().FilePath()), true, false)
if err != nil {
return nil, err
}
out := make([]iceberg.DataFile, 0)
for _, relevant := range positionalDeletes[idx:] {
df := relevant.DataFile()
ok, err := evaluator(df)
if err != nil {
return nil, err
}
if ok {
out = append(out, df)
}
}
return out, nil
}
// fetchPartitionSpecFilteredManifests retrieves the scan's snapshot (the
// pinned one, or the table's current snapshot), fetches its manifest files,
// and applies partition-spec filters to remove irrelevant manifests.
func (scan *Scan) fetchPartitionSpecFilteredManifests() ([]iceberg.ManifestFile, error) {
snap := scan.Snapshot()
if snap == nil {
return nil, nil
}
// Fetch all manifests for the current snapshot.
manifestList, err := snap.Manifests(scan.io)
if err != nil {
return nil, err
}
	// Build per-spec manifest evaluators and filter out irrelevant manifests.
	manifestEvaluators := newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
	var evalErr error
	manifestList = slices.DeleteFunc(manifestList, func(mf iceberg.ManifestFile) bool {
		use, err := manifestEvaluators.Get(int(mf.PartitionSpecID()))(mf)
		if err != nil {
			// Surface the error instead of silently dropping the manifest.
			evalErr = errors.Join(evalErr, err)
			return false
		}
		return !use
	})
	if evalErr != nil {
		return nil, evalErr
	}
	return manifestList, nil
}
// collectManifestEntries concurrently opens manifests, applies partition and metrics
// filters, and accumulates both data entries and positional-delete entries.
func (scan *Scan) collectManifestEntries(
ctx context.Context,
manifestList []iceberg.ManifestFile,
) (*manifestEntries, error) {
metricsEval, err := newInclusiveMetricsEvaluator(
scan.metadata.CurrentSchema(),
scan.rowFilter,
scan.caseSensitive,
scan.options["include_empty_files"] == "true",
)
if err != nil {
return nil, err
}
minSeqNum := minSequenceNum(manifestList)
concurrencyLimit := min(scan.concurrency, len(manifestList))
entries := newManifestEntries()
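	// The derived context is discarded: openManifest does not accept a
	// context, so the first failure is propagated only through g.Wait.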
g, _ := errgroup.WithContext(ctx)
g.SetLimit(concurrencyLimit)
partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
for _, mf := range manifestList {
if !scan.checkSequenceNumber(minSeqNum, mf) {
continue
}
g.Go(func() error {
partEval := partitionEvaluators.Get(int(mf.PartitionSpecID()))
manifestEntries, err := openManifest(scan.io, mf, partEval, metricsEval)
if err != nil {
return err
}
for _, e := range manifestEntries {
df := e.DataFile()
switch df.ContentType() {
case iceberg.EntryContentData:
entries.addDataEntry(e)
case iceberg.EntryContentPosDeletes:
entries.addPositionalDeleteEntry(e)
case iceberg.EntryContentEqDeletes:
return errors.New("iceberg-go does not yet support equality deletes")
default:
return fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
ErrInvalidMetadata, df.ContentType(), e)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return entries, nil
}
// PlanFiles orchestrates fetching and filtering the manifests, then builds the
// list of FileScanTasks that match the current Scan criteria.
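//
// A minimal usage sketch (hypothetical; assumes a *Scan named scan built
// elsewhere):
//
//	tasks, err := scan.PlanFiles(context.Background())
//	if err != nil {
//		return err
//	}
//	for _, task := range tasks {
//		fmt.Printf("%s (+%d deletes)\n", task.File.FilePath(), len(task.DeleteFiles))
//	}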
func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
// Step 1: Retrieve filtered manifests based on snapshot and partition specs.
manifestList, err := scan.fetchPartitionSpecFilteredManifests()
if err != nil || len(manifestList) == 0 {
return nil, err
}
// Step 2: Read manifest entries concurrently, accumulating data and positional deletes.
entries, err := scan.collectManifestEntries(ctx, manifestList)
if err != nil {
return nil, err
}
// Step 3: Sort positional deletes and match them to data files.
slices.SortFunc(entries.positionalDeleteEntries, func(a, b iceberg.ManifestEntry) int {
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
})
results := make([]FileScanTask, 0, len(entries.dataEntries))
for _, e := range entries.dataEntries {
deleteFiles, err := matchDeletesToData(e, entries.positionalDeleteEntries)
if err != nil {
return nil, err
}
results = append(results, FileScanTask{
File: e.DataFile(),
DeleteFiles: deleteFiles,
Start: 0,
Length: e.DataFile().FileSizeBytes(),
})
}
return results, nil
}
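// FileScanTask is a unit of scan work: a byte range of one data file together
// with the positional-delete files that apply to it.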
type FileScanTask struct {
File iceberg.DataFile
DeleteFiles []iceberg.DataFile
Start, Length int64
}
// ToArrowRecords returns the Arrow schema of the expected records and an
// iterator that can be used with a range expression to read the records as
// they become available. If an error is encountered during planning and
// setup, it is returned directly. An error that occurs while iterating the
// records is returned by the iterator instead.
//
// The schema is returned up front to handle the case where no rows are
// returned: the resulting Arrow schema of the projection is still known.
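//
// A minimal usage sketch (hypothetical):
//
//	schema, records, err := scan.ToArrowRecords(ctx)
//	if err != nil {
//		return err
//	}
//	_ = schema // known even when no records are produced
//	for rec, err := range records {
//		if err != nil {
//			return err
//		}
//		// ... use rec, then release it
//		rec.Release()
//	}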
func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
tasks, err := scan.PlanFiles(ctx)
if err != nil {
return nil, nil, err
}
var boundFilter iceberg.BooleanExpression
if scan.rowFilter != nil {
boundFilter, err = iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter, scan.caseSensitive)
if err != nil {
return nil, nil, err
}
}
schema, err := scan.Projection()
if err != nil {
return nil, nil, err
}
return (&arrowScan{
metadata: scan.metadata,
fs: scan.io,
projectedSchema: schema,
boundRowFilter: boundFilter,
caseSensitive: scan.caseSensitive,
rowLimit: scan.limit,
options: scan.options,
concurrency: scan.concurrency,
}).GetRecords(ctx, tasks)
}
// ToArrowTable calls ToArrowRecords, gathers all of the records, and returns
// an arrow.Table made from those records.
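//
// A minimal usage sketch (hypothetical):
//
//	tbl, err := scan.ToArrowTable(ctx)
//	if err != nil {
//		return err
//	}
//	defer tbl.Release()
//	fmt.Println(tbl.NumRows())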
func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) {
schema, itr, err := scan.ToArrowRecords(ctx)
if err != nil {
return nil, err
}
records := make([]arrow.Record, 0)
for rec, err := range itr {
if err != nil {
return nil, err
}
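		// Deferring the release keeps each record alive until the table,
		// which retains the underlying data, has been constructed.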
defer rec.Release()
records = append(records, rec)
}
return array.NewTableFromRecords(schema, records), nil
}