# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import importlib
import logging
import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Callable,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
    cast,
)

from pyiceberg.exceptions import (
    NamespaceAlreadyExistsError,
    NoSuchNamespaceError,
    NoSuchTableError,
    NotInstalledError,
    TableAlreadyExistsError,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
    DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
    CommitTableResponse,
    CreateTableTransaction,
    StagedTable,
    Table,
    TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
    update_table_metadata,
)
from pyiceberg.typedef import (
    EMPTY_DICT,
    Identifier,
    Properties,
    RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa

logger = logging.getLogger(__name__)

_ENV_CONFIG = Config()

TOKEN = "token"
TYPE = "type"
PY_CATALOG_IMPL = "py-catalog-impl"
ICEBERG = "iceberg"
TABLE_TYPE = "table_type"
WAREHOUSE_LOCATION = "warehouse"
METADATA_LOCATION = "metadata_location"
PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
MANIFEST = "manifest"
MANIFEST_LIST = "manifest list"
PREVIOUS_METADATA = "previous metadata"
METADATA = "metadata"
URI = "uri"
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"
BOTOCORE_SESSION = "botocore_session"

TABLE_METADATA_FILE_NAME_REGEX = re.compile(
    r"""
    (\d+)              # version number
    -                  # separator
    ([\w-]{36})        # UUID (36 characters, including hyphens)
    (?:\.\w+)?         # optional codec name
    \.metadata\.json   # file extension
    """,
    re.X,
)


class CatalogType(Enum):
    REST = "rest"
    HIVE = "hive"
    GLUE = "glue"
    DYNAMODB = "dynamodb"
    SQL = "sql"
    IN_MEMORY = "in-memory"


def load_rest(name: str, conf: Properties) -> Catalog:
    from pyiceberg.catalog.rest import RestCatalog

    return RestCatalog(name, **conf)


def load_hive(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.hive import HiveCatalog

        return HiveCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError("Apache Hive support not installed: pip install 'pyiceberg[hive]'") from exc


def load_glue(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.glue import GlueCatalog

        return GlueCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError("AWS glue support not installed: pip install 'pyiceberg[glue]'") from exc


def load_dynamodb(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.dynamodb import DynamoDbCatalog

        return DynamoDbCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError("AWS DynamoDB support not installed: pip install 'pyiceberg[dynamodb]'") from exc


def load_sql(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.sql import SqlCatalog

        return SqlCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError(
            "SQLAlchemy support not installed: pip install 'pyiceberg[sql-postgres]' or pip install 'pyiceberg[sql-sqlite]'"
        ) from exc


def load_in_memory(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.memory import InMemoryCatalog

        return InMemoryCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc


AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
    CatalogType.IN_MEMORY: load_in_memory,
}


def infer_catalog_type(name: str, catalog_properties: RecursiveDict) -> Optional[CatalogType]:
    """Try to infer the type based on the dict.

    Args:
        name: Name of the catalog.
        catalog_properties: Catalog properties.

    Returns:
        The inferred type based on the provided properties.

    Raises:
        ValueError: Raises a ValueError in case properties are missing, or the wrong type.
    """
    if uri := catalog_properties.get("uri"):
        if isinstance(uri, str):
            if uri.startswith("http"):
                return CatalogType.REST
            elif uri.startswith("thrift"):
                return CatalogType.HIVE
            elif uri.startswith(("sqlite", "postgresql")):
                return CatalogType.SQL
            else:
                raise ValueError(f"Could not infer the catalog type from the uri: {uri}")
        else:
            raise ValueError(f"Expects the URI to be a string, got: {type(uri)}")
    raise ValueError(
        f"URI missing, please provide using --uri, the config or environment variable PYICEBERG_CATALOG__{name.upper()}__URI"
    )


def load_catalog(name: Optional[str] = None, **properties: Optional[str]) -> Catalog:
    """Load the catalog based on the properties.

    Will look up the properties from the config, based on the name.

    Args:
        name: The name of the catalog.
        properties: The properties that are used next to the configuration.

    Returns:
        An initialized Catalog.

    Raises:
        ValueError: Raises a ValueError in case properties are missing or malformed,
            or if it could not determine the catalog based on the properties.
    """
    if name is None:
        name = _ENV_CONFIG.get_default_catalog_name()

    env = _ENV_CONFIG.get_catalog_config(name)
    conf: RecursiveDict = merge_config(env or {}, cast(RecursiveDict, properties))

    catalog_type: Optional[CatalogType]
    provided_catalog_type = conf.get(TYPE)

    if catalog_impl := properties.get(PY_CATALOG_IMPL):
        if provided_catalog_type:
            raise ValueError(
                "Must not set both catalog type and py-catalog-impl configurations, "
                f"but found type {provided_catalog_type} and py-catalog-impl {catalog_impl}"
            )

        if catalog := _import_catalog(name, catalog_impl, properties):
            logger.info("Loaded Catalog: %s", catalog_impl)
            return catalog
        else:
            raise ValueError(f"Could not initialize Catalog: {catalog_impl}")

    catalog_type = None
    if provided_catalog_type and isinstance(provided_catalog_type, str):
        catalog_type = CatalogType(provided_catalog_type.lower())
    elif not provided_catalog_type:
        catalog_type = infer_catalog_type(name, conf)

    if catalog_type:
        return AVAILABLE_CATALOGS[catalog_type](name, cast(Dict[str, str], conf))

    raise ValueError(f"Could not initialize catalog with the following properties: {properties}")


def delete_files(io: FileIO, files_to_delete: Set[str], file_type: str) -> None:
    """Delete files.

    Log warnings if failing to delete any file.

    Args:
        io: The FileIO used to delete the object.
        files_to_delete: A set of file paths to be deleted.
        file_type: The type of the file.
    """
    for file in files_to_delete:
        try:
            io.delete(file)
        except OSError as exc:
            logger.warning(msg=f"Failed to delete {file_type} file {file}", exc_info=exc)


def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> None:
    """Delete data files linked to given manifests.

    Log warnings if failing to delete any file.

    Args:
        io: The FileIO used to delete the object.
        manifests_to_delete: A list of manifest contains paths of data files to be deleted.
    """
    deleted_files: dict[str, bool] = {}
    for manifest_file in manifests_to_delete:
        for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False):
            path = entry.data_file.file_path
            if not deleted_files.get(path, False):
                try:
                    io.delete(path)
                except OSError as exc:
                    logger.warning(msg=f"Failed to delete data file {path}", exc_info=exc)
                deleted_files[path] = True


def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Optional[Catalog]:
    try:
        path_parts = catalog_impl.split(".")
        if len(path_parts) < 2:
            raise ValueError(f"py-catalog-impl should be full path (module.CustomCatalog), got: {catalog_impl}")
        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
        module = importlib.import_module(module_name)
        class_ = getattr(module, class_name)
        return class_(name, **properties)
    except ModuleNotFoundError as exc:
        logger.warning(f"Could not initialize Catalog: {catalog_impl}", exc_info=exc)
        return None


@dataclass
class PropertiesUpdateSummary:
    removed: List[str]
    updated: List[str]
    missing: List[str]


class Catalog(ABC):
    """Base Catalog for table operations like - create, drop, load, list and others.

    The catalog table APIs accept a table identifier, which is fully classified table name. The identifier can be a string or
    tuple of strings. If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is.

    The catalog namespace APIs follow a similar convention wherein they also accept a namespace identifier that can be a string
    or tuple of strings.

    Attributes:
        name (str): Name of the catalog.
        properties (Properties): Catalog properties.
    """

    name: str
    properties: Properties

    def __init__(self, name: str, **properties: str):
        self.name = name
        self.properties = properties

    @abstractmethod
    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a table.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            TableAlreadyExistsError: If a table with the name already exists.
        """

    @abstractmethod
    def create_table_transaction(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> CreateTableTransaction:
        """Create a CreateTableTransaction.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            CreateTableTransaction: createTableTransaction instance.
        """

    def create_table_if_not_exists(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a table if it does not exist.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance if the table does not exist, else the existing
            table instance.
        """
        try:
            return self.create_table(identifier, schema, location, partition_spec, sort_order, properties)
        except TableAlreadyExistsError:
            return self.load_table(identifier)

    @abstractmethod
    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def table_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Check if a table exists.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            bool: True if the table exists, False otherwise.
        """

    @abstractmethod
    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Check if a view exists.

        Args:
            identifier (str | Identifier): View identifier.

        Returns:
            bool: True if the view exists, False otherwise.
        """

    @abstractmethod
    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        """

    @abstractmethod
    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def purge_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table and purge all data and metadata files.

        Note: This method only logs warning rather than raise exception when encountering file deletion failure.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """

    @abstractmethod
    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        Args:
            from_identifier (str | Identifier): Existing table identifier.
            to_identifier (str | Identifier): New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
            CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
        """

    @abstractmethod
    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.

        Raises:
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        """

    def create_namespace_if_not_exists(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace if it does not exist.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.
        """
        try:
            self.create_namespace(namespace, properties)
        except NamespaceAlreadyExistsError:
            pass

    @abstractmethod
    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            NamespaceNotEmptyError: If the namespace is not empty.
        """

    @abstractmethod
    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: a List of namespace identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List views under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Get properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and updates properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.
            removals (Set[str]): Set of property keys that need to be removed. Optional Argument.
            updates (Properties): Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            ValueError: If removals and updates have overlapping keys.
        """

    @abstractmethod
    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        """Drop a view.

        Args:
            identifier (str | Identifier): View identifier.

        Raises:
            NoSuchViewError: If a view with the given name does not exist.
        """

    @staticmethod
    def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
        """Parse an identifier to a tuple.

        If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is.

        Args:
            identifier (str | Identifier): an identifier, either a string or tuple of strings.

        Returns:
            Identifier: a tuple of strings.
        """
        return identifier if isinstance(identifier, tuple) else tuple(str.split(identifier, "."))

    @staticmethod
    def table_name_from(identifier: Union[str, Identifier]) -> str:
        """Extract table name from a table identifier.

        Args:
            identifier (str | Identifier: a table identifier.

        Returns:
            str: Table name.
        """
        return Catalog.identifier_to_tuple(identifier)[-1]

    @staticmethod
    def namespace_from(identifier: Union[str, Identifier]) -> Identifier:
        """Extract table namespace from a table identifier.

        Args:
            identifier (Union[str, Identifier]): a table identifier.

        Returns:
            Identifier: Namespace identifier.
        """
        return Catalog.identifier_to_tuple(identifier)[:-1]

    @staticmethod
    def namespace_to_string(
        identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError
    ) -> str:
        """Transform a namespace identifier into a string.

        Args:
            identifier (Union[str, Identifier]): a namespace identifier.
            err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the error type to raise when identifier is empty.

        Returns:
            Identifier: Namespace identifier.
        """
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) < 1:
            raise err("Empty namespace identifier")

        # Check if any segment of the tuple is an empty string
        if any(segment.strip() == "" for segment in tuple_identifier):
            raise err("Namespace identifier contains an empty segment or a segment with only whitespace")

        return ".".join(segment.strip() for segment in tuple_identifier)

    @staticmethod
    def identifier_to_database(
        identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError
    ) -> str:
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) != 1:
            raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}")

        return tuple_identifier[0]

    @staticmethod
    def identifier_to_database_and_table(
        identifier: Union[str, Identifier],
        err: Union[Type[ValueError], Type[NoSuchTableError], Type[NoSuchNamespaceError]] = ValueError,
    ) -> Tuple[str, str]:
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) != 2:
            raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}")

        return tuple_identifier[0], tuple_identifier[1]

    def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
        return load_file_io({**self.properties, **properties}, location)

    @staticmethod
    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
        if isinstance(schema, Schema):
            return schema
        try:
            import pyarrow as pa

            from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow

            downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
            if isinstance(schema, pa.Schema):
                schema: Schema = visit_pyarrow(  # type: ignore
                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
                )
                return schema
        except ModuleNotFoundError:
            pass
        raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

    @staticmethod
    def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
        """Delete oldest metadata if config is set to true."""
        delete_after_commit: bool = property_as_bool(
            metadata.properties,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
        )

        if delete_after_commit:
            removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log}
            current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log}
            removed_previous_metadata_files.difference_update(current_metadata_files)
            delete_files(io, removed_previous_metadata_files, METADATA)

    def __repr__(self) -> str:
        """Return the string representation of the Catalog class."""
        return f"{self.name} ({self.__class__})"


class MetastoreCatalog(Catalog, ABC):
    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)

    def create_table_transaction(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> CreateTableTransaction:
        return CreateTableTransaction(
            self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
        )

    def table_exists(self, identifier: Union[str, Identifier]) -> bool:
        try:
            self.load_table(identifier)
            return True
        except NoSuchTableError:
            return False

    def purge_table(self, identifier: Union[str, Identifier]) -> None:
        table = self.load_table(identifier)
        self.drop_table(identifier)
        io = load_file_io(self.properties, table.metadata_location)
        metadata = table.metadata
        manifest_lists_to_delete = set()
        manifests_to_delete: List[ManifestFile] = []
        for snapshot in metadata.snapshots:
            manifests_to_delete += snapshot.manifests(io)
            manifest_lists_to_delete.add(snapshot.manifest_list)

        manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete}
        prev_metadata_files = {log.metadata_file for log in metadata.metadata_log}

        delete_data_files(io, manifests_to_delete)
        delete_files(io, manifest_paths_to_delete, MANIFEST)
        delete_files(io, manifest_lists_to_delete, MANIFEST_LIST)
        delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
        delete_files(io, {table.metadata_location}, METADATA)

    def _create_staged_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> StagedTable:
        """Create a table and return the table instance without committing the changes.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            StagedTable: the created staged table instance.
        """
        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore

        database_name, table_name = self.identifier_to_database_and_table(identifier)

        location = self._resolve_table_location(location, database_name, table_name)
        provider = load_location_provider(location, properties)
        metadata_location = provider.new_table_metadata_file_location()
        metadata = new_table_metadata(
            location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
        )
        io = self._load_file_io(properties=properties, location=metadata_location)
        return StagedTable(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=io,
            catalog=self,
        )

    def _update_and_stage_table(
        self,
        current_table: Optional[Table],
        table_identifier: Identifier,
        requirements: Tuple[TableRequirement, ...],
        updates: Tuple[TableUpdate, ...],
    ) -> StagedTable:
        for requirement in requirements:
            requirement.validate(current_table.metadata if current_table else None)

        updated_metadata = update_table_metadata(
            base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
            updates=updates,
            enforce_validation=current_table is None,
            metadata_location=current_table.metadata_location if current_table else None,
        )

        new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
        provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
        new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)

        return StagedTable(
            identifier=table_identifier,
            metadata=updated_metadata,
            metadata_location=new_metadata_location,
            io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
            catalog=self,
        )

    def _get_updated_props_and_update_summary(
        self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
    ) -> Tuple[PropertiesUpdateSummary, Properties]:
        self._check_for_overlap(updates=updates, removals=removals)
        updated_properties = dict(current_properties)

        removed: Set[str] = set()
        updated: Set[str] = set()

        if removals:
            for key in removals:
                if key in updated_properties:
                    updated_properties.pop(key)
                    removed.add(key)
        if updates:
            for key, value in updates.items():
                updated_properties[key] = value
                updated.add(key)

        expected_to_change = (removals or set()).difference(removed)
        properties_update_summary = PropertiesUpdateSummary(
            removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change)
        )

        return properties_update_summary, updated_properties

    def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str) -> str:
        if not location:
            return self._get_default_warehouse_location(database_name, table_name)
        return location.rstrip("/")

    def _get_default_warehouse_location(self, database_name: str, table_name: str) -> str:
        database_properties = self.load_namespace_properties(database_name)
        if database_location := database_properties.get(LOCATION):
            database_location = database_location.rstrip("/")
            return f"{database_location}/{table_name}"

        if warehouse_path := self.properties.get(WAREHOUSE_LOCATION):
            warehouse_path = warehouse_path.rstrip("/")
            return f"{warehouse_path}/{database_name}.db/{table_name}"

        raise ValueError("No default path is set, please specify a location when creating a table")

    @staticmethod
    def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
        ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

    @staticmethod
    def _parse_metadata_version(metadata_location: str) -> int:
        """Parse the version from the metadata location.

        The version is the first part of the file name, before the first dash.
        For example, the version of the metadata file
        `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
        is 1.
        If the path does not comply with the pattern, the version is defaulted to be -1, ensuring
        that the next metadata file is treated as having version 0.

        Args:
            metadata_location (str): The location of the metadata file.

        Returns:
            int: The version of the metadata file. -1 if the file name does not have valid version string
        """
        file_name = metadata_location.split("/")[-1]
        if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
            try:
                uuid.UUID(file_name_match.group(2))
            except ValueError:
                return -1
            return int(file_name_match.group(1))
        else:
            return -1

    @staticmethod
    def _check_for_overlap(removals: Optional[Set[str]], updates: Properties) -> None:
        if updates and removals:
            overlap = set(removals) & set(updates.keys())
            if overlap:
                raise ValueError(f"Updates and deletes have an overlap: {overlap}")

    @staticmethod
    def _empty_table_metadata() -> TableMetadata:
        """Return an empty TableMetadata instance.

        It is used to build a TableMetadata from a sequence of initial TableUpdates.
        It is a V1 TableMetadata because there will be a UpgradeFormatVersionUpdate in
        initial changes to bump the metadata to the target version.

        Returns:
            TableMetadata: An empty TableMetadata instance.
        """
        return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())
