#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.


from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import boto3
from botocore.config import Config
from mypy_boto3_glue.client import GlueClient
from mypy_boto3_glue.type_defs import (
    ColumnTypeDef,
    DatabaseInputTypeDef,
    DatabaseTypeDef,
    StorageDescriptorTypeDef,
    TableInputTypeDef,
    TableTypeDef,
)

from pyiceberg.catalog import (
    BOTOCORE_SESSION,
    EXTERNAL_TABLE,
    ICEBERG,
    LOCATION,
    METADATA_LOCATION,
    PREVIOUS_METADATA_LOCATION,
    TABLE_TYPE,
    MetastoreCatalog,
    PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
    CommitFailedException,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    TableAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
    CommitTableResponse,
    Table,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)
from pyiceberg.utils.properties import get_first_property_value, property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa


# There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue
# metastore to use based on the user's default AWS client credential and region setup. You can specify the Glue catalog
# ID through glue.id catalog property to point to a Glue catalog in a different AWS account. The Glue catalog ID is your
# numeric AWS account ID.
GLUE_ID = "glue.id"

# If Glue should skip archiving an old table version when creating a new version in a commit. By
# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default
# max number of archived table versions (can be increased). So for streaming use case with lots
# of commits, it is recommended to set this value to true.
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True

# Configure an alternative endpoint of the Glue service for GlueCatalog to access.
# This could be used to use GlueCatalog with any glue-compatible metastore service that has a different endpoint
GLUE_CATALOG_ENDPOINT = "glue.endpoint"

ICEBERG_FIELD_ID = "iceberg.field.id"
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"

GLUE_PROFILE_NAME = "glue.profile-name"
GLUE_REGION = "glue.region"
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
GLUE_SESSION_TOKEN = "glue.session-token"
GLUE_MAX_RETRIES = "glue.max-retries"
GLUE_RETRY_MODE = "glue.retry-mode"

MAX_RETRIES = 10
STANDARD_RETRY_MODE = "standard"
ADAPTIVE_RETRY_MODE = "adaptive"
LEGACY_RETRY_MODE = "legacy"
EXISTING_RETRY_MODES = [STANDARD_RETRY_MODE, ADAPTIVE_RETRY_MODE, LEGACY_RETRY_MODE]


def _construct_parameters(
    metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
    new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
    new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
    if prev_metadata_location:
        new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
    return new_parameters


GLUE_PRIMITIVE_TYPES = {
    BooleanType: "boolean",
    IntegerType: "int",
    LongType: "bigint",
    FloatType: "float",
    DoubleType: "double",
    DateType: "date",
    TimeType: "string",
    StringType: "string",
    UUIDType: "string",
    TimestampType: "timestamp",
    TimestamptzType: "timestamp",
    FixedType: "binary",
    BinaryType: "binary",
}


class _IcebergSchemaToGlueType(SchemaVisitor[str]):
    def schema(self, schema: Schema, struct_result: str) -> str:
        return struct_result

    def struct(self, struct: StructType, field_results: List[str]) -> str:
        return f"struct<{','.join(field_results)}>"

    def field(self, field: NestedField, field_result: str) -> str:
        return f"{field.name}:{field_result}"

    def list(self, list_type: ListType, element_result: str) -> str:
        return f"array<{element_result}>"

    def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
        return f"map<{key_result},{value_result}>"

    def primitive(self, primitive: PrimitiveType) -> str:
        if isinstance(primitive, DecimalType):
            return f"decimal({primitive.precision},{primitive.scale})"
        if (primitive_type := type(primitive)) not in GLUE_PRIMITIVE_TYPES:
            return str(primitive)
        return GLUE_PRIMITIVE_TYPES[primitive_type]


def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
    results: Dict[str, ColumnTypeDef] = {}

    def _append_to_results(field: NestedField, is_current: bool) -> None:
        if field.name in results:
            return

        results[field.name] = cast(
            ColumnTypeDef,
            {
                "Name": field.name,
                "Type": visit(field.field_type, _IcebergSchemaToGlueType()),
                "Parameters": {
                    ICEBERG_FIELD_ID: str(field.field_id),
                    ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(),
                    ICEBERG_FIELD_CURRENT: str(is_current).lower(),
                },
            },
        )
        if field.doc:
            results[field.name]["Comment"] = field.doc

    if current_schema := metadata.schema_by_id(metadata.current_schema_id):
        for field in current_schema.columns:
            _append_to_results(field, True)

    for schema in metadata.schemas:
        if schema.schema_id == metadata.current_schema_id:
            continue
        for field in schema.columns:
            _append_to_results(field, False)

    return list(results.values())


def _construct_table_input(
    table_name: str,
    metadata_location: str,
    properties: Properties,
    metadata: TableMetadata,
    glue_table: Optional[TableTypeDef] = None,
    prev_metadata_location: Optional[str] = None,
) -> TableInputTypeDef:
    table_input: TableInputTypeDef = {
        "Name": table_name,
        "TableType": EXTERNAL_TABLE,
        "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
        "StorageDescriptor": {
            "Columns": _to_columns(metadata),
            "Location": metadata.location,
        },
    }

    if "Description" in properties:
        table_input["Description"] = properties["Description"]

    return table_input


def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
    rename_table_input: TableInputTypeDef = {"Name": to_table_name}
    # use the same Glue info to create the new table, pointing to the old metadata
    assert glue_table["TableType"]
    rename_table_input["TableType"] = glue_table["TableType"]
    if "Owner" in glue_table:
        rename_table_input["Owner"] = glue_table["Owner"]

    if "Parameters" in glue_table:
        rename_table_input["Parameters"] = glue_table["Parameters"]

    if "StorageDescriptor" in glue_table:
        # It turns out the output of StorageDescriptor is not the same as the input type
        # because the Column can have a different type, but for now it seems to work, so
        # silence the type error.
        rename_table_input["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"])

    if "Description" in glue_table:
        rename_table_input["Description"] = glue_table["Description"]

    return rename_table_input


