func()

in catalog/sql/sql.go [454:511]


// RenameTable re-registers the table identified by from under the identifier
// to and returns the table loaded from its new identifier.
//
// The destination namespace must already exist; otherwise an error wrapping
// catalog.ErrNoSuchNamespace is returned. The destination-table existence
// check and the row update are both issued on the same bun.Tx handed to the
// withWriteTx callback. If a table is already registered under to,
// catalog.ErrTableAlreadyExists is returned. If the update matches no row for
// from, an error wrapping catalog.ErrNoSuchTable is returned.
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
	fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".")
	fromTbl := catalog.TableNameFromIdent(from)

	toNs := strings.Join(catalog.NamespaceFromIdent(to), ".")
	toTbl := catalog.TableNameFromIdent(to)

	// Only the destination namespace is verified up front; a missing source
	// is detected later through the affected-row count of the update.
	exists, err := c.namespaceExists(ctx, toNs)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toNs)
	}

	err = withWriteTx(ctx, c.db, func(ctx context.Context, tx bun.Tx) error {
		// Refuse to overwrite an existing table at the destination.
		exists, err := tx.NewSelect().Model(&sqlIcebergTable{
			CatalogName:    c.name,
			TableNamespace: toNs,
			TableName:      toTbl,
		}).WherePK().Exists(ctx)
		if err != nil {
			return fmt.Errorf("error encountered checking existence of table '%s': %w", to, err)
		}

		if exists {
			return catalog.ErrTableAlreadyExists
		}

		// Rewrite the namespace and name columns of the source row, which is
		// selected by its primary key.
		res, err := tx.NewUpdate().Model(&sqlIcebergTable{
			CatalogName:    c.name,
			TableNamespace: fromNs,
			TableName:      fromTbl,
		}).WherePK().
			Set("table_namespace = ?", toNs).
			Set("table_name = ?", toTbl).
			Exec(ctx)
		if err != nil {
			return fmt.Errorf("error renaming table from '%s' to '%s': %w", from, to, err)
		}

		n, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("error renaming table from '%s' to '%s': %w", from, to, err)
		}

		// Zero affected rows means no row matched the source identifier.
		if n == 0 {
			return fmt.Errorf("%w: %s", catalog.ErrNoSuchTable, from)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	return c.LoadTable(ctx, to, nil)
}