catalog/catalog.go (137 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package catalog provides an interface for Catalog implementations along with
// a registry for registering catalog implementations.
//
// Subpackages of this package provide some default implementations for select
// catalog types which will register themselves if imported. For instance,
// adding the following import:
//
// import _ "github.com/apache/iceberg-go/catalog/rest"
//
// Will register the REST catalog implementation.
package catalog
import (
"context"
"errors"
"fmt"
"iter"
"maps"
"strings"
"github.com/apache/iceberg-go"
iceinternal "github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/table"
)
type Type string
const (
REST Type = "rest"
Hive Type = "hive"
Glue Type = "glue"
DynamoDB Type = "dynamodb"
SQL Type = "sql"
)
var (
// ErrNoSuchTable is returned when a table does not exist in the catalog.
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
ErrTableAlreadyExists = errors.New("table already exists")
ErrCatalogNotFound = errors.New("catalog type not registered")
ErrNamespaceNotEmpty = errors.New("namespace is not empty")
ErrNoSuchView = errors.New("view does not exist")
ErrViewAlreadyExists = errors.New("view already exists")
)
type PropertiesUpdateSummary struct {
Removed []string `json:"removed"`
Updated []string `json:"updated"`
Missing []string `json:"missing"`
}
// CreateTableCfg represents the configuration used for CreateTable operations
type CreateTableCfg struct {
Location string
PartitionSpec *iceberg.PartitionSpec
SortOrder table.SortOrder
Properties iceberg.Properties
}
// Catalog for iceberg table operations like create, drop, load, list and others.
type Catalog interface {
// CatalogType returns the type of the catalog.
CatalogType() Type
// CreateTable creates a new iceberg table in the catalog using the provided identifier
// and schema. Options can be used to optionally provide location, partition spec, sort order,
// and custom properties.
CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...CreateTableOpt) (*table.Table, error)
// CommitTable commits the table metadata and updates to the catalog, returning the new metadata
CommitTable(context.Context, *table.Table, []table.Requirement, []table.Update) (table.Metadata, string, error)
// ListTables returns a list of table identifiers in the catalog, with the returned
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error]
// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
// DropTable tells the catalog to drop the table entirely.
DropTable(ctx context.Context, identifier table.Identifier) error
// RenameTable tells the catalog to rename a given table by the identifiers
// provided, and then loads and returns the destination table
RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
// CheckTableExists returns if the table exists
CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error)
// ListNamespaces returns the list of available namespaces, optionally filtering by a
// parent namespace
ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
// CreateNamespace tells the catalog to create a new namespace with the given properties
CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
DropNamespace(ctx context.Context, namespace table.Identifier) error
// CheckNamespaceExists returns if the namespace exists
CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error)
// LoadNamespaceProperties returns the current properties in the catalog for
// a given namespace
LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}
func ToIdentifier(ident ...string) table.Identifier {
if len(ident) == 1 {
if ident[0] == "" {
return nil
}
return table.Identifier(strings.Split(ident[0], "."))
}
return table.Identifier(ident)
}
func TableNameFromIdent(ident table.Identifier) string {
if len(ident) == 0 {
return ""
}
return ident[len(ident)-1]
}
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
return ident[:len(ident)-1]
}
type CreateTableOpt func(*CreateTableCfg)
func WithLocation(location string) CreateTableOpt {
return func(cfg *CreateTableCfg) {
cfg.Location = strings.TrimRight(location, "/")
}
}
func WithPartitionSpec(spec *iceberg.PartitionSpec) CreateTableOpt {
return func(cfg *CreateTableCfg) {
cfg.PartitionSpec = spec
}
}
func WithSortOrder(order table.SortOrder) CreateTableOpt {
return func(cfg *CreateTableCfg) {
cfg.SortOrder = order
}
}
func WithProperties(props iceberg.Properties) CreateTableOpt {
return func(cfg *CreateTableCfg) {
cfg.Properties = props
}
}
//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it
func checkForOverlap(removals []string, updates iceberg.Properties) error {
overlap := []string{}
for _, key := range removals {
if _, ok := updates[key]; ok {
overlap = append(overlap, key)
}
}
if len(overlap) > 0 {
return fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
}
return nil
}
//lint:ignore U1000 this is linked to by catalogs via go:linkname but we don't want to export it
func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, PropertiesUpdateSummary, error) {
if err := checkForOverlap(removals, updates); err != nil {
return nil, PropertiesUpdateSummary{}, err
}
var (
updatedProps = maps.Clone(currentProps)
removed = make([]string, 0, len(removals))
updated = make([]string, 0, len(updates))
)
for _, key := range removals {
if _, exists := updatedProps[key]; exists {
delete(updatedProps, key)
removed = append(removed, key)
}
}
for key, value := range updates {
if updatedProps[key] != value {
updated = append(updated, key)
updatedProps[key] = value
}
}
summary := PropertiesUpdateSummary{
Removed: removed,
Updated: updated,
Missing: iceinternal.Difference(removals, removed),
}
return updatedProps, summary, nil
}