def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
    database_input: DatabaseInputTypeDef = {"Name": database_name}
    parameters = {}
    for k, v in properties.items():
        if k == "Description":
            database_input["Description"] = v
        elif k == LOCATION:
            database_input["LocationUri"] = v
        else:
            parameters[k] = v
    database_input["Parameters"] = parameters
    return database_input


def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
    """
    Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.

    It's more ergonomic to do this than to pass the CatalogId as a parameter to every client call since it's an optional
    parameter and boto3 does not support 'None' values for missing parameters.
    """
    event_system = glue.meta.events

    def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
        if "CatalogId" not in params:
            params["CatalogId"] = glue_catalog_id

    event_system.register("provide-client-params.glue", add_glue_catalog_id)


class GlueCatalog(MetastoreCatalog):
    glue: GlueClient

    def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
        """Glue Catalog.

        You either need to provide a boto3 glue client, or one will be constructed from the properties.

        Args:
            name: Name to identify the catalog.
            client: An optional boto3 glue client.
            properties: Properties for glue client construction and configuration.
        """
        super().__init__(name, **properties)

        if client:
            self.glue = client
        else:
            retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)

            session = boto3.Session(
                profile_name=properties.get(GLUE_PROFILE_NAME),
                region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION),
                botocore_session=properties.get(BOTOCORE_SESSION),
                aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
                aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
                aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN),
            )
            self.glue: GlueClient = session.client(
                "glue",
                endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT),
                config=Config(
                    retries={
                        "max_attempts": properties.get(GLUE_MAX_RETRIES, MAX_RETRIES),
                        "mode": retry_mode_prop_value if retry_mode_prop_value in EXISTING_RETRY_MODES else STANDARD_RETRY_MODE,
                    }
                ),
            )

            if glue_catalog_id := properties.get(GLUE_ID):
                _register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)

    def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
        properties: Properties = glue_table["Parameters"]

        assert glue_table["DatabaseName"]
        assert glue_table["Parameters"]
        database_name = glue_table["DatabaseName"]
        table_name = glue_table["Name"]

        if TABLE_TYPE not in properties:
            raise NoSuchPropertyException(
                f"Property {TABLE_TYPE} missing, could not determine type: {database_name}.{table_name}"
            )
        glue_table_type = properties[TABLE_TYPE]

        if glue_table_type.lower() != ICEBERG:
            raise NoSuchIcebergTableError(
                f"Property table_type is {glue_table_type}, expected {ICEBERG}: {database_name}.{table_name}"
            )

        if METADATA_LOCATION not in properties:
            raise NoSuchPropertyException(
                f"Table property {METADATA_LOCATION} is missing, cannot find metadata for: {database_name}.{table_name}"
            )
        metadata_location = properties[METADATA_LOCATION]

        io = self._load_file_io(location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        return Table(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=self._load_file_io(metadata.properties, metadata_location),
            catalog=self,
        )

    def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
        try:
            self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
        except self.glue.exceptions.AlreadyExistsException as e:
            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

    def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
        try:
            self.glue.update_table(
                DatabaseName=database_name,
                TableInput=table_input,
                SkipArchive=property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
                VersionId=version_id,
            )
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
        except self.glue.exceptions.ConcurrentModificationException as e:
            raise CommitFailedException(
                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
            ) from e

    def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
        try:
            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
            return load_table_response["Table"]
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        """
        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
        database_name, table_name = self.identifier_to_database_and_table(identifier)

        self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
        table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
        self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

        return self.load_table(identifier=identifier)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier)
        properties = EMPTY_DICT
        io = self._load_file_io(location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
        self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
        return self.load_table(identifier=identifier)

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        """
        table_identifier = table.name()
        database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

        current_glue_table: Optional[TableTypeDef]
        glue_table_version_id: Optional[str]
        current_table: Optional[Table]
        try:
            current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
            glue_table_version_id = current_glue_table.get("VersionId")
            current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
        except NoSuchTableError:
            current_glue_table = None
            glue_table_version_id = None
            current_table = None

        updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
        if current_table and updated_staged_table.metadata == current_table.metadata:
            # no changes, do nothing
            return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
        self._write_metadata(
            metadata=updated_staged_table.metadata,
            io=updated_staged_table.io,
            metadata_path=updated_staged_table.metadata_location,
        )

        if current_table:
            # table exists, update the table
            if not glue_table_version_id:
                raise CommitFailedException(
                    f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
                )

            # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
            # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
            update_table_input = _construct_table_input(
                table_name=table_name,
                metadata_location=updated_staged_table.metadata_location,
                properties=updated_staged_table.properties,
                metadata=updated_staged_table.metadata,
                glue_table=current_glue_table,
                prev_metadata_location=current_table.metadata_location,
            )
            self._update_glue_table(
                database_name=database_name,
                table_name=table_name,
                table_input=update_table_input,
                version_id=glue_table_version_id,
            )
        else:
            # table does not exist, create the table
            create_table_input = _construct_table_input(
                table_name=table_name,
                metadata_location=updated_staged_table.metadata_location,
                properties=updated_staged_table.properties,
                metadata=updated_staged_table.metadata,
            )
            self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

        return CommitTableResponse(
            metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
        )

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        This method can only rename Iceberg tables in AWS Glue.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
        """
        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
        try:
            get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {from_database_name}.{from_table_name}") from e

        glue_table = get_table_response["Table"]

        try:
            # verify that from_identifier is a valid iceberg table
            self._convert_glue_to_iceberg(glue_table=glue_table)
        except NoSuchPropertyException as e:
            raise NoSuchPropertyException(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties"
            ) from e
        except NoSuchIcebergTableError as e:
            raise NoSuchIcebergTableError(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table"
            ) from e

        rename_table_input = _construct_rename_table_input(to_table_name=to_table_name, glue_table=glue_table)
        self._create_glue_table(database_name=to_database_name, table_name=to_table_name, table_input=rename_table_input)

        try:
            self.drop_table(from_identifier)
        except Exception as e:
            log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

            try:
                self.drop_table(to_identifier)
                log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}."
            except NoSuchTableError:
                log_message += (
                    f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually"
                )

            raise ValueError(log_message) from e

        return self.load_table(to_identifier)

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            AlreadyExistsError: If a namespace with the given name already exists.
        """
        database_name = self.identifier_to_database(namespace)
        try:
            self.glue.create_database(DatabaseInput=_construct_database_input(database_name, properties))
        except self.glue.exceptions.AlreadyExistsException as e:
            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        A Glue namespace can only be dropped if it is empty.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
            table_list = self.list_tables(namespace=database_name)
        except NoSuchNamespaceError as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

        if len(table_list) > 0:
            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")

        self.glue.delete_database(Name=database_name)

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        table_list: List[TableTypeDef] = []
        next_token: Optional[str] = None
        try:
            while True:
                table_list_response = (
                    self.glue.get_tables(DatabaseName=database_name)
                    if not next_token
                    else self.glue.get_tables(DatabaseName=database_name, NextToken=next_token)
                )
                table_list.extend(table_list_response["TableList"])
                next_token = table_list_response.get("NextToken")
                if not next_token:
                    break

        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
        return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        """
        # Hierarchical namespace is not supported. Return an empty list
        if namespace:
            return []

        database_list: List[DatabaseTypeDef] = []
        next_token: Optional[str] = None

        while True:
            databases_response = self.glue.get_databases() if not next_token else self.glue.get_databases(NextToken=next_token)
            database_list.extend(databases_response["DatabaseList"])
            next_token = databases_response.get("NextToken")
            if not next_token:
                break

        return [self.identifier_to_tuple(database["Name"]) for database in database_list]

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
            database_response = self.glue.get_database(Name=database_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
        except self.glue.exceptions.InvalidInputException as e:
            raise NoSuchNamespaceError(f"Invalid input for namespace {database_name}") from e

        database = database_response["Database"]

        properties = dict(database.get("Parameters", {}))
        if "LocationUri" in database:
            properties["location"] = database["LocationUri"]
        if "Description" in database:
            properties["Description"] = database["Description"]

        return properties

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and updates properties for a namespace.

        Args:
            namespace: Namespace identifier.
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist， or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        """
        current_properties = self.load_namespace_properties(namespace=namespace)
        properties_update_summary, updated_properties = self._get_updated_props_and_update_summary(
            current_properties=current_properties, removals=removals, updates=updates
        )

        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        self.glue.update_database(Name=database_name, DatabaseInput=_construct_database_input(database_name, updated_properties))

        return properties_update_summary

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        raise NotImplementedError

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    @staticmethod
    def __is_iceberg_table(table: TableTypeDef) -> bool:
        return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG
