in catalog/sql/sql.go [454:511]
// RenameTable moves the catalog entry identified by from to the identifier to
// and returns the table loaded under its new identifier.
//
// The namespace portion of each identifier is joined with "." to form the
// stored namespace key. The following errors can be matched with errors.Is:
//   - catalog.ErrNoSuchNamespace if the destination namespace does not exist.
//   - catalog.ErrTableAlreadyExists if a table already exists at to.
//   - catalog.ErrNoSuchTable if no row matched from.
//
// Any other database error is returned wrapped with context about the rename.
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
	fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".")
	fromTbl := catalog.TableNameFromIdent(from)

	toNs := strings.Join(catalog.NamespaceFromIdent(to), ".")
	toTbl := catalog.TableNameFromIdent(to)

	// The destination namespace must already exist; it is not created here.
	exists, err := c.namespaceExists(ctx, toNs)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toNs)
	}

	err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error {
		// Refuse to overwrite an existing table at the destination.
		dstExists, err := tx.NewSelect().Model(&sqlIcebergTable{
			CatalogName:    c.name,
			TableNamespace: toNs,
			TableName:      toTbl,
		}).WherePK().Exists(ctx)
		if err != nil {
			return fmt.Errorf("error encountered checking existence of table '%s': %w", to, err)
		}
		if dstExists {
			return catalog.ErrTableAlreadyExists
		}

		// Rewrite the namespace and name columns of the source row, which is
		// located by its primary key.
		res, err := tx.NewUpdate().Model(&sqlIcebergTable{
			CatalogName:    c.name,
			TableNamespace: fromNs,
			TableName:      fromTbl,
		}).WherePK().
			Set("table_namespace = ?", toNs).
			Set("table_name = ?", toTbl).
			Exec(ctx)
		if err != nil {
			return fmt.Errorf("error renaming table from '%s' to '%s': %w", from, to, err)
		}

		n, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("error renaming table from '%s' to '%s': %w", from, to, err)
		}
		// Zero affected rows means the source table was not found.
		if n == 0 {
			return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, from)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	return c.LoadTable(ctx, to, nil)
}