func()

in catalog/glue/glue.go [243:306]


func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
	staged, err := internal.CreateStagedTable(ctx, c.props, c.LoadNamespaceProperties, identifier, schema, opts...)
	if err != nil {
		return nil, err
	}

	database, tableName, err := identifierToGlueTable(identifier)
	if err != nil {
		return nil, err
	}

	wfs, ok := staged.FS().(io.WriteFileIO)
	if !ok {
		return nil, errors.New("loaded filesystem IO does not support writing")
	}

	if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation()); err != nil {
		return nil, err
	}

	var tableDescription *string
	if desc := staged.Properties().Get("Description", ""); desc != "" {
		tableDescription = aws.String(desc)
	}

	tableInput := &types.TableInput{
		Name: aws.String(tableName),
		Parameters: map[string]string{
			tableTypePropsKey:        glueTypeIceberg,
			metadataLocationPropsKey: staged.MetadataLocation(),
		},
		TableType: aws.String("EXTERNAL_TABLE"),
		StorageDescriptor: &types.StorageDescriptor{
			Location: aws.String(staged.Metadata().Location()),
			Columns:  schemaToGlueColumns(schema, true),
		},
		Description: tableDescription,
	}
	_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
		CatalogId:    c.catalogId,
		DatabaseName: aws.String(database),
		TableInput:   tableInput,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
	}
	createdTable, err := c.LoadTable(ctx, identifier, nil)
	if err != nil {
		// Attempt to clean up the table if loading fails
		_, cleanupErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
			CatalogId:    c.catalogId,
			DatabaseName: aws.String(database),
			Name:         aws.String(tableName),
		})
		if cleanupErr != nil {
			return nil, fmt.Errorf("failed to create table %s.%s and cleanup failed: %v (original error: %w)",
				database, tableName, cleanupErr, err)
		}

		return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
	}

	return createdTable, nil
}