table/metadata.go (969 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"maps"
"slices"
"strconv"
"time"
"github.com/apache/iceberg-go"
"github.com/google/uuid"
)
const (
partitionFieldStartID = 1000
supportedTableFormatVersion = 2
)
func generateSnapshotID() int64 {
var (
rndUUID = uuid.New()
out [8]byte
)
for i := range 8 {
lhs, rhs := rndUUID[i], rndUUID[i+8]
out[i] = lhs ^ rhs
}
snapshotID := int64(binary.LittleEndian.Uint64(out[:]))
if snapshotID < 0 {
snapshotID = -snapshotID
}
return snapshotID
}
// Metadata for an iceberg table as specified in the Iceberg spec
//
// https://iceberg.apache.org/spec/#iceberg-table-spec
type Metadata interface {
// Version indicates the version of this metadata, 1 for V1, 2 for V2, etc.
Version() int
// TableUUID returns a UUID that identifies the table, generated when the
// table is created. Implementations must throw an exception if a table's
// UUID does not match the expected UUID after refreshing metadata.
TableUUID() uuid.UUID
// Location is the table's base location. This is used by writers to determine
// where to store data files, manifest files, and table metadata files.
Location() string
// LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when
// the table was last updated. Each table metadata file should update this
// field just before writing.
LastUpdatedMillis() int64
// LastColumnID returns the highest assigned column ID for the table.
// This is used to ensure fields are always assigned an unused ID when
// evolving schemas.
LastColumnID() int
// Schemas returns the list of schemas, stored as objects with their
// schema-id.
Schemas() []*iceberg.Schema
// CurrentSchema returns the table's current schema.
CurrentSchema() *iceberg.Schema
// PartitionSpecs returns the list of all partition specs in the table.
PartitionSpecs() []iceberg.PartitionSpec
// PartitionSpec returns the current partition spec that the table is using.
PartitionSpec() iceberg.PartitionSpec
// DefaultPartitionSpec is the ID of the current spec that writers should
// use by default.
DefaultPartitionSpec() int
// LastPartitionSpecID is the highest assigned partition field ID across
// all partition specs for the table. This is used to ensure partition
// fields are always assigned an unused ID when evolving specs.
LastPartitionSpecID() *int
// Snapshots returns the list of valid snapshots. Valid snapshots are
// snapshots for which all data files exist in the file system. A data
// file must not be deleted from the file system until the last snapshot
// in which it was listed is garbage collected.
Snapshots() []Snapshot
// SnapshotByID find and return a specific snapshot by its ID. Returns
// nil if the ID is not found in the list of snapshots.
SnapshotByID(int64) *Snapshot
// SnapshotByName searches the list of snapshots for a snapshot with a given
// ref name. Returns nil if there's no ref with this name for a snapshot.
SnapshotByName(name string) *Snapshot
// CurrentSnapshot returns the table's current snapshot.
CurrentSnapshot() *Snapshot
// Ref returns the snapshot ref for the main branch.
Ref() SnapshotRef
// Refs returns a list of snapshot name/reference pairs.
Refs() iter.Seq2[string, SnapshotRef]
// SnapshotLogs returns the list of snapshot logs for the table.
SnapshotLogs() iter.Seq[SnapshotLogEntry]
// SortOrder returns the table's current sort order, ie: the one with the
// ID that matches the default-sort-order-id.
SortOrder() SortOrder
// SortOrders returns the list of sort orders in the table.
SortOrders() []SortOrder
// DefaultSortOrder returns the ID of the current sort order that writers
// should use by default.
DefaultSortOrder() int
// Properties is a string to string map of table properties. This is used
// to control settings that affect reading and writing and is not intended
// to be used for arbitrary metadata. For example, commit.retry.num-retries
// is used to control the number of commit retries.
Properties() iceberg.Properties
// PreviousFiles returns the list of metadata log entries for the table.
PreviousFiles() iter.Seq[MetadataLogEntry]
Equals(Metadata) bool
NameMapping() iceberg.NameMapping
LastSequenceNumber() int64
}
type MetadataBuilder struct {
base Metadata
updates []Update
// common fields
formatVersion int
uuid uuid.UUID
loc string
lastUpdatedMS int64
lastColumnId int
schemaList []*iceberg.Schema
currentSchemaID int
specs []iceberg.PartitionSpec
defaultSpecID int
lastPartitionID *int
props iceberg.Properties
snapshotList []Snapshot
currentSnapshotID *int64
snapshotLog []SnapshotLogEntry
metadataLog []MetadataLogEntry
sortOrderList []SortOrder
defaultSortOrderID int
refs map[string]SnapshotRef
// >v1 specific
lastSequenceNumber *int64
}
func NewMetadataBuilder() (*MetadataBuilder, error) {
return &MetadataBuilder{
updates: make([]Update, 0),
schemaList: make([]*iceberg.Schema, 0),
specs: make([]iceberg.PartitionSpec, 0),
props: make(iceberg.Properties),
snapshotList: make([]Snapshot, 0),
snapshotLog: make([]SnapshotLogEntry, 0),
metadataLog: make([]MetadataLogEntry, 0),
sortOrderList: make([]SortOrder, 0),
refs: make(map[string]SnapshotRef),
}, nil
}
func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
b := &MetadataBuilder{}
b.base = metadata
b.formatVersion = metadata.Version()
b.uuid = metadata.TableUUID()
b.loc = metadata.Location()
b.lastUpdatedMS = metadata.LastUpdatedMillis()
b.lastColumnId = metadata.LastColumnID()
b.schemaList = metadata.Schemas()
b.currentSchemaID = metadata.CurrentSchema().ID
b.specs = metadata.PartitionSpecs()
b.defaultSpecID = metadata.DefaultPartitionSpec()
b.lastPartitionID = metadata.LastPartitionSpecID()
b.props = metadata.Properties()
b.snapshotList = metadata.Snapshots()
b.sortOrderList = metadata.SortOrders()
b.defaultSortOrderID = metadata.DefaultSortOrder()
if metadata.Version() > 1 {
seq := metadata.LastSequenceNumber()
b.lastSequenceNumber = &seq
}
if metadata.CurrentSnapshot() != nil {
b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
}
b.refs = maps.Collect(metadata.Refs())
b.snapshotLog = slices.Collect(metadata.SnapshotLogs())
b.metadataLog = slices.Collect(metadata.PreviousFiles())
return b, nil
}
func (b *MetadataBuilder) HasChanges() bool { return len(b.updates) > 0 }
func (b *MetadataBuilder) CurrentSpec() iceberg.PartitionSpec {
return b.specs[b.defaultSpecID]
}
func (b *MetadataBuilder) CurrentSchema() *iceberg.Schema {
s, _ := b.GetSchemaByID(b.currentSchemaID)
return s
}
func (b *MetadataBuilder) LastUpdatedMS() int64 { return b.lastUpdatedMS }
func (b *MetadataBuilder) nextSequenceNumber() int64 {
if b.formatVersion > 1 {
if b.lastSequenceNumber == nil {
return 0
}
return *b.lastSequenceNumber + 1
}
return 0
}
func (b *MetadataBuilder) newSnapshotID() int64 {
snapshotID := generateSnapshotID()
for slices.ContainsFunc(b.snapshotList, func(s Snapshot) bool { return s.SnapshotID == snapshotID }) {
snapshotID = generateSnapshotID()
}
return snapshotID
}
func (b *MetadataBuilder) currentSnapshot() *Snapshot {
if b.currentSnapshotID == nil {
return nil
}
s, _ := b.SnapshotByID(*b.currentSnapshotID)
return s
}
func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID int, initial bool) (*MetadataBuilder, error) {
if newLastColumnID < b.lastColumnId {
return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
}
var schemas []*iceberg.Schema
if initial {
schemas = []*iceberg.Schema{schema}
} else {
schemas = append(b.schemaList, schema)
}
b.lastColumnId = newLastColumnID
b.schemaList = schemas
b.updates = append(b.updates, NewAddSchemaUpdate(schema, newLastColumnID, initial))
return b, nil
}
func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, initial bool) (*MetadataBuilder, error) {
for _, s := range b.specs {
if s.ID() == spec.ID() && !initial {
return nil, fmt.Errorf("partition spec with id %d already exists", spec.ID())
}
}
maxFieldID := 0
for f := range spec.Fields() {
maxFieldID = max(maxFieldID, f.FieldID)
}
prev := partitionFieldStartID - 1
if b.lastPartitionID != nil {
prev = *b.lastPartitionID
}
lastPartitionID := max(maxFieldID, prev)
var specs []iceberg.PartitionSpec
if initial {
specs = []iceberg.PartitionSpec{*spec}
} else {
specs = append(b.specs, *spec)
}
b.specs = specs
b.lastPartitionID = &lastPartitionID
b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
return b, nil
}
func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, error) {
if snapshot == nil {
return nil, nil
}
if len(b.schemaList) == 0 {
return nil, errors.New("can't add snapshot with no added schemas")
} else if len(b.specs) == 0 {
return nil, errors.New("can't add snapshot with no added partition specs")
} else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
return nil, fmt.Errorf("can't add snapshot with id %d, already exists", snapshot.SnapshotID)
} else if b.formatVersion == 2 &&
snapshot.SequenceNumber > 0 &&
snapshot.ParentSnapshotID != nil &&
snapshot.SequenceNumber <= *b.lastSequenceNumber {
return nil, fmt.Errorf("can't add snapshot with sequence number %d, must be > than last sequence number %d",
snapshot.SequenceNumber, b.lastSequenceNumber)
}
b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
b.lastUpdatedMS = snapshot.TimestampMs
b.lastSequenceNumber = &snapshot.SequenceNumber
b.snapshotList = append(b.snapshotList, *snapshot)
return b, nil
}
func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) (*MetadataBuilder, error) {
var sortOrders []SortOrder
if !initial {
sortOrders = append(sortOrders, b.sortOrderList...)
}
for _, s := range sortOrders {
if s.OrderID == sortOrder.OrderID {
return nil, fmt.Errorf("sort order with id %d already exists", sortOrder.OrderID)
}
}
b.sortOrderList = append(sortOrders, *sortOrder)
b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
return b, nil
}
func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, error) {
if len(keys) == 0 {
return b, nil
}
b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
for _, key := range keys {
delete(b.props, key)
}
return b, nil
}
func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) (*MetadataBuilder, error) {
if currentSchemaID == -1 {
currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) int {
return s.ID
})
if !slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSchema && u.(*addSchemaUpdate).Schema.ID == currentSchemaID
}) {
return nil, errors.New("can't set current schema to last added schema, no schema has been added")
}
}
if currentSchemaID == b.currentSchemaID {
return b, nil
}
_, err := b.GetSchemaByID(currentSchemaID)
if err != nil {
return nil, fmt.Errorf("can't set current schema to schema with id %d: %w", currentSchemaID, err)
}
b.updates = append(b.updates, NewSetCurrentSchemaUpdate(currentSchemaID))
b.currentSchemaID = currentSchemaID
return b, nil
}
func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) (*MetadataBuilder, error) {
if defaultSortOrderID == -1 {
defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) int {
return s.OrderID
})
if !slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSortOrder && u.(*addSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
}) {
return nil, errors.New("can't set default sort order to last added with no added sort orders")
}
}
if defaultSortOrderID == b.defaultSortOrderID {
return b, nil
}
if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
return nil, fmt.Errorf("can't set default sort order to sort order with id %d: %w", defaultSortOrderID, err)
}
b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID))
b.defaultSortOrderID = defaultSortOrderID
return b, nil
}
func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) (*MetadataBuilder, error) {
if defaultSpecID == -1 {
defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) int {
return s.ID()
})
if !slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSpec && u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID
}) {
return nil, errors.New("can't set default spec to last added with no added partition specs")
}
}
if defaultSpecID == b.defaultSpecID {
return b, nil
}
if _, err := b.GetSpecByID(defaultSpecID); err != nil {
return nil, fmt.Errorf("can't set default spec to spec with id %d: %w", defaultSpecID, err)
}
b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
b.defaultSpecID = defaultSpecID
return b, nil
}
func (b *MetadataBuilder) SetFormatVersion(formatVersion int) (*MetadataBuilder, error) {
if formatVersion < b.formatVersion {
return nil, fmt.Errorf("downgrading format version from %d to %d is not allowed",
b.formatVersion, formatVersion)
}
if formatVersion > supportedTableFormatVersion {
return nil, fmt.Errorf("unsupported format version %d", formatVersion)
}
if formatVersion == b.formatVersion {
return b, nil
}
b.updates = append(b.updates, NewUpgradeFormatVersionUpdate(formatVersion))
b.formatVersion = formatVersion
return b, nil
}
func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
if b.loc == loc {
return b, nil
}
b.updates = append(b.updates, NewSetLocationUpdate(loc))
b.loc = loc
return b, nil
}
func (b *MetadataBuilder) SetProperties(props iceberg.Properties) (*MetadataBuilder, error) {
if len(props) == 0 {
return b, nil
}
b.updates = append(b.updates, NewSetPropertiesUpdate(props))
if b.props == nil {
b.props = props
} else {
maps.Copy(b.props, props)
}
return b, nil
}
type setSnapshotRefOption func(*SnapshotRef) error
func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption {
return func(ref *SnapshotRef) error {
if maxRefAgeMs <= 0 {
return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0", iceberg.ErrInvalidArgument, maxRefAgeMs)
}
ref.MaxRefAgeMs = &maxRefAgeMs
return nil
}
}
func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption {
return func(ref *SnapshotRef) error {
if maxSnapshotAgeMs <= 0 {
return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be > 0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs)
}
ref.MaxSnapshotAgeMs = &maxSnapshotAgeMs
return nil
}
}
func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption {
return func(ref *SnapshotRef) error {
if minSnapshotsToKeep <= 0 {
return fmt.Errorf("%w: minSnapshotsToKeep %d, must be > 0", iceberg.ErrInvalidArgument, minSnapshotsToKeep)
}
ref.MinSnapshotsToKeep = &minSnapshotsToKeep
return nil
}
}
func (b *MetadataBuilder) SetSnapshotRef(
name string,
snapshotID int64,
refType RefType,
options ...setSnapshotRefOption,
) (*MetadataBuilder, error) {
ref := SnapshotRef{
SnapshotID: snapshotID,
SnapshotRefType: refType,
}
for _, opt := range options {
if err := opt(&ref); err != nil {
return nil, fmt.Errorf("invalid snapshot ref option: %w", err)
}
}
var maxRefAgeMs, maxSnapshotAgeMs int64
var minSnapshotsToKeep int
if ref.MaxRefAgeMs != nil {
maxRefAgeMs = *ref.MaxRefAgeMs
}
if ref.MaxSnapshotAgeMs != nil {
maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs
}
if ref.MinSnapshotsToKeep != nil {
minSnapshotsToKeep = *ref.MinSnapshotsToKeep
}
if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) {
return b, nil
}
snapshot, err := b.SnapshotByID(snapshotID)
if err != nil {
return nil, fmt.Errorf("can't set snapshot ref %s to unknown snapshot %d: %w", name, snapshotID, err)
}
isAddedSnapshot := slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSnapshot && u.(*addSnapshotUpdate).Snapshot.SnapshotID == snapshotID
})
if isAddedSnapshot {
b.lastUpdatedMS = snapshot.TimestampMs
}
if name == MainBranch {
b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
b.currentSnapshotID = &snapshotID
if !isAddedSnapshot {
b.lastUpdatedMS = time.Now().Local().UnixMilli()
}
b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
SnapshotID: snapshotID,
TimestampMs: b.lastUpdatedMS,
})
}
if slices.ContainsFunc(b.updates, func(u Update) bool {
return u.Action() == UpdateAddSnapshot && u.(*addSnapshotUpdate).Snapshot.SnapshotID == snapshotID
}) {
b.lastUpdatedMS = snapshot.TimestampMs
}
b.refs[name] = ref
return b, nil
}
func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
if b.uuid == uuid {
return b, nil
}
b.updates = append(b.updates, NewAssignUUIDUpdate(uuid))
b.uuid = uuid
return b, nil
}
func (b *MetadataBuilder) SetLastUpdatedMS() *MetadataBuilder {
b.lastUpdatedMS = time.Now().UnixMilli()
return b
}
func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
if b.lastUpdatedMS == 0 {
b.lastUpdatedMS = time.Now().UnixMilli()
}
return &commonMetadata{
FormatVersion: b.formatVersion,
UUID: b.uuid,
Loc: b.loc,
LastUpdatedMS: b.lastUpdatedMS,
LastColumnId: b.lastColumnId,
SchemaList: b.schemaList,
CurrentSchemaID: b.currentSchemaID,
Specs: b.specs,
DefaultSpecID: b.defaultSpecID,
LastPartitionID: b.lastPartitionID,
Props: b.props,
SnapshotList: b.snapshotList,
CurrentSnapshotID: b.currentSnapshotID,
SnapshotLog: b.snapshotLog,
MetadataLog: b.metadataLog,
SortOrderList: b.sortOrderList,
DefaultSortOrderID: b.defaultSortOrderID,
SnapshotRefs: b.refs,
}
}
func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
for _, s := range b.schemaList {
if s.ID == id {
return s, nil
}
}
return nil, fmt.Errorf("%w: schema with id %d not found", iceberg.ErrInvalidArgument, id)
}
func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) {
for _, s := range b.specs {
if s.ID() == id {
return &s, nil
}
}
return nil, fmt.Errorf("partition spec with id %d not found", id)
}
func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) {
for _, s := range b.sortOrderList {
if s.OrderID == id {
return &s, nil
}
}
return nil, fmt.Errorf("sort order with id %d not found", id)
}
func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) {
for _, s := range b.snapshotList {
if s.SnapshotID == id {
return &s, nil
}
}
return nil, fmt.Errorf("snapshot with id %d not found", id)
}
func (b *MetadataBuilder) NameMapping() iceberg.NameMapping {
if nameMappingJson, ok := b.props[DefaultNameMappingKey]; ok {
nm := iceberg.NameMapping{}
if err := json.Unmarshal([]byte(nameMappingJson), &nm); err == nil {
return nm
}
}
return nil
}
func (b *MetadataBuilder) TrimMetadataLogs(maxEntries int) *MetadataBuilder {
if len(b.metadataLog) <= maxEntries {
return b
}
b.metadataLog = b.metadataLog[len(b.metadataLog)-maxEntries:]
return b
}
func (b *MetadataBuilder) AppendMetadataLog(entry MetadataLogEntry) *MetadataBuilder {
b.metadataLog = append(b.metadataLog, entry)
return b
}
func (b *MetadataBuilder) Build() (Metadata, error) {
common := b.buildCommonMetadata()
if err := common.validate(); err != nil {
return nil, err
}
switch b.formatVersion {
case 1:
schema, err := b.GetSchemaByID(b.currentSchemaID)
if err != nil {
return nil, fmt.Errorf("can't build metadata, missing schema for schema ID %d: %w", b.currentSchemaID, err)
}
partition, err := b.GetSpecByID(b.defaultSpecID)
if err != nil {
return nil, fmt.Errorf("can't build metadata, missing partition spec for spec ID %d: %w", b.defaultSpecID, err)
}
partitionFields := make([]iceberg.PartitionField, 0)
for f := range partition.Fields() {
partitionFields = append(partitionFields, f)
}
return &metadataV1{
Schema: schema,
Partition: partitionFields,
commonMetadata: *common,
}, nil
case 2:
var lastSequenceNumber int64
if b.lastSequenceNumber != nil {
lastSequenceNumber = *b.lastSequenceNumber
}
return &metadataV2{
LastSeqNum: lastSequenceNumber,
commonMetadata: *common,
}, nil
default:
panic("unreachable: invalid format version")
}
}
// maxBy returns the maximum value of extract(e) for all e in elems.
// If elems is empty, returns 0.
func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
m := 0
for _, e := range elems {
m = max(m, extract(e))
}
return m
}
var (
ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata")
ErrInvalidMetadata = errors.New("invalid metadata")
)
// ParseMetadata parses json metadata provided by the passed in reader,
// returning an error if one is encountered.
func ParseMetadata(r io.Reader) (Metadata, error) {
data, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return ParseMetadataBytes(data)
}
// ParseMetadataString is like [ParseMetadata], but for a string rather than
// an io.Reader.
func ParseMetadataString(s string) (Metadata, error) {
return ParseMetadataBytes([]byte(s))
}
// ParseMetadataBytes is like [ParseMetadataString] but for a byte slice.
func ParseMetadataBytes(b []byte) (Metadata, error) {
ver := struct {
FormatVersion int `json:"format-version"`
}{}
if err := json.Unmarshal(b, &ver); err != nil {
return nil, err
}
var ret Metadata
switch ver.FormatVersion {
case 1:
ret = &metadataV1{}
case 2:
ret = &metadataV2{}
default:
return nil, ErrInvalidMetadataFormatVersion
}
return ret, json.Unmarshal(b, ret)
}
func sliceEqualHelper[T interface{ Equals(T) bool }](s1, s2 []T) bool {
return slices.EqualFunc(s1, s2, func(t1, t2 T) bool {
return t1.Equals(t2)
})
}
// https://iceberg.apache.org/spec/#iceberg-table-spec
type commonMetadata struct {
FormatVersion int `json:"format-version"`
UUID uuid.UUID `json:"table-uuid"`
Loc string `json:"location"`
LastUpdatedMS int64 `json:"last-updated-ms"`
LastColumnId int `json:"last-column-id"`
SchemaList []*iceberg.Schema `json:"schemas"`
CurrentSchemaID int `json:"current-schema-id"`
Specs []iceberg.PartitionSpec `json:"partition-specs"`
DefaultSpecID int `json:"default-spec-id"`
LastPartitionID *int `json:"last-partition-id,omitempty"`
Props iceberg.Properties `json:"properties,omitempty"`
SnapshotList []Snapshot `json:"snapshots,omitempty"`
CurrentSnapshotID *int64 `json:"current-snapshot-id,omitempty"`
SnapshotLog []SnapshotLogEntry `json:"snapshot-log,omitempty"`
MetadataLog []MetadataLogEntry `json:"metadata-log,omitempty"`
SortOrderList []SortOrder `json:"sort-orders"`
DefaultSortOrderID int `json:"default-sort-order-id"`
SnapshotRefs map[string]SnapshotRef `json:"refs,omitempty"`
}
func (c *commonMetadata) Ref() SnapshotRef { return c.SnapshotRefs[MainBranch] }
func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return maps.All(c.SnapshotRefs) }
func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] {
return slices.Values(c.SnapshotLog)
}
func (c *commonMetadata) PreviousFiles() iter.Seq[MetadataLogEntry] {
return slices.Values(c.MetadataLog)
}
func (c *commonMetadata) Equals(other *commonMetadata) bool {
if other == nil {
return false
}
if c == other {
return true
}
switch {
case c.LastPartitionID == nil && other.LastPartitionID != nil:
fallthrough
case c.LastPartitionID != nil && other.LastPartitionID == nil:
fallthrough
case c.CurrentSnapshotID == nil && other.CurrentSnapshotID != nil:
fallthrough
case c.CurrentSnapshotID != nil && other.CurrentSnapshotID == nil:
return false
}
switch {
case !sliceEqualHelper(c.SchemaList, other.SchemaList):
fallthrough
case !sliceEqualHelper(c.SnapshotList, other.SnapshotList):
fallthrough
case !sliceEqualHelper(c.Specs, other.Specs):
fallthrough
case !maps.Equal(c.Props, other.Props):
fallthrough
case !maps.EqualFunc(c.SnapshotRefs, other.SnapshotRefs, func(sr1, sr2 SnapshotRef) bool { return sr1.Equals(sr2) }):
return false
}
return c.FormatVersion == other.FormatVersion && c.UUID == other.UUID &&
((c.LastPartitionID == other.LastPartitionID) || (*c.LastPartitionID == *other.LastPartitionID)) &&
((c.CurrentSnapshotID == other.CurrentSnapshotID) || (*c.CurrentSnapshotID == *other.CurrentSnapshotID)) &&
c.Loc == other.Loc && c.LastUpdatedMS == other.LastUpdatedMS &&
c.LastColumnId == other.LastColumnId && c.CurrentSchemaID == other.CurrentSchemaID &&
c.DefaultSpecID == other.DefaultSpecID && c.DefaultSortOrderID == other.DefaultSortOrderID &&
slices.Equal(c.SnapshotLog, other.SnapshotLog) && slices.Equal(c.MetadataLog, other.MetadataLog) &&
sliceEqualHelper(c.SortOrderList, other.SortOrderList)
}
func (c *commonMetadata) TableUUID() uuid.UUID { return c.UUID }
func (c *commonMetadata) Location() string { return c.Loc }
func (c *commonMetadata) LastUpdatedMillis() int64 { return c.LastUpdatedMS }
func (c *commonMetadata) LastColumnID() int { return c.LastColumnId }
func (c *commonMetadata) Schemas() []*iceberg.Schema { return c.SchemaList }
func (c *commonMetadata) CurrentSchema() *iceberg.Schema {
for _, s := range c.SchemaList {
if s.ID == c.CurrentSchemaID {
return s
}
}
panic("should never get here")
}
func (c *commonMetadata) PartitionSpecs() []iceberg.PartitionSpec {
return c.Specs
}
func (c *commonMetadata) DefaultPartitionSpec() int {
return c.DefaultSpecID
}
func (c *commonMetadata) PartitionSpec() iceberg.PartitionSpec {
for _, s := range c.Specs {
if s.ID() == c.DefaultSpecID {
return s
}
}
return *iceberg.UnpartitionedSpec
}
func (c *commonMetadata) LastPartitionSpecID() *int { return c.LastPartitionID }
func (c *commonMetadata) Snapshots() []Snapshot { return c.SnapshotList }
func (c *commonMetadata) SnapshotByID(id int64) *Snapshot {
for i := range c.SnapshotList {
if c.SnapshotList[i].SnapshotID == id {
return &c.SnapshotList[i]
}
}
return nil
}
func (c *commonMetadata) SnapshotByName(name string) *Snapshot {
if ref, ok := c.SnapshotRefs[name]; ok {
return c.SnapshotByID(ref.SnapshotID)
}
return nil
}
func (c *commonMetadata) CurrentSnapshot() *Snapshot {
if c.CurrentSnapshotID == nil {
return nil
}
return c.SnapshotByID(*c.CurrentSnapshotID)
}
func (c *commonMetadata) SortOrders() []SortOrder { return c.SortOrderList }
func (c *commonMetadata) SortOrder() SortOrder {
for _, s := range c.SortOrderList {
if s.OrderID == c.DefaultSortOrderID {
return s
}
}
return UnsortedSortOrder
}
func (c *commonMetadata) DefaultSortOrder() int {
return c.DefaultSortOrderID
}
func (c *commonMetadata) Properties() iceberg.Properties {
return c.Props
}
// preValidate updates values in the metadata struct with defaults based on
// combinations of struct members. Such as initializing slices as empty slices
// if they were null in the metadata, or normalizing inconsistencies between
// metadata versions.
func (c *commonMetadata) preValidate() {
if c.CurrentSnapshotID != nil && *c.CurrentSnapshotID == -1 {
// treat -1 as the same as nil, clean this up in pre-validation
// to make the validation logic simplified later
c.CurrentSnapshotID = nil
}
if c.CurrentSnapshotID != nil {
if _, ok := c.SnapshotRefs[MainBranch]; !ok {
c.SnapshotRefs[MainBranch] = SnapshotRef{
SnapshotID: *c.CurrentSnapshotID,
SnapshotRefType: BranchRef,
}
}
}
if c.MetadataLog == nil {
c.MetadataLog = []MetadataLogEntry{}
}
if c.SnapshotRefs == nil {
c.SnapshotRefs = make(map[string]SnapshotRef)
}
if c.SnapshotLog == nil {
c.SnapshotLog = []SnapshotLogEntry{}
}
}
func (c *commonMetadata) checkSchemas() error {
// check that current-schema-id is present in schemas
for _, s := range c.SchemaList {
if s.ID == c.CurrentSchemaID {
return nil
}
}
return fmt.Errorf("%w: current-schema-id %d can't be found in any schema",
ErrInvalidMetadata, c.CurrentSchemaID)
}
func (c *commonMetadata) checkPartitionSpecs() error {
for _, spec := range c.Specs {
if spec.ID() == c.DefaultSpecID {
return nil
}
}
return fmt.Errorf("%w: default-spec-id %d can't be found",
ErrInvalidMetadata, c.DefaultSpecID)
}
func (c *commonMetadata) checkSortOrders() error {
if c.DefaultSortOrderID == UnsortedSortOrderID {
return nil
}
for _, o := range c.SortOrderList {
if o.OrderID == c.DefaultSortOrderID {
return nil
}
}
return fmt.Errorf("%w: default-sort-order-id %d can't be found in %+v",
ErrInvalidMetadata, c.DefaultSortOrderID, c.SortOrderList)
}
func (c *commonMetadata) constructRefs() {
if c.CurrentSnapshotID != nil {
_, ok := c.SnapshotRefs[MainBranch]
if !ok {
c.SnapshotRefs[MainBranch] = SnapshotRef{
SnapshotID: *c.CurrentSnapshotID,
SnapshotRefType: BranchRef,
}
}
}
}
func (c *commonMetadata) validate() error {
if err := c.checkSchemas(); err != nil {
return err
}
if err := c.checkPartitionSpecs(); err != nil {
return err
}
if err := c.checkSortOrders(); err != nil {
return err
}
c.constructRefs()
switch {
case c.LastUpdatedMS == 0:
// last-updated-ms is required
return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata)
case c.LastColumnId < 0:
// last-column-id is required
return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata)
}
return nil
}
func (c *commonMetadata) NameMapping() iceberg.NameMapping {
if nameMappingJson, ok := c.Props[DefaultNameMappingKey]; ok {
nm := iceberg.NameMapping{}
if err := json.Unmarshal([]byte(nameMappingJson), &nm); err == nil {
return nm
}
}
return nil
}
func (c *commonMetadata) Version() int { return c.FormatVersion }
type metadataV1 struct {
Schema *iceberg.Schema `json:"schema,omitempty"`
Partition []iceberg.PartitionField `json:"partition-spec,omitempty"`
commonMetadata
}
func (m *metadataV1) LastSequenceNumber() int64 { return 0 }
func (m *metadataV1) Equals(other Metadata) bool {
rhs, ok := other.(*metadataV1)
if !ok {
return false
}
if m == rhs {
return true
}
return m.Schema.Equals(rhs.Schema) && slices.Equal(m.Partition, rhs.Partition) &&
m.commonMetadata.Equals(&rhs.commonMetadata)
}
func (m *metadataV1) preValidate() {
if len(m.SchemaList) == 0 && m.Schema != nil {
m.SchemaList = []*iceberg.Schema{m.Schema}
}
if len(m.Specs) == 0 {
m.Specs = []iceberg.PartitionSpec{
iceberg.NewPartitionSpec(m.Partition...),
}
m.DefaultSpecID = m.Specs[0].ID()
}
if m.LastPartitionID == nil {
id := m.Specs[0].LastAssignedFieldID()
for _, spec := range m.Specs[1:] {
last := spec.LastAssignedFieldID()
if last > id {
id = last
}
}
m.LastPartitionID = &id
}
if len(m.SortOrderList) == 0 {
m.SortOrderList = []SortOrder{UnsortedSortOrder}
}
m.commonMetadata.preValidate()
}
func (m *metadataV1) UnmarshalJSON(b []byte) error {
type Alias metadataV1
aux := (*Alias)(m)
// Set LastColumnId to -1 to indicate that it is not set as LastColumnId = 0 is a valid value for when no schema is present
aux.LastColumnId = -1
if err := json.Unmarshal(b, aux); err != nil {
return err
}
m.preValidate()
return m.validate()
}
func (m *metadataV1) ToV2() metadataV2 {
commonOut := m.commonMetadata
commonOut.FormatVersion = 2
if commonOut.UUID.String() == "" {
commonOut.UUID = uuid.New()
}
return metadataV2{commonMetadata: commonOut}
}
type metadataV2 struct {
LastSeqNum int64 `json:"last-sequence-number"`
commonMetadata
}
func (m *metadataV2) LastSequenceNumber() int64 { return m.LastSeqNum }
func (m *metadataV2) Equals(other Metadata) bool {
rhs, ok := other.(*metadataV2)
if !ok {
return false
}
if m == rhs {
return true
}
return m.LastSeqNum == rhs.LastSeqNum &&
m.commonMetadata.Equals(&rhs.commonMetadata)
}
func (m *metadataV2) UnmarshalJSON(b []byte) error {
type Alias metadataV2
aux := (*Alias)(m)
// Set LastColumnId to -1 to indicate that it is not set as LastColumnId = 0 is a valid value for when no schema is present
aux.LastColumnId = -1
if err := json.Unmarshal(b, aux); err != nil {
return err
}
m.preValidate()
return m.validate()
}
const DefaultFormatVersion = 2
// NewMetadata creates a new table metadata object using the provided schema, information, generating a fresh UUID for
// the new table metadata. By default, this will generate a V2 table metadata, but this can be modified
// by adding a "format-version" property to the props map. An error will be returned if the "format-version"
// property exists and is not a valid version number.
func NewMetadata(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties) (Metadata, error) {
return NewMetadataWithUUID(sc, partitions, sortOrder, location, props, uuid.Nil)
}
// NewMetadataWithUUID is like NewMetadata, but allows the caller to specify the UUID of the table rather than creating a new one.
func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) {
freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil)
if err != nil {
return nil, err
}
freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions, sc, freshSchema)
if err != nil {
return nil, err
}
freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSchema)
if err != nil {
return nil, err
}
if tableUuid == uuid.Nil {
tableUuid = uuid.New()
}
formatVersion := DefaultFormatVersion
if props != nil {
verStr, ok := props["format-version"]
if ok {
if formatVersion, err = strconv.Atoi(verStr); err != nil {
formatVersion = DefaultFormatVersion
}
delete(props, "format-version")
}
}
lastPartitionID := freshPartitions.LastAssignedFieldID()
common := commonMetadata{
LastUpdatedMS: time.Now().UnixMilli(),
LastColumnId: freshSchema.HighestFieldID(),
FormatVersion: formatVersion,
UUID: tableUuid,
Loc: location,
SchemaList: []*iceberg.Schema{freshSchema},
CurrentSchemaID: freshSchema.ID,
Specs: []iceberg.PartitionSpec{freshPartitions},
DefaultSpecID: freshPartitions.ID(),
LastPartitionID: &lastPartitionID,
Props: props,
SortOrderList: []SortOrder{freshSortOrder},
DefaultSortOrderID: freshSortOrder.OrderID,
}
switch formatVersion {
case 1:
return &metadataV1{
commonMetadata: common,
Schema: freshSchema,
Partition: slices.Collect(freshPartitions.Fields()),
}, nil
case 2:
return &metadataV2{commonMetadata: common}, nil
default:
return nil, fmt.Errorf("invalid format version: %d", formatVersion)
}
}