table/updates.go (280 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"fmt"
"github.com/apache/iceberg-go"
"github.com/google/uuid"
)
// These are the various update actions defined in the iceberg spec
const (
UpdateAddSpec = "add-spec"
UpdateAddSchema = "add-schema"
UpdateAddSnapshot = "add-snapshot"
UpdateAddSortOrder = "add-sort-order"
UpdateAssignUUID = "assign-uuid"
UpdateRemoveProperties = "remove-properties"
UpdateRemoveSnapshots = "remove-snapshots"
UpdateRemoveSnapshotRef = "remove-snapshot-ref"
UpdateSetCurrentSchema = "set-current-schema"
UpdateSetDefaultSortOrder = "set-default-sort-order"
UpdateSetDefaultSpec = "set-default-spec"
UpdateSetLocation = "set-location"
UpdateSetProperties = "set-properties"
UpdateSetSnapshotRef = "set-snapshot-ref"
UpdateUpgradeFormatVersion = "upgrade-format-version"
)
// Update represents a change to a table's metadata.
type Update interface {
// Action returns the name of the action that the update represents.
Action() string
// Apply applies the update to the given metadata builder.
Apply(*MetadataBuilder) error
}
// baseUpdate contains the common fields for all updates. It is used to identify the type
// of the update.
type baseUpdate struct {
ActionName string `json:"action"`
}
func (u *baseUpdate) Action() string {
return u.ActionName
}
type assignUUIDUpdate struct {
baseUpdate
UUID uuid.UUID `json:"uuid"`
}
// NewAssignUUIDUpdate creates a new update to assign a UUID to the table metadata.
func NewAssignUUIDUpdate(uuid uuid.UUID) Update {
return &assignUUIDUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAssignUUID},
UUID: uuid,
}
}
func (u *assignUUIDUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetUUID(u.UUID)
return err
}
type upgradeFormatVersionUpdate struct {
baseUpdate
FormatVersion int `json:"format-version"`
}
// NewUpgradeFormatVersionUpdate creates a new update that upgrades the format version
// of the table metadata to the given formatVersion.
func NewUpgradeFormatVersionUpdate(formatVersion int) Update {
return &upgradeFormatVersionUpdate{
baseUpdate: baseUpdate{ActionName: UpdateUpgradeFormatVersion},
FormatVersion: formatVersion,
}
}
func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetFormatVersion(u.FormatVersion)
return err
}
type addSchemaUpdate struct {
baseUpdate
Schema *iceberg.Schema `json:"schema"`
LastColumnID int `json:"last-column-id"`
initial bool
}
// NewAddSchemaUpdate creates a new update that adds the given schema and last column ID to
// the table metadata. If the initial flag is set to true, the schema is considered the initial
// schema of the table, and all previously added schemas in the metadata builder are removed.
func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial bool) Update {
return &addSchemaUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAddSchema},
Schema: schema,
LastColumnID: lastColumnID,
initial: initial,
}
}
func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial)
return err
}
type setCurrentSchemaUpdate struct {
baseUpdate
SchemaID int `json:"schema-id"`
}
// NewSetCurrentSchemaUpdate creates a new update that sets the current schema of the table
// metadata to the given schema ID.
func NewSetCurrentSchemaUpdate(id int) Update {
return &setCurrentSchemaUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetCurrentSchema},
SchemaID: id,
}
}
func (u *setCurrentSchemaUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetCurrentSchemaID(u.SchemaID)
return err
}
type addPartitionSpecUpdate struct {
baseUpdate
Spec *iceberg.PartitionSpec `json:"spec"`
initial bool
}
// NewAddPartitionSpecUpdate creates a new update that adds the given partition spec to the table
// metadata. If the initial flag is set to true, the spec is considered the initial spec of the table,
// and all other previously added specs in the metadata builder are removed.
func NewAddPartitionSpecUpdate(spec *iceberg.PartitionSpec, initial bool) Update {
return &addPartitionSpecUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAddSpec},
Spec: spec,
initial: initial,
}
}
func (u *addPartitionSpecUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.AddPartitionSpec(u.Spec, u.initial)
return err
}
type setDefaultSpecUpdate struct {
baseUpdate
SpecID int `json:"spec-id"`
}
// NewSetDefaultSpecUpdate creates a new update that sets the default partition spec of the
// table metadata to the given spec ID.
func NewSetDefaultSpecUpdate(id int) Update {
return &setDefaultSpecUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetDefaultSpec},
SpecID: id,
}
}
func (u *setDefaultSpecUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetDefaultSpecID(u.SpecID)
return err
}
type addSortOrderUpdate struct {
baseUpdate
SortOrder *SortOrder `json:"sort-order"`
initial bool
}
// NewAddSortOrderUpdate creates a new update that adds the given sort order to the table metadata.
// If the initial flag is set to true, the sort order is considered the initial sort order of the table,
// and all previously added sort orders in the metadata builder are removed.
func NewAddSortOrderUpdate(sortOrder *SortOrder, initial bool) Update {
return &addSortOrderUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAddSortOrder},
SortOrder: sortOrder,
initial: initial,
}
}
func (u *addSortOrderUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.AddSortOrder(u.SortOrder, u.initial)
return err
}
type setDefaultSortOrderUpdate struct {
baseUpdate
SortOrderID int `json:"sort-order-id"`
}
// NewSetDefaultSortOrderUpdate creates a new update that sets the default sort order of the table metadata
// to the given sort order ID.
func NewSetDefaultSortOrderUpdate(id int) Update {
return &setDefaultSortOrderUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetDefaultSortOrder},
SortOrderID: id,
}
}
func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetDefaultSortOrderID(u.SortOrderID)
return err
}
type addSnapshotUpdate struct {
baseUpdate
Snapshot *Snapshot `json:"snapshot"`
}
// NewAddSnapshotUpdate creates a new update that adds the given snapshot to the table metadata.
func NewAddSnapshotUpdate(snapshot *Snapshot) Update {
return &addSnapshotUpdate{
baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
Snapshot: snapshot,
}
}
func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.AddSnapshot(u.Snapshot)
return err
}
type setSnapshotRefUpdate struct {
baseUpdate
RefName string `json:"ref-name"`
RefType RefType `json:"type"`
SnapshotID int64 `json:"snapshot-id"`
MaxRefAgeMs int64 `json:"max-ref-age-ms,omitempty"`
MaxSnapshotAgeMs int64 `json:"max-snapshot-age-ms,omitempty"`
MinSnapshotsToKeep int `json:"min-snapshots-to-keep,omitempty"`
}
// NewSetSnapshotRefUpdate creates a new update that sets the given snapshot reference
// as the current snapshot of the table metadata. MaxRefAgeMs, MaxSnapshotAgeMs,
// and MinSnapshotsToKeep are optional, and any non-positive values are ignored.
func NewSetSnapshotRefUpdate(
name string,
snapshotID int64,
refType RefType,
maxRefAgeMs, maxSnapshotAgeMs int64,
minSnapshotsToKeep int,
) Update {
return &setSnapshotRefUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetSnapshotRef},
RefName: name,
RefType: refType,
SnapshotID: snapshotID,
MaxRefAgeMs: maxRefAgeMs,
MaxSnapshotAgeMs: maxSnapshotAgeMs,
MinSnapshotsToKeep: minSnapshotsToKeep,
}
}
func (u *setSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
opts := []setSnapshotRefOption{}
if u.MaxRefAgeMs > 0 {
opts = append(opts, WithMaxRefAgeMs(u.MaxRefAgeMs))
}
if u.MaxSnapshotAgeMs > 0 {
opts = append(opts, WithMaxSnapshotAgeMs(u.MaxSnapshotAgeMs))
}
if u.MinSnapshotsToKeep > 0 {
opts = append(opts, WithMinSnapshotsToKeep(u.MinSnapshotsToKeep))
}
_, err := builder.SetSnapshotRef(
u.RefName,
u.SnapshotID,
u.RefType,
opts...,
)
return err
}
type setLocationUpdate struct {
baseUpdate
Location string `json:"location"`
}
// NewSetLocationUpdate creates a new update that sets the location of the table metadata.
func NewSetLocationUpdate(loc string) Update {
return &setLocationUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetLocation},
Location: loc,
}
}
func (u *setLocationUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetLoc(u.Location)
return err
}
type setPropertiesUpdate struct {
baseUpdate
Updates iceberg.Properties `json:"updates"`
}
// NewSetPropertiesUpdate creates a new update that sets the given properties in the
// table metadata.
func NewSetPropertiesUpdate(updates iceberg.Properties) *setPropertiesUpdate {
return &setPropertiesUpdate{
baseUpdate: baseUpdate{ActionName: UpdateSetProperties},
Updates: updates,
}
}
func (u *setPropertiesUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.SetProperties(u.Updates)
return err
}
type removePropertiesUpdate struct {
baseUpdate
Removals []string `json:"removals"`
}
// NewRemovePropertiesUpdate creates a new update that removes properties from the table metadata.
// The properties are identified by their names, and if a property with the given name does not exist,
// it is ignored.
func NewRemovePropertiesUpdate(removals []string) Update {
return &removePropertiesUpdate{
baseUpdate: baseUpdate{ActionName: UpdateRemoveProperties},
Removals: removals,
}
}
func (u *removePropertiesUpdate) Apply(builder *MetadataBuilder) error {
_, err := builder.RemoveProperties(u.Removals)
return err
}
type removeSnapshotsUpdate struct {
baseUpdate
SnapshotIDs []int64 `json:"snapshot-ids"`
}
// NewRemoveSnapshotsUpdate creates a new update that removes all snapshots from
// the table metadata with the given snapshot IDs.
func NewRemoveSnapshotsUpdate(ids []int64) Update {
return &removeSnapshotsUpdate{
baseUpdate: baseUpdate{ActionName: UpdateRemoveSnapshots},
SnapshotIDs: ids,
}
}
func (u *removeSnapshotsUpdate) Apply(builder *MetadataBuilder) error {
return fmt.Errorf("%w: %s", iceberg.ErrNotImplemented, UpdateRemoveSnapshots)
}
type removeSnapshotRefUpdate struct {
baseUpdate
RefName string `json:"ref-name"`
}
// NewRemoveSnapshotRefUpdate creates a new update that removes a snapshot reference
// from the table metadata.
func NewRemoveSnapshotRefUpdate(ref string) *removeSnapshotRefUpdate {
return &removeSnapshotRefUpdate{
baseUpdate: baseUpdate{ActionName: UpdateRemoveSnapshotRef},
RefName: ref,
}
}
func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
return fmt.Errorf("%w: %s", iceberg.ErrNotImplemented, UpdateRemoveSnapshotRef)
}