catalog/glue/glue.go (619 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package glue import ( "context" "errors" "fmt" "iter" "maps" "strconv" _ "unsafe" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/internal" "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/apache/iceberg-go/utils" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/glue" "github.com/aws/aws-sdk-go-v2/service/glue/types" ) const ( // Use the same conventions as in the pyiceberg project. // See: https://github.com/apache/iceberg-python/blob/main/pyiceberg/catalog/__init__.py#L82-L96 glueTypeIceberg = "ICEBERG" databaseTypePropsKey = "database_type" tableTypePropsKey = "table_type" descriptionPropsKey = "Description" // Database location. locationPropsKey = "Location" // Table metadata location pointer. metadataLocationPropsKey = "metadata_location" // The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue // automatically uses the caller's AWS account ID by default. // See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html CatalogIdKey = "glue.id" AccessKeyID = "glue.access-key-id" SecretAccessKey = "glue.secret-access-key" SessionToken = "glue.session-token" Region = "glue.region" Endpoint = "glue.endpoint" MaxRetries = "glue.max-retries" RetryMode = "glue.retry-mode" icebergFieldIDKey = "iceberg.field.id" icebergFieldOptionalKey = "iceberg.field.optional" icebergFieldCurrentKey = "iceberg.field.current" ) var _ catalog.Catalog = (*Catalog)(nil) func init() { catalog.Register("glue", catalog.RegistrarFunc(func(ctx context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) { awsConfig, err := toAwsConfig(ctx, props) if err != nil { return nil, err } return NewCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil })) } func toAwsConfig(ctx context.Context, p iceberg.Properties) (aws.Config, error) { opts := make([]func(*config.LoadOptions) error, 0) for k, v := range p { switch k { case Region: opts = append(opts, config.WithRegion(v)) case Endpoint: opts = append(opts, config.WithBaseEndpoint(v)) case MaxRetries: maxRetry, err := strconv.Atoi(v) if err != nil { return aws.Config{}, err } opts = append(opts, config.WithRetryMaxAttempts(maxRetry)) case RetryMode: m, err := aws.ParseRetryMode(v) if err != nil { return aws.Config{}, err } opts = append(opts, config.WithRetryMode(m)) } } key, secret, token := p[AccessKeyID], p[SecretAccessKey], p[SessionToken] if key != "" || secret != "" || token != "" { opts = append(opts, config.WithCredentialsProvider( credentials.NewStaticCredentialsProvider(key, secret, token))) } return config.LoadDefaultConfig(ctx, opts...) } type glueAPI interface { CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns ...func(*glue.Options)) (*glue.DeleteTableOutput, error) UpdateTable(ctx context.Context, params *glue.UpdateTableInput, optFns ...func(*glue.Options)) (*glue.UpdateTableOutput, error) GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns ...func(*glue.Options)) (*glue.GetDatabaseOutput, error) GetDatabases(ctx context.Context, params *glue.GetDatabasesInput, optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error) CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error) DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error) UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error) } type Catalog struct { glueSvc glueAPI catalogId *string awsCfg *aws.Config props iceberg.Properties } // NewCatalog creates a new instance of glue.Catalog with the given options. func NewCatalog(opts ...Option) *Catalog { glueOps := &options{} for _, o := range opts { o(glueOps) } var catalogId *string if val, ok := glueOps.awsProperties[CatalogIdKey]; ok { catalogId = &val } else { catalogId = nil } return &Catalog{ glueSvc: glue.NewFromConfig(glueOps.awsConfig), catalogId: catalogId, awsCfg: &glueOps.awsConfig, props: iceberg.Properties(glueOps.awsProperties), } } // ListTables returns a list of Iceberg tables in the given Glue database. // // The namespace should just contain the Glue database name. func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { return func(yield func(table.Identifier, error) bool) { database, err := identifierToGlueDatabase(namespace) if err != nil { yield(table.Identifier{}, err) return } paginator := glue.NewGetTablesPaginator(c.glueSvc, &glue.GetTablesInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), }) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { yield(table.Identifier{}, fmt.Errorf("failed to list tables in namespace %s: %w", database, err)) return } icebergTables := filterTableListByType(database, page.TableList, glueTypeIceberg) for _, tbl := range icebergTables { if !yield(tbl, nil) { return } } } } } // LoadTable loads a table from the catalog table details. // // The identifier should contain the Glue database name, then Glue table name. func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } if props == nil { props = map[string]string{} } glueTable, err := c.getTable(ctx, database, tableName) if err != nil { return nil, err } location, ok := glueTable.Parameters[metadataLocationPropsKey] if !ok { return nil, fmt.Errorf("missing metadata location for table %s.%s", database, tableName) } ctx = utils.WithAwsConfig(ctx, c.awsCfg) // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. iofs, err := io.LoadFS(ctx, props, location) if err != nil { return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err) } icebergTable, err := table.NewFromLocation(identifier, location, iofs, c) if err != nil { return nil, fmt.Errorf("failed to create table from location %s.%s: %w", database, tableName, err) } return icebergTable, nil } func (c *Catalog) CatalogType() catalog.Type { return catalog.Glue } // CreateTable creates a new Iceberg table in the Glue catalog. // This function will create the metadata file in S3 using the catalog and table properties, // to determine the bucket and key for the metadata location. func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, identifier, schema, opts...) if err != nil { return nil, err } database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } wfs, ok := staged.FS().(io.WriteFileIO) if !ok { return nil, errors.New("loaded filesystem IO does not support writing") } if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil { return nil, err } var tableDescription *string if desc := staged.Properties().Get("Description", ""); desc != "" { tableDescription = aws.String(desc) } tableInput := &types.TableInput{ Name: aws.String(tableName), Parameters: map[string]string{ tableTypePropsKey: glueTypeIceberg, metadataLocationPropsKey: staged.MetadataLocation(), }, TableType: aws.String("EXTERNAL_TABLE"), StorageDescriptor: &types.StorageDescriptor{ Location: aws.String(staged.Metadata().Location()), Columns: schemaToGlueColumns(schema, true), }, Description: tableDescription, } _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), TableInput: tableInput, }) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) } createdTable, err := c.LoadTable(ctx, identifier, nil) if err != nil { // Attempt to clean up the table if loading fails _, cleanupErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), Name: aws.String(tableName), }) if cleanupErr != nil { return nil, fmt.Errorf("failed to create table %s.%s and cleanup failed: %v (original error: %w)", database, tableName, cleanupErr, err) } return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) } return createdTable, nil } // RegisterTable registers a new table using existing metadata. func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } // Load the metadata file to get table properties ctx = utils.WithAwsConfig(ctx, c.awsCfg) iofs, err := io.LoadFS(ctx, nil, metadataLocation) if err != nil { return nil, fmt.Errorf("failed to load metadata file at %s: %w", metadataLocation, err) } // Read the metadata file metadata, err := table.NewFromLocation([]string{tableName}, metadataLocation, iofs, c) if err != nil { return nil, fmt.Errorf("failed to read table metadata from %s: %w", metadataLocation, err) } tableInput := &types.TableInput{ Name: aws.String(tableName), Parameters: map[string]string{}, TableType: aws.String("EXTERNAL_TABLE"), StorageDescriptor: &types.StorageDescriptor{ Location: aws.String(metadataLocation), Columns: schemaToGlueColumns(metadata.Schema(), true), }, } _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), TableInput: tableInput, OpenTableFormatInput: &types.OpenTableFormatInput{ IcebergInput: &types.IcebergInput{ MetadataOperation: types.MetadataOperationCreate, }, }, }) if err != nil { return nil, fmt.Errorf("failed to register table %s.%s: %w", database, tableName, err) } return c.LoadTable(ctx, identifier, nil) } func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { // Load current table database, tableName, err := identifierToGlueTable(tbl.Identifier()) if err != nil { return nil, "", err } current, err := c.LoadTable(ctx, tbl.Identifier(), nil) if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) { return nil, "", err } // Create a staging table with the updates applied staged, err := internal.UpdateAndStageTable(ctx, tbl, tbl.Identifier(), requirements, updates, c) if err != nil { return nil, "", err } if current != nil && staged.Metadata().Equals(current.Metadata()) { return current.Metadata(), current.MetadataLocation(), nil } if err := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); err != nil { return nil, "", err } // Build and call Glue update request tableInput, err := buildGlueTableInput(ctx, database, tableName, staged, c) if err != nil { return nil, "", err } _, err = c.glueSvc.UpdateTable(ctx, &glue.UpdateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), TableInput: tableInput, }) if err != nil { return nil, "", err } return staged.Metadata(), staged.MetadataLocation(), err } // DropTable deletes an Iceberg table from the Glue catalog. func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return err } // Check if the table exists and is an Iceberg table. _, err = c.getTable(ctx, database, tableName) if err != nil { return err } params := &glue.DeleteTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), Name: aws.String(tableName), } _, err = c.glueSvc.DeleteTable(ctx, params) if err != nil { return fmt.Errorf("failed to drop table %s.%s: %w", database, tableName, err) } return nil } // RenameTable renames an Iceberg table in the Glue catalog. func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { fromDatabase, fromTable, err := identifierToGlueTable(from) if err != nil { return nil, err } toDatabase, toTable, err := identifierToGlueTable(to) if err != nil { return nil, err } if fromDatabase != toDatabase { return nil, fmt.Errorf("cannot rename table across namespaces: %s -> %s", fromDatabase, toDatabase) } // Fetch the existing Glue table to copy the metadata into the new table. fromGlueTable, err := c.getTable(ctx, fromDatabase, fromTable) if err != nil { return nil, fmt.Errorf("failed to fetch the table %s.%s: %w", fromDatabase, fromTable, err) } // Create the new table. _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(toDatabase), TableInput: &types.TableInput{ Name: aws.String(toTable), Owner: fromGlueTable.Owner, Description: fromGlueTable.Description, Parameters: fromGlueTable.Parameters, StorageDescriptor: fromGlueTable.StorageDescriptor, }, }) if err != nil { return nil, fmt.Errorf("failed to create the table %s.%s: %w", fromDatabase, fromTable, err) } // Drop the old table. _, err = c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(fromDatabase), Name: aws.String(fromTable), }) if err != nil { // Best-effort rollback the table creation. _, rollbackErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(toDatabase), Name: aws.String(toTable), }) if rollbackErr != nil { fmt.Printf("failed to rollback the new table %s.%s: %v", toDatabase, toTable, rollbackErr) } return nil, fmt.Errorf("failed to rename the table %s.%s: %w", fromDatabase, fromTable, err) } // Load the new table to return. renamedTable, err := c.LoadTable(ctx, TableIdentifier(toDatabase, toTable), nil) if err != nil { return nil, fmt.Errorf("failed to load renamed table %s.%s: %w", toDatabase, toTable, err) } return renamedTable, nil } // CheckTableExists returns if an Iceberg table exists in the Glue catalog. func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return false, err } _, err = c.getTable(ctx, database, tableName) if err != nil { if errors.Is(err, catalog.ErrNoSuchTable) { return false, nil } return false, err } return true, nil } // CreateNamespace creates a new Iceberg namespace in the Glue catalog. func (c *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { database, err := identifierToGlueDatabase(namespace) if err != nil { return err } databaseParameters := map[string]string{ databaseTypePropsKey: glueTypeIceberg, } description := props[descriptionPropsKey] locationURI := props[locationPropsKey] if description != "" { databaseParameters[descriptionPropsKey] = description } if locationURI != "" { databaseParameters[locationPropsKey] = locationURI } databaseInput := &types.DatabaseInput{ Name: aws.String(database), Parameters: databaseParameters, } params := &glue.CreateDatabaseInput{CatalogId: c.catalogId, DatabaseInput: databaseInput} _, err = c.glueSvc.CreateDatabase(ctx, params) if err != nil { return fmt.Errorf("failed to create database %s: %w", database, err) } return nil } func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) { databaseName, err := identifierToGlueDatabase(namespace) if err != nil { return false, err } _, err = c.getDatabase(ctx, databaseName) if err != nil { if errors.Is(err, catalog.ErrNoSuchNamespace) { return false, nil } return false, err } return true, nil } // DropNamespace deletes an Iceberg namespace from the Glue catalog. func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { databaseName, err := identifierToGlueDatabase(namespace) if err != nil { return err } // Check if the database exists and is an iceberg database. _, err = c.getDatabase(ctx, databaseName) if err != nil { return err } params := &glue.DeleteDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)} _, err = c.glueSvc.DeleteDatabase(ctx, params) if err != nil { return fmt.Errorf("failed to drop namespace %s: %w", databaseName, err) } return nil } // LoadNamespaceProperties loads the properties of an Iceberg namespace from the Glue catalog. func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { databaseName, err := identifierToGlueDatabase(namespace) if err != nil { return nil, err } database, err := c.getDatabase(ctx, databaseName) if err != nil { return nil, err } props := make(map[string]string) if database.Parameters != nil { for k, v := range database.Parameters { props[k] = v } } return props, nil } // avoid circular dependency while still avoiding having to export the getUpdatedPropsAndUpdateSummary function // so that we can re-use it in the catalog implementations without duplicating the code. //go:linkname getUpdatedPropsAndUpdateSummary github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error) // UpdateNamespaceProperties updates the properties of an Iceberg namespace in the Glue catalog. // The removals list contains the keys to remove, and the updates map contains the keys and values to update. func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties, ) (catalog.PropertiesUpdateSummary, error) { databaseName, err := identifierToGlueDatabase(namespace) if err != nil { return catalog.PropertiesUpdateSummary{}, err } database, err := c.getDatabase(ctx, databaseName) if err != nil { return catalog.PropertiesUpdateSummary{}, err } updatedProperties, propertiesUpdateSummary, err := getUpdatedPropsAndUpdateSummary(database.Parameters, removals, updates) if err != nil { return catalog.PropertiesUpdateSummary{}, err } _, err = c.glueSvc.UpdateDatabase(ctx, &glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName), DatabaseInput: &types.DatabaseInput{ Name: aws.String(databaseName), Parameters: updatedProperties, }}) if err != nil { return catalog.PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err) } return propertiesUpdateSummary, nil } // ListNamespaces returns a list of Iceberg namespaces from the given Glue catalog. func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { params := &glue.GetDatabasesInput{ CatalogId: c.catalogId, } if parent != nil { return nil, errors.New("hierarchical namespace is not supported") } var icebergNamespaces []table.Identifier for { databasesResp, err := c.glueSvc.GetDatabases(ctx, params) if err != nil { return nil, fmt.Errorf("failed to list databases: %w", err) } icebergNamespaces = append(icebergNamespaces, filterDatabaseListByType(databasesResp.DatabaseList, glueTypeIceberg)...) if databasesResp.NextToken == nil { break } params.NextToken = databasesResp.NextToken } return icebergNamespaces, nil } // GetTable loads a table from the Glue Catalog using the given database and table name. func (c *Catalog) getTable(ctx context.Context, database, tableName string) (*types.Table, error) { tblRes, err := c.glueSvc.GetTable(ctx, &glue.GetTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), Name: aws.String(tableName), }, ) if err != nil { var notFoundErr *types.EntityNotFoundException if errors.As(err, &notFoundErr) { return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, catalog.ErrNoSuchTable) } return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) } if tblRes.Table.Parameters[tableTypePropsKey] != glueTypeIceberg { return nil, fmt.Errorf("table %s.%s is not an iceberg table", database, tableName) } return tblRes.Table, nil } // GetDatabase loads a database from the Glue Catalog using the given database name. func (c *Catalog) getDatabase(ctx context.Context, databaseName string) (*types.Database, error) { database, err := c.glueSvc.GetDatabase(ctx, &glue.GetDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)}) if err != nil { var notFoundErr *types.EntityNotFoundException if errors.As(err, &notFoundErr) { return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, catalog.ErrNoSuchNamespace) } return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, err) } if database.Database.Parameters[databaseTypePropsKey] != glueTypeIceberg { return nil, fmt.Errorf("namespace %s is not an iceberg namespace", databaseName) } return database.Database, nil } func identifierToGlueTable(identifier table.Identifier) (string, string, error) { if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) } return identifier[0], identifier[1], nil } func identifierToGlueDatabase(identifier table.Identifier) (string, error) { if len(identifier) != 1 { return "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) } return identifier[0], nil } // TableIdentifier returns a glue table identifier for an Iceberg table in the format [database, table]. func TableIdentifier(database string, tableName string) table.Identifier { return []string{database, tableName} } // DatabaseIdentifier returns a database identifier for a Glue database in the format [database]. func DatabaseIdentifier(database string) table.Identifier { return []string{database} } func filterTableListByType(database string, tableList []types.Table, tableType string) []table.Identifier { var filtered []table.Identifier for _, tbl := range tableList { if tbl.Parameters[tableTypePropsKey] != tableType { continue } filtered = append(filtered, TableIdentifier(database, aws.ToString(tbl.Name))) } return filtered } func filterDatabaseListByType(databases []types.Database, databaseType string) []table.Identifier { var filtered []table.Identifier for _, database := range databases { if database.Parameters[databaseTypePropsKey] != databaseType { continue } filtered = append(filtered, DatabaseIdentifier(aws.ToString(database.Name))) } return filtered } func buildGlueTableInput(ctx context.Context, database string, tableName string, staged *table.StagedTable, cat *Catalog) (*types.TableInput, error) { glueTable, err := cat.getTable(ctx, database, tableName) if err != nil { return nil, err } glueProperties := prepareProperties(staged.Properties(), staged.MetadataLocation()) description := staged.Properties()["comment"] if description == "" { description = aws.ToString(glueTable.Description) } existingColumnMap := map[string]string{} for _, column := range glueTable.StorageDescriptor.Columns { existingColumnMap[*column.Name] = *column.Comment } var glueColumns []types.Column for _, column := range schemaToGlueColumns(staged.Metadata().CurrentSchema(), true) { col := types.Column{ Name: column.Name, Comment: column.Comment, Type: column.Type, } if column.Comment == nil || *column.Comment == "" { col.Comment = aws.String(existingColumnMap[*column.Name]) } glueColumns = append(glueColumns, col) } return &types.TableInput{ Name: aws.String(tableName), Description: aws.String(description), Parameters: glueProperties, StorageDescriptor: &types.StorageDescriptor{ Location: aws.String(staged.Location()), Columns: glueColumns, }, }, nil } func prepareProperties(icebergProperties iceberg.Properties, newMetadataLocation string) iceberg.Properties { glueProperties := maps.Clone(icebergProperties) glueProperties[tableTypePropsKey] = glueTypeIceberg glueProperties[metadataLocationPropsKey] = newMetadataLocation return glueProperties }