# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import (
    TYPE_CHECKING,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

from sqlalchemy import (
    String,
    create_engine,
    delete,
    insert,
    select,
    union,
    update,
)
from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError, ProgrammingError
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    MappedAsDataclass,
    Session,
    mapped_column,
)

from pyiceberg.catalog import (
    METADATA_LOCATION,
    Catalog,
    MetastoreCatalog,
    PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
    CommitFailedException,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import strtobool

if TYPE_CHECKING:
    import pyarrow as pa

# Fallback for the "echo" catalog property; the parsed value is forwarded to create_engine(echo=...).
DEFAULT_ECHO_VALUE = "false"
# Fallback for the "pool_pre_ping" catalog property; the parsed value is forwarded to create_engine(pool_pre_ping=...).
DEFAULT_POOL_PRE_PING_VALUE = "false"
# Fallback for the "init_catalog_tables" catalog property; when true, SqlCatalog.__init__ makes sure the catalog tables exist.
DEFAULT_INIT_CATALOG_TABLES = "true"


class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
    """Declarative base for the catalog's ORM tables.

    Its `metadata` is what `SqlCatalog.create_tables` and `SqlCatalog.destroy_tables` create and drop.
    """

    pass


class IcebergTables(SqlCatalogBaseTable):
    """ORM mapping of the `iceberg_tables` table: one row per table registered in a catalog."""

    __tablename__ = "iceberg_tables"

    # (catalog_name, table_namespace, table_name) form the composite primary key.
    catalog_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    # Namespace stored as a single dot-separated string.
    table_namespace: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    # Location of the table's current metadata file.
    metadata_location: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
    # Metadata location that was current before the last commit; None for a newly created or registered table.
    previous_metadata_location: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)


class IcebergNamespaceProperties(SqlCatalogBaseTable):
    """ORM mapping of the `iceberg_namespace_properties` table: one row per namespace property."""

    __tablename__ = "iceberg_namespace_properties"
    # Catalog minimum Namespace Properties
    # Stored by `SqlCatalog.create_namespace` when the caller supplies no properties.
    NAMESPACE_MINIMAL_PROPERTIES = {"exists": "true"}

    # (catalog_name, namespace, property_key) form the composite primary key.
    catalog_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    # Namespace stored as a single dot-separated string.
    namespace: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    property_key: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
    property_value: Mapped[str] = mapped_column(String(1000), nullable=False)


