table/table.go (287 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package table import ( "context" "fmt" "iter" "runtime" "slices" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/internal" "github.com/apache/iceberg-go/io" tblutils "github.com/apache/iceberg-go/table/internal" "golang.org/x/sync/errgroup" ) type Identifier = []string type CatalogIO interface { LoadTable(context.Context, Identifier, iceberg.Properties) (*Table, error) CommitTable(context.Context, *Table, []Requirement, []Update) (Metadata, string, error) } type Table struct { identifier Identifier metadata Metadata metadataLocation string fs io.IO cat CatalogIO } func (t Table) Equals(other Table) bool { return slices.Equal(t.identifier, other.identifier) && t.metadataLocation == other.metadataLocation && t.metadata.Equals(other.metadata) } func (t Table) Identifier() Identifier { return t.identifier } func (t Table) Metadata() Metadata { return t.metadata } func (t Table) MetadataLocation() string { return t.metadataLocation } func (t Table) FS() io.IO { return t.fs } func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() } func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() } func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() } func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() } func (t Table) NameMapping() iceberg.NameMapping { return t.metadata.NameMapping() } func (t Table) Location() string { return t.metadata.Location() } func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() } func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) } func (t Table) SnapshotByName(name string) *Snapshot { return t.metadata.SnapshotByName(name) } func (t Table) Schemas() map[int]*iceberg.Schema { m := make(map[int]*iceberg.Schema) for _, s := range t.metadata.Schemas() { m[s.ID] = s } return m } func (t Table) LocationProvider() (LocationProvider, error) { return LoadLocationProvider(t.metadata.Location(), t.metadata.Properties()) } func (t Table) NewTransaction() *Transaction { meta, _ := MetadataBuilderFromBase(t.metadata) return &Transaction{ tbl: &t, meta: meta, reqs: []Requirement{}, } } // AppendTable is a shortcut for NewTransaction().AppendTable() and then committing the transaction func (t Table) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) (*Table, error) { txn := t.NewTransaction() if err := txn.AppendTable(ctx, tbl, batchSize, snapshotProps); err != nil { return nil, err } return txn.Commit(ctx) } // Append is a shortcut for NewTransaction().Append() and then committing the transaction func (t Table) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties) (*Table, error) { txn := t.NewTransaction() if err := txn.Append(ctx, rdr, snapshotProps); err != nil { return nil, err } return txn.Commit(ctx) } func (t Table) AllManifests() iter.Seq2[iceberg.ManifestFile, error] { type list = tblutils.Enumerated[[]iceberg.ManifestFile] g := errgroup.Group{} n := len(t.metadata.Snapshots()) ch := make(chan list, n) for i, sn := range t.metadata.Snapshots() { g.Go(func() error { manifests, err := sn.Manifests(t.fs) if err != nil { return err } ch <- list{Index: i, Value: manifests, Last: i == n-1} return nil }) } errch := make(chan error, 1) go func() { defer close(errch) defer close(ch) if err := g.Wait(); err != nil { errch <- err } }() results := tblutils.MakeSequencedChan(uint(n), ch, func(left, right *list) bool { switch { case left.Index < 0: return true case right.Index < 0: return false default: return left.Index < right.Index } }, func(prev, next *list) bool { if prev.Index < 0 { return next.Index == 0 } return next.Index == prev.Index+1 }, list{Index: -1}) return func(yield func(iceberg.ManifestFile, error) bool) { defer func() { // drain channels if we exited early go func() { for range results { } for range errch { } }() }() for { select { case err := <-errch: if err != nil { yield(nil, err) return } case next, ok := <-results: for _, mf := range next.Value { if !yield(mf, nil) { return } } if next.Last || !ok { return } } } } } func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requirement) (*Table, error) { newMeta, newLoc, err := t.cat.CommitTable(ctx, &t, reqs, updates) if err != nil { return nil, err } if err := deleteOldMetadata(t.fs, t.metadata, newMeta); err != nil { return nil, err } return New(t.identifier, newMeta, newLoc, t.fs, t.cat), nil } func getFiles(it iter.Seq[MetadataLogEntry]) iter.Seq[string] { return func(yield func(string) bool) { next, stop := iter.Pull(it) defer stop() for { entry, ok := next() if !ok { return } if !yield(entry.MetadataFile) { return } } } } func deleteOldMetadata(fs io.IO, baseMeta, newMeta Metadata) error { deleteAfterCommit := newMeta.Properties().GetBool(MetadataDeleteAfterCommitEnabledKey, MetadataDeleteAfterCommitEnabledDefault) if deleteAfterCommit { removedPrevious := slices.Collect(getFiles(baseMeta.PreviousFiles())) currentMetadata := slices.Collect(getFiles(newMeta.PreviousFiles())) toRemove := internal.Difference(removedPrevious, currentMetadata) for _, file := range toRemove { if err := fs.Remove(file); err != nil { return fmt.Errorf("failed to delete old metadata file %s: %w", file, err) } } } return nil } type ScanOption func(*Scan) func noopOption(*Scan) {} func WithSelectedFields(fields ...string) ScanOption { if len(fields) == 0 || slices.Contains(fields, "*") { return noopOption } return func(scan *Scan) { scan.selectedFields = fields } } func WithRowFilter(e iceberg.BooleanExpression) ScanOption { if e == nil || e.Equals(iceberg.AlwaysTrue{}) { return noopOption } return func(scan *Scan) { scan.rowFilter = e } } func WithSnapshotID(n int64) ScanOption { if n == 0 { return noopOption } return func(scan *Scan) { scan.snapshotID = &n } } func WithCaseSensitive(b bool) ScanOption { return func(scan *Scan) { scan.caseSensitive = b } } func WithLimit(n int64) ScanOption { if n < 0 { return noopOption } return func(scan *Scan) { scan.limit = n } } // WitMaxConcurrency sets the maximum concurrency for table scan and plan // operations. When unset it defaults to runtime.GOMAXPROCS. func WitMaxConcurrency(n int) ScanOption { if n <= 0 { return noopOption } return func(scan *Scan) { scan.concurrency = n } } func WithOptions(opts iceberg.Properties) ScanOption { if opts == nil { return noopOption } return func(scan *Scan) { scan.options = opts } } func (t Table) Scan(opts ...ScanOption) *Scan { s := &Scan{ metadata: t.metadata, io: t.fs, rowFilter: iceberg.AlwaysTrue{}, selectedFields: []string{"*"}, caseSensitive: true, limit: ScanNoLimit, concurrency: runtime.GOMAXPROCS(0), } for _, opt := range opts { opt(s) } s.partitionFilters = newKeyDefaultMapWrapErr(s.buildPartitionProjection) return s } func New(ident Identifier, meta Metadata, metadataLocation string, fs io.IO, cat CatalogIO) *Table { return &Table{ identifier: ident, metadata: meta, metadataLocation: metadataLocation, fs: fs, cat: cat, } } func NewFromLocation(ident Identifier, metalocation string, fsys io.IO, cat CatalogIO) (*Table, error) { var meta Metadata if rf, ok := fsys.(io.ReadFileIO); ok { data, err := rf.ReadFile(metalocation) if err != nil { return nil, err } if meta, err = ParseMetadataBytes(data); err != nil { return nil, err } } else { f, err := fsys.Open(metalocation) if err != nil { return nil, err } defer f.Close() if meta, err = ParseMetadata(f); err != nil { return nil, err } } return New(ident, meta, metalocation, fsys, cat), nil }