in catalog/glue/glue.go [243:306]
func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, identifier, schema, opts...)
if err != nil {
return nil, err
}
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}
wfs, ok := staged.FS().(io.WriteFileIO)
if !ok {
return nil, errors.New("loaded filesystem IO does not support writing")
}
if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil {
return nil, err
}
var tableDescription *string
if desc := staged.Properties().Get("Description", ""); desc != "" {
tableDescription = aws.String(desc)
}
tableInput := &types.TableInput{
Name: aws.String(tableName),
Parameters: map[string]string{
tableTypePropsKey: glueTypeIceberg,
metadataLocationPropsKey: staged.MetadataLocation(),
},
TableType: aws.String("EXTERNAL_TABLE"),
StorageDescriptor: &types.StorageDescriptor{
Location: aws.String(staged.Metadata().Location()),
Columns: schemaToGlueColumns(schema, true),
},
Description: tableDescription,
}
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: tableInput,
})
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
}
createdTable, err := c.LoadTable(ctx, identifier, nil)
if err != nil {
// Attempt to clean up the table if loading fails
_, cleanupErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
Name: aws.String(tableName),
})
if cleanupErr != nil {
return nil, fmt.Errorf("failed to create table %s.%s and cleanup failed: %v (original error: %w)",
database, tableName, cleanupErr, err)
}
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
}
return createdTable, nil
}