table/snapshots.go (384 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package table import ( "encoding/json" "errors" "fmt" "iter" "maps" "slices" "strconv" "strings" "github.com/apache/iceberg-go" iceio "github.com/apache/iceberg-go/io" ) type Operation string const ( OpAppend Operation = "append" OpReplace Operation = "replace" OpOverwrite Operation = "overwrite" OpDelete Operation = "delete" ) var ( ErrInvalidOperation = errors.New("invalid operation value") ErrMissingOperation = errors.New("missing operation key") ) // ValidOperation ensures that a given string is one of the valid operation // types: append,replace,overwrite,delete func ValidOperation(s string) (Operation, error) { switch s { case "append", "replace", "overwrite", "delete": return Operation(s), nil } return "", fmt.Errorf("%w: found '%s'", ErrInvalidOperation, s) } const ( operationKey = "operation" addedDataFilesKey = "added-data-files" addedDeleteFilesKey = "added-delete-files" addedEqDeletesKey = "added-equality-deletes" addedFileSizeKey = "added-files-size" addedPosDeletesKey = "added-position-deletes" addedPosDeleteFilesKey = "added-position-delete-files" addedRecordsKey = "added-records" addedEqDeleteFilesKey = "added-equality-delete-files" deletedDataFilesKey = "deleted-data-files" deletedRecordsKey = "deleted-records" removedDeleteFilesKey = "removed-delete-files" removedEqDeletesKey = "removed-equality-deletes" removedEqDeleteFilesKey = "removed-equality-delete-files" removedFileSizeKey = "removed-files-size" removedPosDeletesKey = "removed-position-deletes" removedPosDeleteFilesKey = "removed-position-delete-files" totalEqDeletesKey = "total-equality-deletes" totalPosDeletesKey = "total-position-deletes" totalDataFilesKey = "total-data-files" totalDeleteFilesKey = "total-delete-files" totalRecordsKey = "total-records" totalFileSizeKey = "total-files-size" changedPartitionCountProp = "changed-partition-count" changedPartitionPrefix = "partitions." ) type updateMetrics struct { addedFileSize int64 removedFileSize int64 addedDataFiles int64 removedDataFiles int64 addedEqDeleteFiles int64 removedEqDeleteFiles int64 addedPosDeleteFiles int64 removedPosDeleteFiles int64 addedDeleteFiles int64 removedDeleteFiles int64 addedRecords int64 deletedRecords int64 addedPosDeletes int64 removedPosDeletes int64 addedEqDeletes int64 removedEqDeletes int64 } func (m *updateMetrics) addDataFile(df iceberg.DataFile) error { m.addedFileSize += df.FileSizeBytes() switch df.ContentType() { case iceberg.EntryContentData: m.addedDataFiles++ m.addedRecords += df.Count() case iceberg.EntryContentPosDeletes: m.addedDeleteFiles++ m.addedPosDeleteFiles++ m.addedPosDeletes += df.Count() case iceberg.EntryContentEqDeletes: m.addedDeleteFiles++ m.addedEqDeleteFiles++ m.addedEqDeletes += df.Count() default: return fmt.Errorf("unknown data file content: %s", df.ContentType()) } return nil } func (m *updateMetrics) removeFile(df iceberg.DataFile) error { m.removedFileSize += df.FileSizeBytes() switch df.ContentType() { case iceberg.EntryContentData: m.removedDataFiles++ m.deletedRecords += df.Count() case iceberg.EntryContentPosDeletes: m.removedDeleteFiles++ m.removedPosDeleteFiles++ m.removedPosDeletes += df.Count() case iceberg.EntryContentEqDeletes: m.removedDeleteFiles++ m.removedEqDeleteFiles++ m.removedEqDeletes += df.Count() default: return fmt.Errorf("unknown data file content: %s", df.ContentType()) } return nil } func setWhenPositive(props iceberg.Properties, key string, value int64) { if value > 0 { props[key] = strconv.FormatInt(value, 10) } } func (m *updateMetrics) toProps() iceberg.Properties { props := iceberg.Properties{} setWhenPositive(props, addedFileSizeKey, m.addedFileSize) setWhenPositive(props, removedFileSizeKey, m.removedFileSize) setWhenPositive(props, addedDataFilesKey, m.addedDataFiles) setWhenPositive(props, deletedDataFilesKey, m.removedDataFiles) setWhenPositive(props, addedEqDeleteFilesKey, m.addedEqDeleteFiles) setWhenPositive(props, removedEqDeleteFilesKey, m.removedEqDeleteFiles) setWhenPositive(props, addedPosDeleteFilesKey, m.addedPosDeleteFiles) setWhenPositive(props, removedPosDeleteFilesKey, m.removedPosDeleteFiles) setWhenPositive(props, addedDeleteFilesKey, m.addedDeleteFiles) setWhenPositive(props, removedDeleteFilesKey, m.removedDeleteFiles) setWhenPositive(props, addedRecordsKey, m.addedRecords) setWhenPositive(props, deletedRecordsKey, m.deletedRecords) setWhenPositive(props, addedPosDeletesKey, m.addedPosDeletes) setWhenPositive(props, removedPosDeletesKey, m.removedPosDeletes) setWhenPositive(props, addedEqDeletesKey, m.addedEqDeletes) setWhenPositive(props, removedEqDeletesKey, m.removedEqDeletes) return props } // Summary stores the summary information for a snapshot indicating // the operation that created the snapshot, and various properties // which might exist in the summary. type Summary struct { Operation Operation Properties iceberg.Properties } func (s *Summary) String() string { out := string(s.Operation) if s.Properties != nil { data, _ := json.Marshal(s.Properties) out += ", " + string(data) } return out } func (s *Summary) Equals(other *Summary) bool { if s == other { return true } if s != nil && other == nil { return false } if s.Operation != other.Operation { return false } if len(s.Properties) == 0 && len(other.Properties) == 0 { return true } return maps.Equal(s.Properties, other.Properties) } func (s *Summary) UnmarshalJSON(b []byte) (err error) { alias := map[string]string{} if err = json.Unmarshal(b, &alias); err != nil { return } op, ok := alias[operationKey] if !ok { return ErrMissingOperation } if s.Operation, err = ValidOperation(op); err != nil { return } delete(alias, operationKey) s.Properties = alias return nil } func (s *Summary) MarshalJSON() ([]byte, error) { props := maps.Clone(s.Properties) if s.Operation != "" { if props == nil { props = make(map[string]string) } props[operationKey] = string(s.Operation) } return json.Marshal(props) } type Snapshot struct { SnapshotID int64 `json:"snapshot-id"` ParentSnapshotID *int64 `json:"parent-snapshot-id,omitempty"` SequenceNumber int64 `json:"sequence-number"` TimestampMs int64 `json:"timestamp-ms"` ManifestList string `json:"manifest-list,omitempty"` Summary *Summary `json:"summary,omitempty"` SchemaID *int `json:"schema-id,omitempty"` } func (s Snapshot) String() string { var op, parent, schema string if s.Summary != nil { op = s.Summary.String() + ": " } if s.ParentSnapshotID != nil { parent = ", parent_id=" + strconv.FormatInt(*s.ParentSnapshotID, 10) } if s.SchemaID != nil { schema = ", schema_id=" + strconv.Itoa(*s.SchemaID) } return fmt.Sprintf("%sid=%d%s%s, sequence_number=%d, timestamp_ms=%d, manifest_list=%s", op, s.SnapshotID, parent, schema, s.SequenceNumber, s.TimestampMs, s.ManifestList) } func (s Snapshot) Equals(other Snapshot) bool { switch { case s.ParentSnapshotID == nil && other.ParentSnapshotID != nil: fallthrough case s.ParentSnapshotID != nil && other.ParentSnapshotID == nil: fallthrough case s.SchemaID == nil && other.SchemaID != nil: fallthrough case s.SchemaID != nil && other.SchemaID == nil: return false } return s.SnapshotID == other.SnapshotID && ((s.ParentSnapshotID == other.ParentSnapshotID) || (*s.ParentSnapshotID == *other.ParentSnapshotID)) && ((s.SchemaID == other.SchemaID) || (*s.SchemaID == *other.SchemaID)) && s.SequenceNumber == other.SequenceNumber && s.TimestampMs == other.TimestampMs && s.ManifestList == other.ManifestList && s.Summary.Equals(other.Summary) } func (s Snapshot) Manifests(fio iceio.IO) ([]iceberg.ManifestFile, error) { if s.ManifestList != "" { f, err := fio.Open(s.ManifestList) if err != nil { return nil, fmt.Errorf("could not open manifest file: %w", err) } defer f.Close() return iceberg.ReadManifestList(f) } return nil, nil } func (s Snapshot) dataFiles(fio iceio.IO, fileFilter set[iceberg.ManifestEntryContent]) iter.Seq2[iceberg.DataFile, error] { return func(yield func(iceberg.DataFile, error) bool) { manifests, err := s.Manifests(fio) if err != nil { yield(nil, err) return } for _, m := range manifests { dataFiles, err := m.FetchEntries(fio, false) if err != nil { yield(nil, err) return } for _, f := range dataFiles { if fileFilter != nil { if _, ok := fileFilter[f.DataFile().ContentType()]; !ok { continue } } if !yield(f.DataFile(), nil) { return } } } } } type MetadataLogEntry struct { MetadataFile string `json:"metadata-file"` TimestampMs int64 `json:"timestamp-ms"` } type SnapshotLogEntry struct { SnapshotID int64 `json:"snapshot-id"` TimestampMs int64 `json:"timestamp-ms"` } type SnapshotSummaryCollector struct { metrics updateMetrics partitionMetrics map[string]updateMetrics maxChangedPartitionsForSummaries int } func (s *SnapshotSummaryCollector) setPartitionSummaryLimit(limit int) { s.maxChangedPartitionsForSummaries = limit } func (s *SnapshotSummaryCollector) updatePartitionMetrics(partitionPath string, df iceberg.DataFile, isAddFile bool) error { if s.partitionMetrics == nil { s.partitionMetrics = make(map[string]updateMetrics) } metrics := s.partitionMetrics[partitionPath] if isAddFile { if err := metrics.addDataFile(df); err != nil { return err } } else { if err := metrics.removeFile(df); err != nil { return err } } s.partitionMetrics[partitionPath] = metrics return nil } func (s *SnapshotSummaryCollector) addFile(df iceberg.DataFile, sc *iceberg.Schema, spec iceberg.PartitionSpec) error { if err := s.metrics.addDataFile(df); err != nil { return err } if len(df.Partition()) > 0 { partitionPath := spec.PartitionToPath( getPartitionRecord(df, spec.PartitionType(sc)), sc) return s.updatePartitionMetrics(partitionPath, df, true) } return nil } func (s *SnapshotSummaryCollector) removeFile(df iceberg.DataFile, sc *iceberg.Schema, spec iceberg.PartitionSpec) error { if err := s.metrics.removeFile(df); err != nil { return err } if len(df.Partition()) > 0 { partitionPath := spec.PartitionToPath( getPartitionRecord(df, spec.PartitionType(sc)), sc) return s.updatePartitionMetrics(partitionPath, df, false) } return nil } func (s *SnapshotSummaryCollector) partitionSummary(metrics *updateMetrics) string { props := metrics.toProps() return strings.Join(slices.Sorted(func(yield func(s string) bool) { for k, v := range props { if !yield(fmt.Sprintf("%s=%s", k, v)) { return } } }), ",") } func (s *SnapshotSummaryCollector) build() iceberg.Properties { props := s.metrics.toProps() changedPartitionsSize := len(s.partitionMetrics) setWhenPositive(props, changedPartitionCountProp, int64(changedPartitionsSize)) if changedPartitionsSize <= s.maxChangedPartitionsForSummaries { for partPath, updateMetricsPart := range s.partitionMetrics { if summary := s.partitionSummary(&updateMetricsPart); len(summary) > 0 { props[changedPartitionPrefix+partPath] = summary } } } return props } func updateSnapshotSummaries(sum Summary, previous iceberg.Properties) (Summary, error) { switch sum.Operation { case OpAppend, OpOverwrite, OpDelete: default: return sum, fmt.Errorf("%w: operation: %s", iceberg.ErrNotImplemented, sum.Operation) } if sum.Properties == nil { sum.Properties = make(iceberg.Properties) } if previous == nil { previous = iceberg.Properties{ totalDataFilesKey: "0", totalDeleteFilesKey: "0", totalRecordsKey: "0", totalFileSizeKey: "0", totalPosDeletesKey: "0", totalEqDeletesKey: "0", } } updateTotals := func(totalProp, addedProp, removedProp string) { newTotal := previous.GetInt(totalProp, 0) newTotal += sum.Properties.GetInt(addedProp, 0) newTotal -= sum.Properties.GetInt(removedProp, 0) if newTotal >= 0 { sum.Properties[totalProp] = strconv.Itoa(newTotal) } } updateTotals(totalDataFilesKey, addedDataFilesKey, deletedDataFilesKey) updateTotals(totalDeleteFilesKey, addedDeleteFilesKey, removedDeleteFilesKey) updateTotals(totalRecordsKey, addedRecordsKey, deletedRecordsKey) updateTotals(totalFileSizeKey, addedFileSizeKey, removedFileSizeKey) updateTotals(totalPosDeletesKey, addedPosDeletesKey, removedPosDeletesKey) updateTotals(totalEqDeletesKey, addedEqDeletesKey, removedEqDeletesKey) return sum, nil }