in catalog/sql/sql.go [315:381]
// CommitTable stages reqs and updates against the table identified by
// tbl.Identifier() and records the resulting metadata location in the
// catalog database.
//
// The steps, in order:
//
//  1. The table is loaded through c.LoadTable. An error matching
//     catalog.ErrNoSuchTable is tolerated; any other load error is returned.
//  2. internal.UpdateAndStageTable produces the staged table from the loaded
//     one (which may be nil), the requirements and the updates.
//  3. If a table was loaded and the staged metadata Equals its metadata, the
//     loaded table's metadata and location are returned and nothing is written.
//  4. The staged metadata is written via internal.WriteMetadata.
//  5. Inside withWriteTx the catalog row is either inserted (no table was
//     loaded) or updated (a table was loaded).
//
// The update is filtered on the primary key and on the metadata location that
// was read in step 1; when it affects zero rows an error stating that the
// table has been updated by another process is returned.
//
// On success it returns the staged metadata and its location. On any failure
// it returns nil metadata, an empty location and the error.
//
// NOTE(review): the metadata written in step 4 is not removed by this method
// when the transaction in step 5 fails — confirm whether cleanup happens
// elsewhere.
func (c *Catalog) CommitTable(ctx context.Context, tbl *table.Table, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
	ident := tbl.Identifier()
	// The namespace is stored as a single dot-joined string.
	nsName := strings.Join(catalog.NamespaceFromIdent(ident), ".")
	tableName := catalog.TableNameFromIdent(ident)

	existing, loadErr := c.LoadTable(ctx, ident, nil)
	if loadErr != nil && !errors.Is(loadErr, catalog.ErrNoSuchTable) {
		return nil, "", loadErr
	}

	staged, stageErr := internal.UpdateAndStageTable(ctx, existing, ident, reqs, updates, c)
	if stageErr != nil {
		return nil, "", stageErr
	}

	// Staging produced metadata equal to what is already committed: skip the
	// metadata write and the database round trip.
	if existing != nil && staged.Metadata().Equals(existing.Metadata()) {
		return existing.Metadata(), existing.MetadataLocation(), nil
	}

	if writeErr := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); writeErr != nil {
		return nil, "", writeErr
	}

	txErr := withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error {
		if existing == nil {
			// No table was loaded: create the catalog row.
			row := &sqlIcebergTable{
				CatalogName:      c.name,
				TableNamespace:   nsName,
				TableName:        tableName,
				MetadataLocation: sql.NullString{Valid: true, String: staged.MetadataLocation()},
			}
			if _, insertErr := tx.NewInsert().Model(row).Exec(ctx); insertErr != nil {
				return fmt.Errorf("failed to create table: %w", insertErr)
			}

			return nil
		}

		// A table was loaded: point its row at the staged metadata and keep
		// the location it had when loaded as the previous one.
		row := &sqlIcebergTable{
			CatalogName:              c.name,
			TableNamespace:           nsName,
			TableName:                tableName,
			MetadataLocation:         sql.NullString{Valid: true, String: staged.MetadataLocation()},
			PreviousMetadataLocation: sql.NullString{Valid: true, String: existing.MetadataLocation()},
		}
		// The extra metadata_location predicate makes the update match only
		// while the row still holds the location read at load time.
		result, execErr := tx.NewUpdate().
			Model(row).
			WherePK().
			Where("metadata_location = ?", existing.MetadataLocation()).
			Exec(ctx)
		if execErr != nil {
			return fmt.Errorf("error updating table information: %w", execErr)
		}

		affected, countErr := result.RowsAffected()
		if countErr != nil {
			return fmt.Errorf("error updating table information: %w", countErr)
		}
		if affected == 0 {
			return fmt.Errorf("table has been updated by another process: %s.%s", nsName, tableName)
		}

		return nil
	})
	if txErr != nil {
		return nil, "", txErr
	}

	return staged.Metadata(), staged.MetadataLocation(), nil
}