table/transaction.go:

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"runtime"
	"slices"
	"sync"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/io"
	"github.com/google/uuid"
)

type snapshotUpdate struct {
	txn           *Transaction
	io            io.WriteFileIO
	snapshotProps iceberg.Properties
}

func (s snapshotUpdate) fastAppend() *snapshotProducer {
	return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps)
}

func (s snapshotUpdate) mergeOverwrite(commitUUID *uuid.UUID) *snapshotProducer {
	op := OpOverwrite
	if s.txn.meta.currentSnapshot() == nil {
		op = OpAppend
	}

	return newOverwriteFilesProducer(op, s.txn, s.io, commitUUID, s.snapshotProps)
}

func (s snapshotUpdate) mergeAppend() *snapshotProducer {
	return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps)
}

type Transaction struct {
	tbl  *Table
	meta *MetadataBuilder

	reqs []Requirement

	mx        sync.Mutex
	committed bool
}

func (t *Transaction) apply(updates []Update, reqs []Requirement) error {
	t.mx.Lock()
	defer t.mx.Unlock()
	if t.committed {
		return errors.New("transaction has already been committed")
	}

	current, err := t.meta.Build()
	if err != nil {
		return err
	}

	for _, r := range reqs {
		if err := r.Validate(current); err != nil {
			return err
		}
	}

	existing := map[string]struct{}{}
	for _, r := range t.reqs {
		existing[r.GetType()] = struct{}{}
	}
	for _, r := range reqs {
		if _, ok := existing[r.GetType()]; !ok {
			t.reqs = append(t.reqs, r)
		}
	}

	prevUpdates, prevLastUpdated := len(t.meta.updates), t.meta.lastUpdatedMS
	for _, u := range updates {
		if err := u.Apply(t.meta); err != nil {
			return err
		}
	}

	// u.Apply only appends to t.meta.updates when an update is not a no-op
	// and actually performs a change. So check whether any updates were
	// actually added, and only then refresh lastUpdatedMS.
	if prevUpdates < len(t.meta.updates) {
		if prevLastUpdated == t.meta.lastUpdatedMS {
			t.meta.lastUpdatedMS = time.Now().UnixMilli()
		}
	}

	return nil
}
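
// A minimal usage sketch of the transaction lifecycle, for orientation only.
// It assumes tbl is a *Table loaded from a catalog and that NewTransaction is
// this package's transaction constructor; error handling is abbreviated:
//
//	txn := tbl.NewTransaction()
//	if err := txn.SetProperties(iceberg.Properties{"owner": "me"}); err != nil {
//		return err
//	}
//	// Commit validates the accumulated requirements and applies the
//	// accumulated updates, returning the updated table.
//	updated, err := txn.Commit(ctx)
//	if err != nil {
//		return err
//	}
//	_ = updated
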
func (t *Transaction) appendSnapshotProducer(props iceberg.Properties) *snapshotProducer {
	manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault)
	updateSnapshot := t.updateSnapshot(props)
	if manifestMerge {
		return updateSnapshot.mergeAppend()
	}

	return updateSnapshot.fastAppend()
}

func (t *Transaction) updateSnapshot(props iceberg.Properties) snapshotUpdate {
	return snapshotUpdate{
		txn:           t,
		io:            t.tbl.fs.(io.WriteFileIO),
		snapshotProps: props,
	}
}

func (t *Transaction) SetProperties(props iceberg.Properties) error {
	if len(props) > 0 {
		return t.apply([]Update{NewSetPropertiesUpdate(props)}, nil)
	}

	return nil
}

// AppendTable appends the contents of the given arrow.Table to this
// transaction, reading the table in batches of batchSize rows.
func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
	rdr := array.NewTableReader(tbl, batchSize)
	defer rdr.Release()

	return t.Append(ctx, rdr, snapshotProps)
}

// Append writes the records from rdr as new data files and stages them
// as an append snapshot on this transaction.
func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties) error {
	appendFiles := t.appendSnapshotProducer(snapshotProps)
	itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
		recordWritingArgs{
			sc:        rdr.Schema(),
			itr:       array.IterFromReader(rdr),
			fs:        t.tbl.fs.(io.WriteFileIO),
			writeUUID: &appendFiles.commitUuid,
		})

	for df, err := range itr {
		if err != nil {
			return err
		}
		appendFiles.appendDataFile(df)
	}

	updates, reqs, err := appendFiles.commit()
	if err != nil {
		return err
	}

	return t.apply(updates, reqs)
}
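
// A minimal sketch of appending Arrow data through a transaction. Illustrative
// only: rec is an assumed arrow.Record whose schema matches the table, and the
// RecordReader construction mirrors the arrow-go array package API:
//
//	rdr, err := array.NewRecordReader(rec.Schema(), []arrow.Record{rec})
//	if err != nil {
//		return err
//	}
//	defer rdr.Release()
//
//	txn := tbl.NewTransaction()
//	if err := txn.Append(ctx, rdr, nil); err != nil {
//		return err
//	}
//	updated, err := txn.Commit(ctx)
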
// ReplaceDataFiles is actually just an overwrite operation with multiple
// files deleted and added.
//
// TODO: technically, this could be a REPLACE operation, but we aren't performing
// any validation here that there are no changes to the underlying data. A REPLACE
// operation is only valid if the data is exactly the same as the previous snapshot.
//
// For now, we'll keep using an overwrite operation.
func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error {
	if len(filesToDelete) == 0 {
		if len(filesToAdd) > 0 {
			return t.AddFiles(ctx, filesToAdd, snapshotProps, false)
		}
	}

	var (
		setToDelete = make(map[string]struct{})
		setToAdd    = make(map[string]struct{})
	)
	for _, f := range filesToDelete {
		setToDelete[f] = struct{}{}
	}
	for _, f := range filesToAdd {
		setToAdd[f] = struct{}{}
	}

	if len(setToDelete) != len(filesToDelete) {
		return errors.New("delete file paths must be unique for ReplaceDataFiles")
	}
	if len(setToAdd) != len(filesToAdd) {
		return errors.New("add file paths must be unique for ReplaceDataFiles")
	}

	s := t.meta.currentSnapshot()
	if s == nil {
		return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation)
	}

	markedForDeletion := make([]iceberg.DataFile, 0, len(setToDelete))
	for df, err := range s.dataFiles(t.tbl.fs, nil) {
		if err != nil {
			return err
		}

		if _, ok := setToDelete[df.FilePath()]; ok {
			markedForDeletion = append(markedForDeletion, df)
		}
		if _, ok := setToAdd[df.FilePath()]; ok {
			return fmt.Errorf("cannot add files that are already referenced by table, files: %s", df.FilePath())
		}
	}

	if len(markedForDeletion) != len(setToDelete) {
		return errors.New("cannot delete files that do not belong to the table")
	}

	if t.meta.NameMapping() == nil {
		nameMapping := t.meta.CurrentSchema().NameMapping()
		mappingJson, err := json.Marshal(nameMapping)
		if err != nil {
			return err
		}
		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
		if err != nil {
			return err
		}
	}

	commitUUID := uuid.New()
	updater := t.updateSnapshot(snapshotProps).mergeOverwrite(&commitUUID)
	for _, df := range markedForDeletion {
		updater.deleteDataFile(df)
	}

	dataFiles := filesToDataFiles(ctx, t.tbl.fs, t.meta, slices.Values(filesToAdd))
	for df, err := range dataFiles {
		if err != nil {
			return err
		}
		updater.appendDataFile(df)
	}

	updates, reqs, err := updater.commit()
	if err != nil {
		return err
	}

	return t.apply(updates, reqs)
}

// AddFiles stages existing data files, given by path, to be added to the
// table in a fast-append snapshot. The file paths must be unique, and must
// not already be referenced by the table unless ignoreDuplicates is true.
func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
	set := make(map[string]struct{})
	for _, f := range files {
		set[f] = struct{}{}
	}
	if len(set) != len(files) {
		return errors.New("file paths must be unique for AddFiles")
	}

	if !ignoreDuplicates {
		if s := t.meta.currentSnapshot(); s != nil {
			referenced := make([]string, 0)
			for df, err := range s.dataFiles(t.tbl.fs, nil) {
				if err != nil {
					return err
				}
				if _, ok := set[df.FilePath()]; ok {
					referenced = append(referenced, df.FilePath())
				}
			}

			if len(referenced) > 0 {
				return fmt.Errorf("cannot add files that are already referenced by table, files: %s", referenced)
			}
		}
	}

	if t.meta.NameMapping() == nil {
		nameMapping := t.meta.CurrentSchema().NameMapping()
		mappingJson, err := json.Marshal(nameMapping)
		if err != nil {
			return err
		}
		err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
		if err != nil {
			return err
		}
	}

	updater := t.updateSnapshot(snapshotProps).fastAppend()

	dataFiles := filesToDataFiles(ctx, t.tbl.fs, t.meta, slices.Values(files))
	for df, err := range dataFiles {
		if err != nil {
			return err
		}
		updater.appendDataFile(df)
	}

	updates, reqs, err := updater.commit()
	if err != nil {
		return err
	}

	return t.apply(updates, reqs)
}
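
// A minimal sketch of registering pre-written files with AddFiles.
// Illustrative only: the path is hypothetical and must already be readable
// through the table's FileIO:
//
//	txn := tbl.NewTransaction()
//	err := txn.AddFiles(ctx, []string{
//		"s3://bucket/warehouse/db/tbl/data/part-00000.parquet",
//	}, nil, false)
//	if err != nil {
//		return err
//	}
//	updated, err := txn.Commit(ctx)
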
// Scan returns a Scan built against the transaction's current metadata,
// including any staged, not-yet-committed updates.
func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
	updatedMeta, err := t.meta.Build()
	if err != nil {
		return nil, err
	}

	s := &Scan{
		metadata:       updatedMeta,
		io:             t.tbl.fs,
		rowFilter:      iceberg.AlwaysTrue{},
		selectedFields: []string{"*"},
		caseSensitive:  true,
		limit:          ScanNoLimit,
		concurrency:    runtime.GOMAXPROCS(0),
	}

	for _, opt := range opts {
		opt(s)
	}

	s.partitionFilters = newKeyDefaultMapWrapErr(s.buildPartitionProjection)

	return s, nil
}

// StagedTable returns a read-only view of the table as it would look if the
// transaction were committed.
func (t *Transaction) StagedTable() (*StagedTable, error) {
	updatedMeta, err := t.meta.Build()
	if err != nil {
		return nil, err
	}

	return &StagedTable{Table: New(t.tbl.identifier, updatedMeta, updatedMeta.Location(), t.tbl.fs, t.tbl.cat)}, nil
}

// Commit applies the transaction's accumulated updates and requirements to
// the underlying table, returning the updated table. A transaction can only
// be committed once.
func (t *Transaction) Commit(ctx context.Context) (*Table, error) {
	t.mx.Lock()
	defer t.mx.Unlock()

	if t.committed {
		return nil, errors.New("transaction has already been committed")
	}

	t.committed = true

	if len(t.meta.updates) > 0 {
		t.reqs = append(t.reqs, AssertTableUUID(t.meta.uuid))

		return t.tbl.doCommit(ctx, t.meta.updates, t.reqs)
	}

	return t.tbl, nil
}

type StagedTable struct {
	*Table
}

func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
	return nil, fmt.Errorf("%w: cannot refresh a staged table", ErrInvalidOperation)
}

func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
	panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
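
// A minimal sketch of inspecting staged, uncommitted changes via a scan.
// Illustrative only: it assumes an open transaction txn with staged appends,
// and uses Scan.PlanFiles as one way to consume the scan:
//
//	scan, err := txn.Scan()
//	if err != nil {
//		return err
//	}
//	// Plan which data files a read of the staged state would touch.
//	tasks, err := scan.PlanFiles(ctx)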