def rename_table()

in pyiceberg/catalog/sql.py [0:0]


    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a table by rewriting its catalog row to a new namespace and name.

        The destination namespace must already exist. The change is made inside a
        single session: when the dialect reports usable row counts, one UPDATE
        statement is issued and its row count decides whether the source table
        existed; otherwise the source row is selected ``FOR UPDATE`` and modified
        through the ORM.

        Args:
            from_identifier (str | Identifier): Identifier of the table to rename.
            to_identifier (str | Identifier): Identifier the table should have afterwards.

        Returns:
            Table: the table loaded through ``load_table`` using the new identifier.

        Raises:
            NoSuchNamespaceError: If the destination namespace does not exist.
            NoSuchTableError: If no row matches the source identifier in this catalog.
            TableAlreadyExistsError: If the database reports an integrity violation
                while applying or committing the rename.
        """
        from_namespace = Catalog.namespace_to_string(Catalog.namespace_from(from_identifier))
        from_table_name = Catalog.table_name_from(from_identifier)
        to_namespace = Catalog.namespace_to_string(Catalog.namespace_from(to_identifier))
        to_table_name = Catalog.table_name_from(to_identifier)

        if not self._namespace_exists(to_namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}")

        with Session(self.engine) as session:
            try:
                # Predicate that pins down the source row; both strategies below share it.
                source_row = (
                    IcebergTables.catalog_name == self.name,
                    IcebergTables.table_namespace == from_namespace,
                    IcebergTables.table_name == from_table_name,
                )

                if not self.engine.dialect.supports_sane_rowcount:
                    # Row counts cannot be relied on here, so lock and fetch the row
                    # explicitly and let ``.one()`` signal a missing table.
                    locked_lookup = session.query(IcebergTables).with_for_update(of=IcebergTables).filter(*source_row)
                    try:
                        record = locked_lookup.one()
                    except NoResultFound as e:
                        raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e
                    else:
                        record.table_namespace = to_namespace
                        record.table_name = to_table_name
                else:
                    rename_stmt = (
                        update(IcebergTables)
                        .where(*source_row)
                        .values(table_namespace=to_namespace, table_name=to_table_name)
                    )
                    outcome = session.execute(rename_stmt)
                    # Zero affected rows means the source table was never there.
                    if outcome.rowcount < 1:
                        raise NoSuchTableError(f"Table does not exist: {from_table_name}")

                session.commit()
            except IntegrityError as e:
                raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e

        return self.load_table(to_identifier)