catalog/internal/utils.go (182 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package internal import ( "context" "encoding/json" "errors" "fmt" "maps" "net/url" "path" "regexp" "strconv" "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/google/uuid" ) func GetMetadataLoc(location string, newVersion uint) string { return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json", location, newVersion, uuid.New().String()) } func WriteTableMetadata(metadata table.Metadata, fs io.WriteFileIO, loc string) error { out, err := fs.Create(loc) if err != nil { return nil } defer out.Close() return json.NewEncoder(out).Encode(metadata) } func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, props iceberg.Properties) error { fs, err := io.LoadFS(ctx, props, loc) if err != nil { return err } wfs, ok := fs.(io.WriteFileIO) if !ok { return errors.New("filesystem IO does not support writing") } out, err := wfs.Create(loc) if err != nil { return nil } defer out.Close() return json.NewEncoder(out).Encode(metadata) } func UpdateTableMetadata(base table.Metadata, updates []table.Update, metadataLoc string) (table.Metadata, error) { bldr, err := table.MetadataBuilderFromBase(base) if err != nil { return nil, err } for _, update := range updates { if err := update.Apply(bldr); err != nil { return nil, err } } if bldr.HasChanges() { if metadataLoc != "" { maxMetadataLogEntries := max(1, base.Properties().GetInt( table.MetadataPreviousVersionsMaxKey, table.MetadataPreviousVersionsMaxDefault)) bldr.TrimMetadataLogs(maxMetadataLogEntries + 1). AppendMetadataLog(table.MetadataLogEntry{ MetadataFile: metadataLoc, TimestampMs: base.LastUpdatedMillis(), }) } if base.LastUpdatedMillis() == bldr.LastUpdatedMS() { bldr.SetLastUpdatedMS() } } return bldr.Build() } func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (table.StagedTable, error) { var cfg catalog.CreateTableCfg for _, opt := range opts { opt(&cfg) } dbIdent := catalog.NamespaceFromIdent(ident) tblname := catalog.TableNameFromIdent(ident) dbname := strings.Join(dbIdent, ".") loc, err := ResolveTableLocation(ctx, cfg.Location, dbname, tblname, catprops, nspropsFn) if err != nil { return table.StagedTable{}, err } provider, err := table.LoadLocationProvider(loc, cfg.Properties) if err != nil { return table.StagedTable{}, err } metadataLoc, err := provider.NewTableMetadataFileLocation(0) if err != nil { return table.StagedTable{}, err } metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, cfg.SortOrder, loc, cfg.Properties) if err != nil { return table.StagedTable{}, err } ioProps := maps.Clone(catprops) maps.Copy(ioProps, cfg.Properties) fs, err := io.LoadFS(ctx, ioProps, metadataLoc) if err != nil { return table.StagedTable{}, err } return table.StagedTable{ Table: table.New(ident, metadata, metadataLoc, fs, nil), }, nil } type GetNamespacePropsFn func(context.Context, table.Identifier) (iceberg.Properties, error) func ResolveTableLocation(ctx context.Context, loc, dbname, tablename string, catprops iceberg.Properties, nsprops GetNamespacePropsFn) (string, error) { if len(loc) == 0 { dbprops, err := nsprops(ctx, strings.Split(dbname, ".")) if err != nil { return "", err } return getDefaultWarehouseLocation(dbname, tablename, dbprops, catprops) } return strings.TrimSuffix(loc, "/"), nil } func getDefaultWarehouseLocation(dbname, tablename string, nsprops, catprops iceberg.Properties) (string, error) { if dblocation := nsprops.Get("location", ""); dblocation != "" { return url.JoinPath(dblocation, tablename) } if warehousepath := catprops.Get("warehouse", ""); warehousepath != "" { return url.JoinPath(warehousepath, dbname+".db", tablename) } return "", errors.New("no default path set, please specify a location when creating a table") } // (\d+) -> version number // - -> separator // ([\w-]{36}) -> UUID (36 characters, including hyphens) // (?:\.\w+)? -> optional codec name // \.metadata\.json -> file extension var tableMetadataFileNameRegex = regexp.MustCompile(`^(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json`) func ParseMetadataVersion(location string) int { fileName := path.Base(location) matches := tableMetadataFileNameRegex.FindStringSubmatch(fileName) if len(matches) != 3 { return -1 } if _, err := uuid.Parse(matches[2]); err != nil { return -1 } v, err := strconv.Atoi(matches[1]) if err != nil { return -1 } return v } func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.Identifier, reqs []table.Requirement, updates []table.Update, cat table.CatalogIO) (*table.StagedTable, error) { var ( baseMeta table.Metadata metadataLoc string ) if current != nil { for _, r := range reqs { if err := r.Validate(current.Metadata()); err != nil { return nil, err } } baseMeta = current.Metadata() metadataLoc = current.MetadataLocation() } else { var err error baseMeta, err = table.NewMetadata(iceberg.NewSchema(0), nil, table.UnsortedSortOrder, "", nil) if err != nil { return nil, err } } updated, err := UpdateTableMetadata(baseMeta, updates, metadataLoc) if err != nil { return nil, err } provider, err := table.LoadLocationProvider(updated.Location(), updated.Properties()) if err != nil { return nil, err } newVersion := ParseMetadataVersion(metadataLoc) + 1 newLocation, err := provider.NewTableMetadataFileLocation(newVersion) if err != nil { return nil, err } fs, err := io.LoadFS(ctx, updated.Properties(), newLocation) if err != nil { return nil, err } return &table.StagedTable{Table: table.New(ident, updated, newLocation, fs, cat)}, nil }