#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.
import getpass
import logging
import socket
import time
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
)
from urllib.parse import urlparse

from hive_metastore.ThriftHiveMetastore import Client
from hive_metastore.ttypes import (
    AlreadyExistsException,
    CheckLockRequest,
    EnvironmentContext,
    FieldSchema,
    InvalidOperationException,
    LockComponent,
    LockLevel,
    LockRequest,
    LockResponse,
    LockState,
    LockType,
    MetaException,
    NoSuchObjectException,
    SerDeInfo,
    StorageDescriptor,
    UnlockRequest,
)
from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

from pyiceberg.catalog import (
    EXTERNAL_TABLE,
    ICEBERG,
    LOCATION,
    METADATA_LOCATION,
    TABLE_TYPE,
    MetastoreCatalog,
    PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
    CommitFailedException,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    TableAlreadyExistsError,
    WaitingForLockException,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
    CommitTableResponse,
    StagedTable,
    Table,
    TableProperties,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UnknownType,
    UUIDType,
)
from pyiceberg.utils.properties import property_as_bool, property_as_float

if TYPE_CHECKING:
    import pyarrow as pa


COMMENT = "comment"
OWNER = "owner"

# If set to true, HiveCatalog will operate in Hive2 compatibility mode
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False

HIVE_KERBEROS_AUTH = "hive.kerberos-authentication"
HIVE_KERBEROS_AUTH_DEFAULT = False

LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1  # 100 milliseconds
DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60  # 1 min
DEFAULT_LOCK_CHECK_RETRIES = 4
DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"
DO_NOT_UPDATE_STATS_DEFAULT = "true"

logger = logging.getLogger(__name__)


class _HiveClient:
    """Helper class to nicely open and close the transport."""

    _transport: TTransport
    _ugi: Optional[List[str]]

    def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT):
        self._uri = uri
        self._kerberos_auth = kerberos_auth
        self._ugi = ugi.split(":") if ugi else None
        self._transport = self._init_thrift_transport()

    def _init_thrift_transport(self) -> TTransport:
        url_parts = urlparse(self._uri)
        # Named to avoid shadowing the stdlib socket module imported at the top of the file
        transport_socket = TSocket.TSocket(url_parts.hostname, url_parts.port)
        if not self._kerberos_auth:
            return TTransport.TBufferedTransport(transport_socket)
        else:
            return TTransport.TSaslClientTransport(transport_socket, host=url_parts.hostname, service="hive")

    def _client(self) -> Client:
        protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
        client = Client(protocol)
        if self._ugi:
            client.set_ugi(*self._ugi)
        return client

    def __enter__(self) -> Client:
        """Make sure the transport is initialized and open."""
        if not self._transport.isOpen():
            try:
                self._transport.open()
            except (TypeError, TTransport.TTransportException):
                # reinitialize _transport
                self._transport = self._init_thrift_transport()
                self._transport.open()
        return self._client()  # recreate the client

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        """Close transport if it was opened."""
        if self._transport.isOpen():
            self._transport.close()


def _construct_hive_storage_descriptor(
    schema: Schema, location: Optional[str], hive2_compatible: bool = False
) -> StorageDescriptor:
    ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
    return StorageDescriptor(
        [
            FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc)
            for field in schema.fields
        ],
        location,
        "org.apache.hadoop.mapred.FileInputFormat",
        "org.apache.hadoop.mapred.FileOutputFormat",
        serdeInfo=ser_de_info,
    )


PROP_EXTERNAL = "EXTERNAL"
PROP_TABLE_TYPE = "table_type"
PROP_METADATA_LOCATION = "metadata_location"
PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}


def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
    properties = {PROP_EXTERNAL: "TRUE", PROP_TABLE_TYPE: "ICEBERG", PROP_METADATA_LOCATION: metadata_location}
    if previous_metadata_location:
        properties[PROP_PREVIOUS_METADATA_LOCATION] = previous_metadata_location

    return properties


def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveDatabase:
    params = {}
    for key, value in properties.items():
        if key == COMMENT:
            database.description = value
        elif key == LOCATION:
            database.locationUri = value
        else:
            params[key] = value
    database.parameters = params
    return database


HIVE_PRIMITIVE_TYPES = {
    BooleanType: "boolean",
    IntegerType: "int",
    LongType: "bigint",
    FloatType: "float",
    DoubleType: "double",
    DateType: "date",
    TimeType: "string",
    TimestampType: "timestamp",
    TimestamptzType: "timestamp with local time zone",
    StringType: "string",
    UUIDType: "string",
    BinaryType: "binary",
    FixedType: "binary",
    UnknownType: "void",
}


class SchemaToHiveConverter(SchemaVisitor[str]):
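    """Translate an Iceberg schema into Hive type strings when passed to visit().

    Example:
        A minimal sketch, mirroring the call in _construct_hive_storage_descriptor:

            hive_type = visit(field.field_type, SchemaToHiveConverter(hive2_compatible=False))
    """
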
    hive2_compatible: bool

    def __init__(self, hive2_compatible: bool):
        self.hive2_compatible = hive2_compatible

    def schema(self, schema: Schema, struct_result: str) -> str:
        return struct_result

    def struct(self, struct: StructType, field_results: List[str]) -> str:
        return f"struct<{','.join(field_results)}>"

    def field(self, field: NestedField, field_result: str) -> str:
        return f"{field.name}:{field_result}"

    def list(self, list_type: ListType, element_result: str) -> str:
        return f"array<{element_result}>"

    def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
        # Key has to be primitive for Hive
        return f"map<{key_result},{value_result}>"

    def primitive(self, primitive: PrimitiveType) -> str:
        if isinstance(primitive, DecimalType):
            return f"decimal({primitive.precision},{primitive.scale})"
        elif self.hive2_compatible and isinstance(primitive, TimestamptzType):
            # Hive2 doesn't support timestamp with local time zone
            return "timestamp"
        else:
            return HIVE_PRIMITIVE_TYPES[type(primitive)]


class HiveCatalog(MetastoreCatalog):
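    """Iceberg catalog implementation backed by the Hive Metastore Thrift API.

    Example:
        A minimal sketch; the URI is illustrative:

            catalog = HiveCatalog("hive", uri="thrift://localhost:9083")
            catalog.list_namespaces()
    """
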
    _client: _HiveClient

    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)
        self._client = self._create_hive_client(properties)

        self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
        self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
        self._lock_check_retries = property_as_float(
            properties,
            LOCK_CHECK_RETRIES,
            DEFAULT_LOCK_CHECK_RETRIES,
        )

    @staticmethod
    def _create_hive_client(properties: Dict[str, str]) -> _HiveClient:
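        """Return a _HiveClient for the first of the comma-separated URIs that initializes successfully."""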
        last_exception = None
        for uri in properties["uri"].split(","):
            try:
                return _HiveClient(
                    uri,
                    properties.get("ugi"),
                    property_as_bool(properties, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_AUTH_DEFAULT),
                )
            except BaseException as e:
                last_exception = e
        if last_exception is not None:
            raise last_exception
        else:
            raise ValueError(f"Unable to connect to hive using uri: {properties['uri']}")

    def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
        properties: Dict[str, str] = table.parameters
        if TABLE_TYPE not in properties:
            raise NoSuchPropertyException(
                f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}"
            )

        table_type = properties[TABLE_TYPE]
        if table_type.lower() != ICEBERG:
            raise NoSuchIcebergTableError(
                f"Property table_type is {table_type}, expected {ICEBERG}: {table.dbName}.{table.tableName}"
            )

        if prop_metadata_location := properties.get(METADATA_LOCATION):
            metadata_location = prop_metadata_location
        else:
            raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing")

        io = self._load_file_io(location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        return Table(
            identifier=(table.dbName, table.tableName),
            metadata=metadata,
            metadata_location=metadata_location,
            io=self._load_file_io(metadata.properties, metadata_location),
            catalog=self,
        )

    def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
        identifier_tuple = table.name()
        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
        current_time_millis = int(time.time() * 1000)

        return HiveTable(
            dbName=database_name,
            tableName=table_name,
            owner=table.properties[OWNER] if table.properties and OWNER in table.properties else getpass.getuser(),
            createTime=current_time_millis // 1000,
            lastAccessTime=current_time_millis // 1000,
            sd=_construct_hive_storage_descriptor(
                table.schema(),
                table.location(),
                property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
            ),
            tableType=EXTERNAL_TABLE,
            parameters=_construct_parameters(table.metadata_location),
        )

    def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None:
        try:
            open_client.create_table(hive_table)
        except AlreadyExistsException as e:
            raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e

    def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
        try:
            return open_client.get_table(dbname=database_name, tbl_name=table_name)
        except NoSuchObjectException as e:
            raise NoSuchTableError(f"Table does not exists: {table_name}") from e

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            TableAlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid.
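
        Example:
            A minimal sketch; assumes the namespace "default" already exists:

                from pyiceberg.schema import Schema
                from pyiceberg.types import NestedField, StringType

                schema = Schema(NestedField(field_id=1, name="name", field_type=StringType(), required=False))
                table = catalog.create_table("default.people", schema=schema)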
        """
        properties = {**DEFAULT_PROPERTIES, **properties}
        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
        database_name, table_name = self.identifier_to_database_and_table(identifier)

        self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
        tbl = self._convert_iceberg_into_hive(staged_table)

        with self._client as open_client:
            self._create_hive_table(open_client, tbl)
            hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)

        return self._convert_hive_into_iceberg(hive_table)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier: Table identifier for the table.
            metadata_location: The location of the table's metadata file.

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
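
        Example:
            A minimal sketch; the metadata path is illustrative:

                table = catalog.register_table(
                    "default.orders",
                    "s3://bucket/warehouse/orders/metadata/00000-0.metadata.json",
                )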
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier)
        io = self._load_file_io(location=metadata_location)
        metadata_file = io.new_input(metadata_location)
        staged_table = StagedTable(
            identifier=(database_name, table_name),
            metadata=FromInputFile.table_metadata(metadata_file),
            metadata_location=metadata_location,
            io=io,
            catalog=self,
        )
        tbl = self._convert_iceberg_into_hive(staged_table)
        with self._client as open_client:
            self._create_hive_table(open_client, tbl)
            hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)

        return self._convert_hive_into_iceberg(hive_table)

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
        lock_component: LockComponent = LockComponent(
            level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
        )

        lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname())

        return lock_request

    def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
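        # Retries with exponential backoff while the lock stays in WAITING state;
        # waits are bounded by lock-check-min-wait-time / lock-check-max-wait-time,
        # polling stops after lock-check-retries attempts, and the last
        # WaitingForLockException is re-raised because reraise=True.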
        @retry(
            retry=retry_if_exception_type(WaitingForLockException),
            wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
            stop=stop_after_attempt(self._lock_check_retries),
            reraise=True,
        )
        def _do_wait_for_lock() -> LockResponse:
            response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
            if response.state == LockState.ACQUIRED:
                return response
            elif response.state == LockState.WAITING:
                msg = f"Wait on lock for {database_name}.{table_name}"
                logger.warning(msg)
                raise WaitingForLockException(msg)
            else:
                raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")

        return _do_wait_for_lock()

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements (Tuple[TableRequirement, ...]): Table requirements.
            updates (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        """
        table_identifier = table.name()
        database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
        # commit to hive
        # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
        with self._client as open_client:
            lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

            try:
                if lock.state != LockState.ACQUIRED:
                    if lock.state == LockState.WAITING:
                        self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
                    else:
                        raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

                hive_table: Optional[HiveTable]
                current_table: Optional[Table]
                try:
                    hive_table = self._get_hive_table(open_client, database_name, table_name)
                    current_table = self._convert_hive_into_iceberg(hive_table)
                except NoSuchTableError:
                    hive_table = None
                    current_table = None

                updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
                if current_table and updated_staged_table.metadata == current_table.metadata:
                    # no changes, do nothing
                    return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
                self._write_metadata(
                    metadata=updated_staged_table.metadata,
                    io=updated_staged_table.io,
                    metadata_path=updated_staged_table.metadata_location,
                )

                if hive_table and current_table:
                    # Table exists, update it.
                    hive_table.parameters = _construct_parameters(
                        metadata_location=updated_staged_table.metadata_location,
                        previous_metadata_location=current_table.metadata_location,
                    )
                    open_client.alter_table_with_environment_context(
                        dbname=database_name,
                        tbl_name=table_name,
                        new_tbl=hive_table,
                        environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
                    )
                else:
                    # Table does not exist, create it.
                    hive_table = self._convert_iceberg_into_hive(
                        StagedTable(
                            identifier=(database_name, table_name),
                            metadata=updated_staged_table.metadata,
                            metadata_location=updated_staged_table.metadata_location,
                            io=updated_staged_table.io,
                            catalog=self,
                        )
                    )
                    self._create_hive_table(open_client, hive_table)
            except WaitingForLockException as e:
                raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
            finally:
                open_client.unlock(UnlockRequest(lockid=lock.lockid))

        return CommitTableResponse(
            metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
        )

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Load the table's metadata and return the table instance.

        You can also use this method to check for table existence using 'try catalog.load_table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
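
        Example:
            A minimal sketch; identifiers can be strings or tuples:

                table = catalog.load_table("default.people")
                table = catalog.load_table(("default", "people"))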
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        with self._client as open_client:
            hive_table = self._get_hive_table(open_client, database_name, table_name)

        return self._convert_hive_into_iceberg(hive_table)

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
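
        Example:
            A minimal sketch; only the catalog entry is removed, data files are kept:

                catalog.drop_table("default.people")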
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        try:
            with self._client as open_client:
                open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
        except NoSuchObjectException as e:
            # When the namespace doesn't exist, it throws the same error
            raise NoSuchTableError(f"Table does not exists: {table_name}") from e

    def purge_table(self, identifier: Union[str, Identifier]) -> None:
        # This would require traversing the reachability set and dropping all the data files.
        raise NotImplementedError("Not yet implemented")

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When the from table identifier is invalid.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
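
        Example:
            A minimal sketch; assumes the destination namespace "archive" exists:

                table = catalog.rename_table("default.people", "archive.people")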
        """
        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
        try:
            with self._client as open_client:
                tbl = open_client.get_table(dbname=from_database_name, tbl_name=from_table_name)
                tbl.dbName = to_database_name
                tbl.tableName = to_table_name
                open_client.alter_table_with_environment_context(
                    dbname=from_database_name,
                    tbl_name=from_table_name,
                    new_tbl=tbl,
                    environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
                )
        except NoSuchObjectException as e:
            raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e
        except InvalidOperationException as e:
            raise NoSuchNamespaceError(f"Database does not exists: {to_database_name}") from e
        return self.load_table(to_identifier)

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
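
        Example:
            A minimal sketch; the location is illustrative. The "comment" and
            "location" keys map onto the Hive database description and URI:

                catalog.create_namespace(
                    "analytics",
                    {"comment": "Analytics databases", "location": "s3://bucket/warehouse/analytics.db"},
                )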
        """
        database_name = self.identifier_to_database(namespace)
        hive_database = HiveDatabase(name=database_name, parameters=properties)

        try:
            with self._client as open_client:
                open_client.create_database(_annotate_namespace(hive_database, properties))
        except AlreadyExistsException as e:
            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
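
        Example:
            A minimal sketch; fails with NamespaceNotEmptyError if tables remain:

                catalog.drop_namespace("analytics")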
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
            with self._client as open_client:
                open_client.drop_database(database_name, deleteData=False, cascade=False)
        except InvalidOperationException as e:
            raise NamespaceNotEmptyError(f"Database {database_name} is not empty") from e
        except MetaException as e:
            raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List Iceberg tables under the given namespace in the catalog.

        When the database doesn't exist, it will just return an empty list.

        Args:
            namespace: Database to list.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If the namespace identifier is invalid.
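
        Example:
            A minimal sketch:

                for database_name, table_name in catalog.list_tables("default"):
                    print(f"{database_name}.{table_name}")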
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        with self._client as open_client:
            return [
                (database_name, table.tableName)
                for table in open_client.get_table_objects_by_name(
                    dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
                )
                if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
            ]

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        """
        # Hierarchical namespace is not supported. Return an empty list
        if namespace:
            return []

        with self._client as open_client:
            return list(map(self.identifier_to_tuple, open_client.get_all_databases()))

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
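
        Example:
            A minimal sketch; "location" is always present, "comment" only when set:

                properties = catalog.load_namespace_properties("default")
                location = properties["location"]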
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
            with self._client as open_client:
                database = open_client.get_database(name=database_name)
                properties = database.parameters
                properties[LOCATION] = database.locationUri
                if comment := database.description:
                    properties[COMMENT] = comment
                return properties
        except NoSuchObjectException as e:
            raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and update properties for a namespace.

        Args:
            namespace: Namespace identifier.
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            ValueError: If removals and updates have overlapping keys.
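
        Example:
            A minimal sketch; the keys shown are illustrative:

                summary = catalog.update_namespace_properties(
                    "default",
                    removals={"obsolete-key"},
                    updates={"owner": "analytics"},
                )
                # summary.removed, summary.updated and summary.missing list the affected keys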
        """
        self._check_for_overlap(updates=updates, removals=removals)
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        with self._client as open_client:
            try:
                database = open_client.get_database(database_name)
                parameters = database.parameters
            except NoSuchObjectException as e:
                raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

            removed: Set[str] = set()
            updated: Set[str] = set()

            if removals:
                for key in removals:
                    if key in parameters:
                        parameters[key] = None
                        removed.add(key)
            if updates:
                for key, value in updates.items():
                    parameters[key] = value
                    updated.add(key)

            open_client.alter_database(database_name, _annotate_namespace(database, parameters))

        expected_to_change = (removals or set()).difference(removed)

        return PropertiesUpdateSummary(removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change))

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        raise NotImplementedError