class SqlCatalog(MetastoreCatalog):
    """Implementation of a SQL based catalog.

    In the `JDBCCatalog` implementation, a `Namespace` is composed of a list of strings separated by dots: `'ns1.ns2.ns3'`.
    And you can have as many levels as you want, but you need at least one.  The `SqlCatalog` honors the same convention.

    In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an optional `Namespace` and a table name.
    When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`.  A valid `TableIdentifier` could be `'name'` (no namespace).
    The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`.
    """

    def __init__(self, name: str, **properties: str):
        """Build the catalog and its SQLAlchemy engine from the catalog properties.

        Args:
            name: Catalog name.
            **properties: Catalog properties. `uri` is required; `echo`, `pool_pre_ping`
                and `init_catalog_tables` are optional and fall back to the module defaults.

        Raises:
            NoSuchPropertyException: If the `uri` property is missing or empty.
        """
        super().__init__(name, **properties)

        uri_prop = self.properties.get("uri")
        if not uri_prop:
            raise NoSuchPropertyException("SQL connection URI is required")

        echo_str = str(self.properties.get("echo", DEFAULT_ECHO_VALUE)).lower()
        # "debug" is handed to the engine verbatim; anything else is parsed as a boolean.
        if echo_str == "debug":
            echo = "debug"
        else:
            echo = strtobool(echo_str)
        pool_pre_ping = strtobool(self.properties.get("pool_pre_ping", DEFAULT_POOL_PRE_PING_VALUE))
        should_init_tables = strtobool(self.properties.get("init_catalog_tables", DEFAULT_INIT_CATALOG_TABLES))

        self.engine = create_engine(uri_prop, echo=echo, pool_pre_ping=pool_pre_ping)

        if should_init_tables:
            self._ensure_tables_exist()

    def _ensure_tables_exist(self) -> None:
        """Probe each catalog table and create the catalog tables as soon as one probe fails."""
        catalog_tables = (IcebergTables, IcebergNamespaceProperties)
        with Session(self.engine) as session:
            for orm_table in catalog_tables:
                probe = select(1).select_from(orm_table)
                try:
                    session.scalar(probe)
                except (OperationalError, ProgrammingError):
                    # sqlalchemy raises OperationalError for sqlite and ProgrammingError for postgres
                    # when the probed table is missing.
                    self.create_tables()
                    return

    def create_tables(self) -> None:
        """Create every table registered on `SqlCatalogBaseTable` using this catalog's engine."""
        catalog_schema = SqlCatalogBaseTable.metadata
        catalog_schema.create_all(self.engine)

    def destroy_tables(self) -> None:
        """Drop every table registered on `SqlCatalogBaseTable` using this catalog's engine."""
        catalog_schema = SqlCatalogBaseTable.metadata
        catalog_schema.drop_all(self.engine)

    def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
        """Turn a catalog row into a `Table` by reading the metadata file it points to.

        Args:
            orm_table: Row loaded from the `iceberg_tables` table.

        Returns:
            Table: The table built from the metadata read at the row's metadata location.

        Raises:
            NoSuchTableError: If the row has no metadata location, namespace or table name.
        """
        # Validate the row field by field before touching any storage.
        metadata_location = orm_table.metadata_location
        if not metadata_location:
            raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing")
        table_namespace = orm_table.table_namespace
        if not table_namespace:
            raise NoSuchTableError(f"Table property {IcebergTables.table_namespace} is missing")
        table_name = orm_table.table_name
        if not table_name:
            raise NoSuchTableError(f"Table property {IcebergTables.table_name} is missing")

        # The metadata file is read with an IO built from the catalog properties ...
        catalog_io = load_file_io(properties=self.properties, location=metadata_location)
        metadata = FromInputFile.table_metadata(catalog_io.new_input(metadata_location))

        # ... while the returned table gets an IO built from the table's own properties.
        full_identifier = Catalog.identifier_to_tuple(table_namespace) + (table_name,)
        table_io = self._load_file_io(metadata.properties, metadata_location)
        return Table(
            identifier=full_identifier,
            metadata=metadata,
            metadata_location=metadata_location,
            io=table_io,
            catalog=self,
        )

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """
        Create an Iceberg table.

        The initial metadata file is written first; the catalog row is inserted afterwards.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            NoSuchNamespaceError: If the table's namespace does not exist.
            TableAlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        """
        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore

        namespace_tuple = Catalog.namespace_from(identifier)
        table_name = Catalog.table_name_from(identifier)
        if not self._namespace_exists(namespace_tuple):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_tuple}")

        namespace = Catalog.namespace_to_string(namespace_tuple)
        location = self._resolve_table_location(location, namespace, table_name)
        metadata_location = load_location_provider(
            table_location=location, table_properties=properties
        ).new_table_metadata_file_location()
        metadata = new_table_metadata(
            location=location,
            schema=schema,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
        metadata_io = load_file_io(properties=self.properties, location=metadata_location)
        self._write_metadata(metadata, metadata_io, metadata_location)

        new_row = IcebergTables(
            catalog_name=self.name,
            table_namespace=namespace,
            table_name=table_name,
            metadata_location=metadata_location,
            previous_metadata_location=None,
        )
        with Session(self.engine) as session:
            try:
                session.add(new_row)
                session.commit()
            except IntegrityError as e:
                # The primary key (catalog, namespace, name) is already taken.
                raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e

        return self.load_table(identifier=identifier)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table that points at an already existing metadata file.

        Args:
            identifier (str | Identifier): Table identifier for the table.
            metadata_location (str): The location to the metadata.

        Returns:
            Table: The newly registered table.

        Raises:
            TableAlreadyExistsError: If the table already exists.
            NoSuchNamespaceError: If namespace does not exist.
        """
        namespace = Catalog.namespace_to_string(Catalog.namespace_from(identifier))
        table_name = Catalog.table_name_from(identifier)
        if not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

        new_row = IcebergTables(
            catalog_name=self.name,
            table_namespace=namespace,
            table_name=table_name,
            metadata_location=metadata_location,
            previous_metadata_location=None,
        )
        with Session(self.engine) as session:
            try:
                session.add(new_row)
                session.commit()
            except IntegrityError as e:
                # The primary key (catalog, namespace, name) is already taken.
                raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e

        return self.load_table(identifier=identifier)

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Look up the table's catalog row, read its metadata and return the table instance.

        This can also serve as an existence check: call `load_table` and catch `NoSuchTableError`.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """
        namespace = Catalog.namespace_to_string(Catalog.namespace_from(identifier))
        table_name = Catalog.table_name_from(identifier)

        lookup = select(IcebergTables).where(
            IcebergTables.catalog_name == self.name,
            IcebergTables.table_namespace == namespace,
            IcebergTables.table_name == table_name,
        )
        with Session(self.engine) as session:
            row = session.scalar(lookup)

        if not row:
            raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
        return self._convert_orm_to_iceberg(row)

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Remove the table's row from the catalog.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """
        namespace = Catalog.namespace_to_string(Catalog.namespace_from(identifier))
        table_name = Catalog.table_name_from(identifier)

        # Criteria shared by both deletion strategies below.
        row_criteria = (
            IcebergTables.catalog_name == self.name,
            IcebergTables.table_namespace == namespace,
            IcebergTables.table_name == table_name,
        )
        with Session(self.engine) as session:
            if self.engine.dialect.supports_sane_rowcount:
                # A trustworthy rowcount tells us whether anything was deleted.
                outcome = session.execute(delete(IcebergTables).where(*row_criteria))
                if outcome.rowcount < 1:
                    raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")
            else:
                # Otherwise select the row FOR UPDATE first and delete the loaded object.
                try:
                    row = session.query(IcebergTables).with_for_update(of=IcebergTables).filter(*row_criteria).one()
                    session.delete(row)
                except NoResultFound as e:
                    raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e
            session.commit()

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a table, given its fully qualified current and new identifiers.

        Args:
            from_identifier (str | Identifier): Existing table identifier.
            to_identifier (str | Identifier): New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
            TableAlreadyExistsError: If a table with the new name already exist.
            NoSuchNamespaceError: If the target namespace does not exist.
        """
        from_namespace = Catalog.namespace_to_string(Catalog.namespace_from(from_identifier))
        from_table_name = Catalog.table_name_from(from_identifier)
        to_namespace = Catalog.namespace_to_string(Catalog.namespace_from(to_identifier))
        to_table_name = Catalog.table_name_from(to_identifier)
        if not self._namespace_exists(to_namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}")

        # Criteria identifying the row to rename, shared by both strategies below.
        source_criteria = (
            IcebergTables.catalog_name == self.name,
            IcebergTables.table_namespace == from_namespace,
            IcebergTables.table_name == from_table_name,
        )
        with Session(self.engine) as session:
            try:
                if self.engine.dialect.supports_sane_rowcount:
                    rename_stmt = (
                        update(IcebergTables)
                        .where(*source_criteria)
                        .values(table_namespace=to_namespace, table_name=to_table_name)
                    )
                    if session.execute(rename_stmt).rowcount < 1:
                        raise NoSuchTableError(f"Table does not exist: {from_table_name}")
                else:
                    # No reliable rowcount: lock the row, then mutate the loaded object.
                    try:
                        row = session.query(IcebergTables).with_for_update(of=IcebergTables).filter(*source_criteria).one()
                        row.table_namespace = to_namespace
                        row.table_name = to_table_name
                    except NoResultFound as e:
                        raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e
                session.commit()
            except IntegrityError as e:
                # The target primary key is already in use.
                raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
        return self.load_table(to_identifier)

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        The staged metadata is written first. The catalog row is then either swapped (guarded by the
        metadata location that was loaded at the start) or inserted when the table did not exist yet.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
            TableAlreadyExistsError: If the table did not exist when loaded but its row could not be inserted.
        """
        table_identifier = table.name()
        namespace = Catalog.namespace_to_string(Catalog.namespace_from(table_identifier))
        table_name = Catalog.table_name_from(table_identifier)

        current_table: Optional[Table]
        try:
            current_table = self.load_table(table_identifier)
        except NoSuchTableError:
            current_table = None

        staged = self._update_and_stage_table(current_table, table.name(), requirements, updates)
        if current_table and staged.metadata == current_table.metadata:
            # Nothing changed: answer with the current state and skip all writes.
            return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
        self._write_metadata(metadata=staged.metadata, io=staged.io, metadata_path=staged.metadata_location)

        with Session(self.engine) as session:
            if not current_table:
                # The table was not in the catalog when loaded: insert its first row.
                try:
                    session.add(
                        IcebergTables(
                            catalog_name=self.name,
                            table_namespace=namespace,
                            table_name=table_name,
                            metadata_location=staged.metadata_location,
                            previous_metadata_location=None,
                        )
                    )
                    session.commit()
                except IntegrityError as e:
                    raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e
            else:
                # The row must still carry the metadata location we loaded; otherwise someone else committed.
                expected_row = (
                    IcebergTables.catalog_name == self.name,
                    IcebergTables.table_namespace == namespace,
                    IcebergTables.table_name == table_name,
                    IcebergTables.metadata_location == current_table.metadata_location,
                )
                if self.engine.dialect.supports_sane_rowcount:
                    swap_stmt = (
                        update(IcebergTables)
                        .where(*expected_row)
                        .values(
                            metadata_location=staged.metadata_location,
                            previous_metadata_location=current_table.metadata_location,
                        )
                    )
                    if session.execute(swap_stmt).rowcount < 1:
                        raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}")
                else:
                    # No reliable rowcount: lock the row, then mutate the loaded object.
                    try:
                        row = session.query(IcebergTables).with_for_update(of=IcebergTables).filter(*expected_row).one()
                        row.metadata_location = staged.metadata_location
                        row.previous_metadata_location = current_table.metadata_location
                    except NoResultFound as e:
                        raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}") from e
                session.commit()

        return CommitTableResponse(metadata=staged.metadata, metadata_location=staged.metadata_location)

    def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Tell whether the namespace, or any namespace nested below it, is known to the catalog.

        A namespace counts as existing when a table row or a namespace-property row of this catalog
        carries either exactly that namespace or one that starts with it followed by a dot.

        Args:
            identifier (str | Identifier): Namespace identifier.

        Returns:
            bool: True if a matching row exists in either catalog table, False otherwise.
        """
        namespace = Catalog.namespace_to_string(Catalog.identifier_to_tuple(identifier), NoSuchNamespaceError)
        # Escape the LIKE wildcards (and the escape character itself) so the prefix is matched literally.
        like_escapes = str.maketrans({"!": "!!", "_": "!_", "%": "!%"})
        child_pattern = namespace.translate(like_escapes) + ".%"

        # The table rows are checked first, then the namespace-property rows.
        sources = (
            (IcebergTables, IcebergTables.table_namespace),
            (IcebergNamespaceProperties, IcebergNamespaceProperties.namespace),
        )
        with Session(self.engine) as session:
            for model, namespace_column in sources:
                probe = (
                    select(model)
                    .where(
                        model.catalog_name == self.name,
                        (namespace_column == namespace) | (namespace_column.like(child_pattern, escape="!")),
                    )
                    .limit(1)
                )
                if session.execute(probe).all():
                    return True
        return False

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        A namespace is persisted as one row per property. When no properties are supplied,
        `IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES` is stored instead so that the
        namespace still has at least one row.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.

        Raises:
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        """
        if self._namespace_exists(namespace):
            raise NamespaceAlreadyExistsError(f"Namespace {namespace} already exists")

        # Fall back to the minimal marker properties exactly once.
        create_properties = properties or IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES
        # The string form does not depend on the property being written, so compute it a single time.
        namespace_str = Catalog.namespace_to_string(namespace, NoSuchNamespaceError)
        with Session(self.engine) as session:
            for key, value in create_properties.items():
                session.add(
                    IcebergNamespaceProperties(
                        catalog_name=self.name,
                        namespace=namespace_str,
                        property_key=key,
                        property_value=value,
                    )
                )
            session.commit()

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Delete every property row of a namespace, provided it holds no tables.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            NamespaceNotEmptyError: If the namespace is not empty.
        """
        if not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

        namespace_str = Catalog.namespace_to_string(namespace)
        tables = self.list_tables(namespace)
        if tables:
            raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(tables)} tables exist.")

        purge_stmt = delete(IcebergNamespaceProperties).where(
            IcebergNamespaceProperties.catalog_name == self.name,
            IcebergNamespaceProperties.namespace == namespace_str,
        )
        with Session(self.engine) as session:
            session.execute(purge_stmt)
            session.commit()

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List the tables stored directly under the given namespace.

        Only rows whose namespace equals the given one are returned; tables of nested
        namespaces are not included.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """
        if namespace and not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

        namespace_str = Catalog.namespace_to_string(namespace)
        tables_stmt = select(IcebergTables).where(
            IcebergTables.catalog_name == self.name,
            IcebergTables.table_namespace == namespace_str,
        )
        with Session(self.engine) as session:
            rows = session.scalars(tables_stmt)
            return [Catalog.identifier_to_tuple(row.table_namespace) + (row.table_name,) for row in rows]

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Namespaces are collected from both the table rows and the namespace-property rows of this
        catalog, truncated to one level below `namespace` and de-duplicated.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: a List of namespace identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """
        if namespace and not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

        table_stmt = select(IcebergTables.table_namespace).where(IcebergTables.catalog_name == self.name)
        namespace_stmt = select(IcebergNamespaceProperties.namespace).where(IcebergNamespaceProperties.catalog_name == self.name)
        if namespace:
            namespace_str = Catalog.namespace_to_string(namespace, NoSuchNamespaceError)
            # Only children are wanted, so match "<namespace>.<anything>" and escape the LIKE
            # wildcards so that `%` / `_` inside the namespace are taken literally. This is the
            # same pattern `_namespace_exists` uses and avoids fetching siblings such as "ns2"
            # when listing "ns".
            children_like = namespace_str.replace("!", "!!").replace("_", "!_").replace("%", "!%") + ".%"
            table_stmt = table_stmt.where(IcebergTables.table_namespace.like(children_like, escape="!"))
            namespace_stmt = namespace_stmt.where(IcebergNamespaceProperties.namespace.like(children_like, escape="!"))
        stmt = union(
            table_stmt,
            namespace_stmt,
        )
        with Session(self.engine) as session:
            namespace_tuple = Catalog.identifier_to_tuple(namespace)
            sub_namespaces_level_length = len(namespace_tuple) + 1

            namespaces = list(
                {  # only get distinct namespaces
                    ns[:sub_namespaces_level_length]  # truncate to the required level
                    for ns in {Catalog.identifier_to_tuple(ns) for ns in session.execute(stmt).scalars()}
                    if len(ns) >= sub_namespaces_level_length  # only get sub namespaces/children
                    # The exact prefix check stays as a safety net, e.g. for backends whose
                    # LIKE is case-insensitive.
                    and ns[: sub_namespaces_level_length - 1] == namespace_tuple
                }
            )

            return namespaces

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Read the stored properties of a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """
        namespace_str = Catalog.namespace_to_string(namespace)
        if not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists")

        props_stmt = select(IcebergNamespaceProperties).where(
            IcebergNamespaceProperties.catalog_name == self.name,
            IcebergNamespaceProperties.namespace == namespace_str,
        )
        with Session(self.engine) as session:
            rows = session.scalars(props_stmt)
            return {row.property_key: row.property_value for row in rows}

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and update properties for a namespace.

        Removals and updates are applied in one session and committed together.

        Args:
            namespace (str | Identifier): Namespace identifier.
            removals (Set[str]): Set of property keys that need to be removed. Optional Argument.
            updates (Properties): Properties to be updated for the given namespace.

        Returns:
            PropertiesUpdateSummary: Summary computed from the current properties, removals and updates.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            ValueError: If removals and updates have overlapping keys.
        """
        namespace_str = Catalog.namespace_to_string(namespace)
        if not self._namespace_exists(namespace):
            raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists")

        existing_properties = self.load_namespace_properties(namespace=namespace)
        summary = self._get_updated_props_and_update_summary(
            current_properties=existing_properties, removals=removals, updates=updates
        )[0]

        def _delete_keys(keys: Set[str]):  # type: ignore[no-untyped-def]
            # DELETE for the given property keys of this namespace.
            return delete(IcebergNamespaceProperties).where(
                IcebergNamespaceProperties.catalog_name == self.name,
                IcebergNamespaceProperties.namespace == namespace_str,
                IcebergNamespaceProperties.property_key.in_(keys),
            )

        with Session(self.engine) as session:
            if removals:
                session.execute(_delete_keys(removals))

            if updates:
                # SQLAlchemy does not (yet) support engine agnostic UPSERT
                # https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-upsert-statements
                # Delete-then-insert is fine here because both run in a single transaction.
                session.execute(_delete_keys(set(updates.keys())))
                new_rows = [
                    {
                        IcebergNamespaceProperties.catalog_name: self.name,
                        IcebergNamespaceProperties.namespace: namespace_str,
                        IcebergNamespaceProperties.property_key: property_key,
                        IcebergNamespaceProperties.property_value: property_value,
                    }
                    for property_key, property_value in updates.items()
                ]
                session.execute(insert(IcebergNamespaceProperties).values(new_rows))
            session.commit()
        return summary

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List views under the given namespace; not implemented by this catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Check whether a view exists; not implemented by this catalog.

        Args:
            identifier (str | Identifier): View identifier.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        """Drop a view; not implemented by this catalog.

        Args:
            identifier (str | Identifier): View identifier.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
