table/snapshot_producers.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	"fmt"
	"io"
	"maps"
	"slices"
	"sync/atomic"
	"time"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/config"
	"github.com/apache/iceberg-go/internal"
	iceio "github.com/apache/iceberg-go/io"
	tblutils "github.com/apache/iceberg-go/table/internal"
	"github.com/google/uuid"
	"golang.org/x/sync/errgroup"
)

type producerImpl interface {
	// to perform any post-processing on the manifests before writing them
	// to the new snapshot. This will be called as the last step
	// before writing a manifest list file, using the result of this function
	// as the final list of manifests to write.
	processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error)
	// perform any processing necessary and return the list of existing
	// manifests that should be included in the snapshot
	existingManifests() ([]iceberg.ManifestFile, error)
	// return the deleted entries for writing delete file manifests
	deletedEntries() ([]iceberg.ManifestEntry, error)
}

func newManifestFileName(num int, commit uuid.UUID) string {
	return fmt.Sprintf("%s-m%d.avro", commit, num)
}

func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) string {
	// mimics behavior of java
	// https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
	return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit)
}

func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
	prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
	prod.producerImpl = &fastAppendFiles{base: prod}

	return prod
}

type fastAppendFiles struct {
	base *snapshotProducer
}

func (fa *fastAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	return manifests, nil
}

func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) {
	existing := make([]iceberg.ManifestFile, 0)
	if fa.base.parentSnapshotID > 0 {
		previous, err := fa.base.txn.meta.SnapshotByID(fa.base.parentSnapshotID)
		if err != nil {
			return nil, fmt.Errorf("could not find parent snapshot %d", fa.base.parentSnapshotID)
		}

		manifests, err := previous.Manifests(fa.base.io)
		if err != nil {
			return nil, err
		}

		for _, m := range manifests {
			if m.HasAddedFiles() || m.HasExistingFiles() || m.SnapshotID() == fa.base.snapshotID {
				existing = append(existing, m)
			}
		}
	}

	return existing, nil
}

func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
	// for fast appends, there are no deleted entries
	return nil, nil
}

type overwriteFiles struct {
	base *snapshotProducer
}
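// newOverwriteFilesProducer builds a snapshotProducer whose producerImpl
// rewrites any existing manifest that references deleted data files and
// records DELETED entries for those files in the new snapshot.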
func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
	prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
	prod.producerImpl = &overwriteFiles{base: prod}

	return prod
}

func (of *overwriteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	// no post processing
	return manifests, nil
}

func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
	// determine if there are any existing manifest files
	existingFiles := make([]iceberg.ManifestFile, 0)

	snap := of.base.txn.meta.currentSnapshot()
	if snap == nil {
		return existingFiles, nil
	}

	manifestList, err := snap.Manifests(of.base.io)
	if err != nil {
		return existingFiles, err
	}

	for _, m := range manifestList {
		entries, err := of.base.fetchManifestEntry(m, true)
		if err != nil {
			return existingFiles, err
		}

		foundDeleted := make([]iceberg.ManifestEntry, 0)
		notDeleted := make([]iceberg.ManifestEntry, 0, len(entries))
		for _, entry := range entries {
			if _, ok := of.base.deletedFiles[entry.DataFile().FilePath()]; ok {
				foundDeleted = append(foundDeleted, entry)
			} else {
				notDeleted = append(notDeleted, entry)
			}
		}

		if len(foundDeleted) == 0 {
			existingFiles = append(existingFiles, m)

			continue
		}

		if len(notDeleted) == 0 {
			continue
		}

		spec, err := of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID()))
		if err != nil {
			return existingFiles, err
		}

		wr, path, counter, err := of.base.newManifestWriter(*spec)
		if err != nil {
			return existingFiles, err
		}
		defer counter.W.(io.Closer).Close()

		for _, entry := range notDeleted {
			if err := wr.Existing(entry); err != nil {
				return existingFiles, err
			}
		}

		mf, err := wr.ToManifestFile(path, counter.Count)
		if err != nil {
			return existingFiles, err
		}

		existingFiles = append(existingFiles, mf)
	}

	return existingFiles, nil
}

func (of *overwriteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
	// determine if we need to record any deleted entries
	//
	// with a full overwrite all the entries are considered deleted
	// with partial overwrites we have to use the predicate to evaluate
	// which entries are affected
	if of.base.parentSnapshotID <= 0 {
		return nil, nil
	}

	parent, err := of.base.txn.meta.SnapshotByID(of.base.parentSnapshotID)
	if err != nil {
		return nil, fmt.Errorf("%w: cannot overwrite empty table", err)
	}

	previousManifests, err := parent.Manifests(of.base.io)
	if err != nil {
		return nil, err
	}

	getEntries := func(m iceberg.ManifestFile) ([]iceberg.ManifestEntry, error) {
		entries, err := of.base.fetchManifestEntry(m, true)
		if err != nil {
			return nil, err
		}

		result := make([]iceberg.ManifestEntry, 0, len(entries))
		for _, entry := range entries {
			_, ok := of.base.deletedFiles[entry.DataFile().FilePath()]
			if ok && entry.DataFile().ContentType() == iceberg.EntryContentData {
				seqNum := entry.SequenceNum()
				result = append(result,
					iceberg.NewManifestEntry(iceberg.EntryStatusDELETED, &of.base.snapshotID,
						&seqNum, entry.FileSequenceNum(), entry.DataFile()))
			}
		}

		return result, nil
	}

	nWorkers := config.EnvConfig.MaxWorkers
	finalResult := make([]iceberg.ManifestEntry, 0, len(previousManifests))
	for entries, err := range tblutils.MapExec(nWorkers, slices.Values(previousManifests), getEntries) {
		if err != nil {
			return nil, err
		}

		finalResult = append(finalResult, entries...)
	}

	return finalResult, nil
}
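// manifestMergeManager groups manifests by partition spec, bin-packs each
// group up to targetSizeBytes, and rewrites multi-manifest bins into a single
// merged manifest. A bin containing the freshly written manifest is only
// merged once it reaches minCountToMerge manifests.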
type manifestMergeManager struct {
	targetSizeBytes int
	minCountToMerge int
	mergeEnabled    bool
	snap            *snapshotProducer
}

func (m *manifestMergeManager) groupBySpec(manifests []iceberg.ManifestFile) map[int][]iceberg.ManifestFile {
	groups := make(map[int][]iceberg.ManifestFile)
	for _, m := range manifests {
		specid := int(m.PartitionSpecID())
		group := groups[specid]
		groups[specid] = append(group, m)
	}

	return groups
}

func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.ManifestFile) (iceberg.ManifestFile, error) {
	wr, path, counter, err := m.snap.newManifestWriter(m.snap.spec(specID))
	if err != nil {
		return nil, err
	}
	defer counter.W.(io.Closer).Close()

	for _, manifest := range bin {
		entries, err := m.snap.fetchManifestEntry(manifest, false)
		if err != nil {
			return nil, err
		}

		for _, entry := range entries {
			switch {
			case entry.Status() == iceberg.EntryStatusDELETED && entry.SnapshotID() == m.snap.snapshotID:
				// only files deleted by this snapshot should be added to the new manifest
				wr.Delete(entry)
			case entry.Status() == iceberg.EntryStatusADDED && entry.SnapshotID() == m.snap.snapshotID:
				// added entries from this snapshot are still added, otherwise they should be existing
				wr.Add(entry)
			case entry.Status() != iceberg.EntryStatusDELETED:
				// add all non-deleted files from the old manifest as existing files
				wr.Existing(entry)
			}
		}
	}

	return wr.ToManifestFile(path, counter.Count)
}

func (m *manifestMergeManager) mergeGroup(firstManifest iceberg.ManifestFile, specID int, manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	packer := internal.SlicePacker[iceberg.ManifestFile]{
		TargetWeight:    int64(m.targetSizeBytes),
		Lookback:        1,
		LargestBinFirst: false,
	}
	bins := packer.PackEnd(manifests, func(m iceberg.ManifestFile) int64 {
		return m.Length()
	})

	mergeBin := func(bin []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
		output := make([]iceberg.ManifestFile, 0, 1)
		if len(bin) == 1 {
			output = append(output, bin[0])
		} else if len(bin) < m.minCountToMerge && slices.ContainsFunc(bin, func(m iceberg.ManifestFile) bool { return m == firstManifest }) {
			// if the bin has the first manifest (the new data files or an appended
			// manifest file) then only merge it if the number of manifests is above
			// the minimum count. this is applied only to bins with an in-memory manifest
			// so that large manifests don't prevent merging older groups
			output = append(output, bin...)
		} else {
			created, err := m.createManifest(specID, bin)
			if err != nil {
				return nil, err
			}
			output = append(output, created)
		}

		return output, nil
	}

	binResults := make([][]iceberg.ManifestFile, len(bins))
	g := errgroup.Group{}
	for i, bin := range bins {
		i, bin := i, bin
		g.Go(func() error {
			var err error
			binResults[i], err = mergeBin(bin)

			return err
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	return slices.Concat(binResults...), nil
}

func (m *manifestMergeManager) mergeManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	if !m.mergeEnabled || len(manifests) == 0 {
		return manifests, nil
	}

	first := manifests[0]
	groups := m.groupBySpec(manifests)
	merged := make([]iceberg.ManifestFile, 0, len(groups))
	for _, specID := range slices.Backward(slices.Sorted(maps.Keys(groups))) {
		manifests, err := m.mergeGroup(first, specID, groups[specID])
		if err != nil {
			return nil, err
		}

		merged = append(merged, manifests...)
	}

	return merged, nil
}
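// mergeAppendFiles appends like fastAppendFiles but, when manifest merging is
// enabled, compacts the resulting data manifests with a manifestMergeManager
// before the manifest list is written; delete manifests pass through unmerged.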
type mergeAppendFiles struct {
	fastAppendFiles

	targetSizeBytes int
	minCountToMerge int
	mergeEnabled    bool
}

func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
	prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
	prod.producerImpl = &mergeAppendFiles{
		fastAppendFiles: fastAppendFiles{base: prod},
		targetSizeBytes: txn.meta.props.GetInt(ManifestTargetSizeBytesKey, ManifestTargetSizeBytesDefault),
		minCountToMerge: txn.meta.props.GetInt(ManifestMinMergeCountKey, ManifestMinMergeCountDefault),
		mergeEnabled:    txn.meta.props.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault),
	}

	return prod
}

func (m *mergeAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	unmergedDataManifests, unmergedDeleteManifests := []iceberg.ManifestFile{}, []iceberg.ManifestFile{}
	for _, m := range manifests {
		if m.ManifestContent() == iceberg.ManifestContentData {
			unmergedDataManifests = append(unmergedDataManifests, m)
		} else if m.ManifestContent() == iceberg.ManifestContentDeletes {
			unmergedDeleteManifests = append(unmergedDeleteManifests, m)
		}
	}

	dataManifestMergeMgr := manifestMergeManager{
		targetSizeBytes: m.targetSizeBytes,
		minCountToMerge: m.minCountToMerge,
		mergeEnabled:    m.mergeEnabled,
		snap:            m.base,
	}

	result, err := dataManifestMergeMgr.mergeManifests(unmergedDataManifests)
	if err != nil {
		return nil, err
	}

	return append(result, unmergedDeleteManifests...), nil
}

type snapshotProducer struct {
	producerImpl

	commitUuid       uuid.UUID
	io               iceio.WriteFileIO
	txn              *Transaction
	op               Operation
	snapshotID       int64
	parentSnapshotID int64
	addedFiles       []iceberg.DataFile
	manifestCount    atomic.Int32
	deletedFiles     map[string]iceberg.DataFile
	snapshotProps    iceberg.Properties
}

func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer {
	var (
		commit         uuid.UUID
		parentSnapshot int64 = -1
	)

	if commitUUID == nil {
		commit = uuid.New()
	} else {
		commit = *commitUUID
	}

	if snap := txn.meta.currentSnapshot(); snap != nil {
		parentSnapshot = snap.SnapshotID
	}

	return &snapshotProducer{
		commitUuid:       commit,
		io:               fs,
		txn:              txn,
		op:               op,
		snapshotID:       txn.meta.newSnapshotID(),
		parentSnapshotID: parentSnapshot,
		addedFiles:       []iceberg.DataFile{},
		deletedFiles:     make(map[string]iceberg.DataFile),
		snapshotProps:    snapshotProps,
	}
}

func (sp *snapshotProducer) spec(id int) iceberg.PartitionSpec {
	if spec, _ := sp.txn.meta.GetSpecByID(id); spec != nil {
		return *spec
	}

	return iceberg.NewPartitionSpec()
}

func (sp *snapshotProducer) appendDataFile(df iceberg.DataFile) *snapshotProducer {
	sp.addedFiles = append(sp.addedFiles, df)

	return sp
}

func (sp *snapshotProducer) deleteDataFile(df iceberg.DataFile) *snapshotProducer {
	sp.deletedFiles[df.FilePath()] = df

	return sp
}

func (sp *snapshotProducer) newManifestWriter(spec iceberg.PartitionSpec) (*iceberg.ManifestWriter, string, *internal.CountingWriter, error) {
	out, path, err := sp.newManifestOutput()
	if err != nil {
		return nil, "", nil, err
	}

	counter := &internal.CountingWriter{W: out}
	wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter, spec,
		sp.txn.meta.CurrentSchema(), sp.snapshotID)
	if err != nil {
		defer out.Close()

		return nil, "", nil, err
	}

	return wr, path, counter, nil
}
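// newManifestOutput creates a uniquely named manifest file under the table's
// metadata location and returns the open writer together with its full path.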
func (sp *snapshotProducer) newManifestOutput() (io.WriteCloser, string, error) {
	provider, err := sp.txn.tbl.LocationProvider()
	if err != nil {
		return nil, "", err
	}

	fname := newManifestFileName(int(sp.manifestCount.Add(1)), sp.commitUuid)
	filepath := provider.NewMetadataLocation(fname)
	f, err := sp.io.Create(filepath)
	if err != nil {
		return nil, "", fmt.Errorf("could not create manifest file: %w", err)
	}

	return f, filepath, nil
}

func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDeleted bool) ([]iceberg.ManifestEntry, error) {
	return m.FetchEntries(sp.io, discardDeleted)
}

func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
	var g errgroup.Group

	results := [...][]iceberg.ManifestFile{nil, nil, nil}

	if len(sp.addedFiles) > 0 {
		g.Go(func() error {
			out, path, err := sp.newManifestOutput()
			if err != nil {
				return err
			}
			defer out.Close()

			counter := &internal.CountingWriter{W: out}
			wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
				sp.txn.meta.CurrentSpec(), sp.txn.meta.CurrentSchema(), sp.snapshotID)
			if err != nil {
				return err
			}

			for _, df := range sp.addedFiles {
				err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, nil, nil, df))
				if err != nil {
					return err
				}
			}

			mf, err := wr.ToManifestFile(path, counter.Count)
			if err == nil {
				results[0] = append(results[0], mf)
			}

			return err
		})
	}

	deleted, err := sp.deletedEntries()
	if err != nil {
		return nil, err
	}

	if len(deleted) > 0 {
		g.Go(func() error {
			partitionGroups := map[int][]iceberg.ManifestEntry{}
			for _, entry := range deleted {
				specid := int(entry.DataFile().SpecID())
				group := partitionGroups[specid]
				partitionGroups[specid] = append(group, entry)
			}

			for specid, entries := range partitionGroups {
				out, path, err := sp.newManifestOutput()
				if err != nil {
					return err
				}
				defer out.Close()

				mf, err := iceberg.WriteManifest(path, out, sp.txn.meta.formatVersion,
					sp.spec(specid), sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
				if err != nil {
					return err
				}

				results[1] = append(results[1], mf)
			}

			return nil
		})
	}

	g.Go(func() error {
		m, err := sp.existingManifests()
		if err != nil {
			return err
		}

		results[2] = m

		return nil
	})

	if err := g.Wait(); err != nil {
		return nil, err
	}

	manifests := slices.Concat(results[0], results[1], results[2])

	return sp.processManifests(manifests)
}
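// summary aggregates the added and removed data files into a
// SnapshotSummaryCollector, merges in any caller-supplied properties, and
// reconciles the result against the parent snapshot's summary.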
func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) {
	var ssc SnapshotSummaryCollector
	partitionSummaryLimit := sp.txn.meta.props.
		GetInt(WritePartitionSummaryLimitKey, WritePartitionSummaryLimitDefault)
	ssc.setPartitionSummaryLimit(partitionSummaryLimit)

	currentSchema := sp.txn.meta.CurrentSchema()
	partitionSpec := sp.txn.meta.CurrentSpec()
	for _, df := range sp.addedFiles {
		ssc.addFile(df, currentSchema, partitionSpec)
	}

	if len(sp.deletedFiles) > 0 {
		specs := sp.txn.meta.specs
		for _, df := range sp.deletedFiles {
			ssc.removeFile(df, currentSchema, specs[df.SpecID()])
		}
	}

	var previousSnapshot *Snapshot
	if sp.parentSnapshotID > 0 {
		previousSnapshot, _ = sp.txn.meta.SnapshotByID(sp.parentSnapshotID)
	}

	var previousSummary iceberg.Properties
	if previousSnapshot != nil {
		previousSummary = previousSnapshot.Summary.Properties
	}

	summaryProps := ssc.build()
	maps.Copy(summaryProps, props)

	return updateSnapshotSummaries(Summary{
		Operation:  sp.op,
		Properties: summaryProps,
	}, previousSummary)
}

func (sp *snapshotProducer) commit() ([]Update, []Requirement, error) {
	newManifests, err := sp.manifests()
	if err != nil {
		return nil, nil, err
	}

	nextSequence := sp.txn.meta.nextSequenceNumber()
	summary, err := sp.summary(sp.snapshotProps)
	if err != nil {
		return nil, nil, err
	}

	fname := newManifestListFileName(sp.snapshotID, 0, sp.commitUuid)
	locProvider, err := sp.txn.tbl.LocationProvider()
	if err != nil {
		return nil, nil, err
	}
	manifestListFilePath := locProvider.NewMetadataLocation(fname)

	var parentSnapshot *int64
	if sp.parentSnapshotID > 0 {
		parentSnapshot = &sp.parentSnapshotID
	}

	out, err := sp.io.Create(manifestListFilePath)
	if err != nil {
		return nil, nil, err
	}
	defer out.Close()

	err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out, sp.snapshotID,
		parentSnapshot, &nextSequence, newManifests)
	if err != nil {
		return nil, nil, err
	}

	snapshot := Snapshot{
		SnapshotID:       sp.snapshotID,
		ParentSnapshotID: parentSnapshot,
		SequenceNumber:   nextSequence,
		ManifestList:     manifestListFilePath,
		Summary:          &summary,
		SchemaID:         &sp.txn.meta.currentSchemaID,
		TimestampMs:      time.Now().UnixMilli(),
	}

	return []Update{
			NewAddSnapshotUpdate(&snapshot),
			NewSetSnapshotRefUpdate("main", sp.snapshotID, BranchRef, -1, -1, -1),
		}, []Requirement{
			AssertRefSnapshotID("main", sp.txn.meta.currentSnapshotID),
		}, nil
}
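// Illustrative flow, as a sketch only: it assumes the caller already holds a
// *Transaction (txn), a WriteFileIO (fs), an Operation value (op), snapshot
// properties (props), and the data files to add (newFiles); none of those
// names are defined in this file.
//
//	prod := newFastAppendFilesProducer(op, txn, fs, nil, props)
//	for _, df := range newFiles {
//		prod.appendDataFile(df)
//	}
//	updates, reqs, err := prod.commit()
//	// updates and reqs are then applied through the owning Transaction's
//	// commit path.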