#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.
import uuid
from time import time
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

import boto3

from pyiceberg.catalog import (
    BOTOCORE_SESSION,
    ICEBERG,
    METADATA_LOCATION,
    PREVIOUS_METADATA_LOCATION,
    TABLE_TYPE,
    MetastoreCatalog,
    PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
    ConditionalCheckFailedException,
    GenericDynamoDbError,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    TableAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.utils.properties import get_first_property_value

if TYPE_CHECKING:
    import pyarrow as pa

DYNAMODB_CLIENT = "dynamodb"

DYNAMODB_COL_IDENTIFIER = "identifier"
DYNAMODB_COL_NAMESPACE = "namespace"
DYNAMODB_COL_VERSION = "v"
DYNAMODB_COL_UPDATED_AT = "updated_at"
DYNAMODB_COL_CREATED_AT = "created_at"
DYNAMODB_NAMESPACE = "NAMESPACE"
DYNAMODB_NAMESPACE_GSI = "namespace-identifier"
DYNAMODB_PAY_PER_REQUEST = "PAY_PER_REQUEST"

DYNAMODB_TABLE_NAME = "table-name"
DYNAMODB_TABLE_NAME_DEFAULT = "iceberg"

PROPERTY_KEY_PREFIX = "p."

ACTIVE = "ACTIVE"
ITEM = "Item"

DYNAMODB_PROFILE_NAME = "dynamodb.profile-name"
DYNAMODB_REGION = "dynamodb.region"
DYNAMODB_ACCESS_KEY_ID = "dynamodb.access-key-id"
DYNAMODB_SECRET_ACCESS_KEY = "dynamodb.secret-access-key"
DYNAMODB_SESSION_TOKEN = "dynamodb.session-token"


class DynamoDbCatalog(MetastoreCatalog):
    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)

        session = boto3.Session(
            profile_name=properties.get(DYNAMODB_PROFILE_NAME),
            region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
            botocore_session=properties.get(BOTOCORE_SESSION),
            aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
        )
        self.dynamodb = session.client(DYNAMODB_CLIENT)
        self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
        self._ensure_catalog_table_exists_or_create()

    def _ensure_catalog_table_exists_or_create(self) -> None:
        if self._dynamodb_table_exists():
            return None

        try:
            self.dynamodb.create_table(
                TableName=self.dynamodb_table_name,
                AttributeDefinitions=CREATE_CATALOG_ATTRIBUTE_DEFINITIONS,
                KeySchema=CREATE_CATALOG_KEY_SCHEMA,
                GlobalSecondaryIndexes=CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES,
                BillingMode=DYNAMODB_PAY_PER_REQUEST,
            )
        except (
            self.dynamodb.exceptions.ResourceInUseException,
            self.dynamodb.exceptions.LimitExceededException,
            self.dynamodb.exceptions.InternalServerError,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

    def _dynamodb_table_exists(self) -> bool:
        try:
            response = self.dynamodb.describe_table(TableName=self.dynamodb_table_name)
        except self.dynamodb.exceptions.ResourceNotFoundException:
            return False
        except self.dynamodb.exceptions.InternalServerError as e:
            raise GenericDynamoDbError(e.message) from e

        if response["Table"]["TableStatus"] != ACTIVE:
            raise GenericDynamoDbError(f"DynamoDB table for catalog {self.dynamodb_table_name} is not {ACTIVE}")
        else:
            return True

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        """
        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore

        database_name, table_name = self.identifier_to_database_and_table(identifier)

        location = self._resolve_table_location(location, database_name, table_name)
        provider = load_location_provider(table_location=location, table_properties=properties)
        metadata_location = provider.new_table_metadata_file_location()

        metadata = new_table_metadata(
            location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
        )
        io = load_file_io(properties=self.properties, location=metadata_location)
        self._write_metadata(metadata, io, metadata_location)

        self._ensure_namespace_exists(database_name=database_name)

        try:
            self._put_dynamo_item(
                item=_get_create_table_item(
                    database_name=database_name, table_name=table_name, properties=properties, metadata_location=metadata_location
                ),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

        return self.load_table(identifier=identifier)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        """
        raise NotImplementedError

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        """
        raise NotImplementedError

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """
        Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
        return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        try:
            self._delete_dynamo_item(
                namespace=database_name,
                identifier=f"{database_name}.{table_name}",
                condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        This method can only rename Iceberg tables in AWS Glue.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
        """
        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

        from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)

        try:
            # Verify that from_identifier is a valid iceberg table
            self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=from_table_item)
        except NoSuchPropertyException as e:
            raise NoSuchPropertyException(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties"
            ) from e
        except NoSuchIcebergTableError as e:
            raise NoSuchIcebergTableError(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table"
            ) from e

        self._ensure_namespace_exists(database_name=from_database_name)
        self._ensure_namespace_exists(database_name=to_database_name)

        try:
            self._put_dynamo_item(
                item=_get_rename_table_item(
                    from_dynamo_table_item=from_table_item, to_database_name=to_database_name, to_table_name=to_table_name
                ),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

        try:
            self.drop_table(from_identifier)
        except (NoSuchTableError, GenericDynamoDbError) as e:
            log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

            try:
                self.drop_table(to_identifier)
                log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}."
            except (NoSuchTableError, GenericDynamoDbError):
                log_message += (
                    f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually"
                )

            raise ValueError(log_message) from e

        return self.load_table(to_identifier)

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            AlreadyExistsError: If a namespace with the given name already exists.
        """
        database_name = self.identifier_to_database(namespace)

        try:
            self._put_dynamo_item(
                item=_get_create_database_item(database_name=database_name, properties=properties),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_NAMESPACE})",
            )
        except ConditionalCheckFailedException as e:
            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        A Glue namespace can only be dropped if it is empty.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        table_identifiers = self.list_tables(namespace=database_name)

        if len(table_identifiers) > 0:
            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")

        try:
            self._delete_dynamo_item(
                namespace=database_name,
                identifier=DYNAMODB_NAMESPACE,
                condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)

        paginator = self.dynamodb.get_paginator("query")

        try:
            page_iterator = paginator.paginate(
                TableName=self.dynamodb_table_name,
                IndexName=DYNAMODB_NAMESPACE_GSI,
                KeyConditionExpression=f"{DYNAMODB_COL_NAMESPACE} = :namespace ",
                ExpressionAttributeValues={
                    ":namespace": {
                        "S": database_name,
                    }
                },
            )
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

        table_identifiers = []
        for page in page_iterator:
            for item in page["Items"]:
                _dict = _convert_dynamo_item_to_regular_dict(item)
                identifier_col = _dict[DYNAMODB_COL_IDENTIFIER]
                if identifier_col == DYNAMODB_NAMESPACE:
                    continue

                table_identifiers.append(self.identifier_to_tuple(identifier_col))

        return table_identifiers

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List top-level namespaces from the catalog.

        We do not support hierarchical namespace.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        """
        # Hierarchical namespace is not supported. Return an empty list
        if namespace:
            return []

        paginator = self.dynamodb.get_paginator("query")

        try:
            page_iterator = paginator.paginate(
                TableName=self.dynamodb_table_name,
                ConsistentRead=True,
                KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier",
                ExpressionAttributeValues={
                    ":identifier": {
                        "S": DYNAMODB_NAMESPACE,
                    }
                },
            )
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

        database_identifiers = []
        for page in page_iterator:
            for item in page["Items"]:
                _dict = _convert_dynamo_item_to_regular_dict(item)
                namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
                database_identifiers.append(self.identifier_to_tuple(namespace_col))

        return database_identifiers

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """
        Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        namespace_item = self._get_iceberg_namespace_item(database_name=database_name)
        namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
        return _get_namespace_properties(namespace_dict=namespace_dict)

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """
        Remove or update provided property keys for a namespace.

        Args:
            namespace: Namespace identifier
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist， or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        namespace_item = self._get_iceberg_namespace_item(database_name=database_name)
        namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
        current_properties = _get_namespace_properties(namespace_dict=namespace_dict)

        properties_update_summary, updated_properties = self._get_updated_props_and_update_summary(
            current_properties=current_properties, removals=removals, updates=updates
        )

        try:
            self._put_dynamo_item(
                item=_get_update_database_item(
                    namespace_item=namespace_item,
                    updated_properties=updated_properties,
                ),
                condition_expression=f"attribute_exists({DYNAMODB_COL_NAMESPACE})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

        return properties_update_summary

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        raise NotImplementedError

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]:
        try:
            return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name)
        except ValueError as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def _get_iceberg_namespace_item(self, database_name: str) -> Dict[str, Any]:
        try:
            return self._get_dynamo_item(identifier=DYNAMODB_NAMESPACE, namespace=database_name)
        except ValueError as e:
            raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}") from e

    def _ensure_namespace_exists(self, database_name: str) -> Dict[str, Any]:
        return self._get_iceberg_namespace_item(database_name)

    def _get_dynamo_item(self, identifier: str, namespace: str) -> Dict[str, Any]:
        try:
            response = self.dynamodb.get_item(
                TableName=self.dynamodb_table_name,
                ConsistentRead=True,
                Key={
                    DYNAMODB_COL_IDENTIFIER: {
                        "S": identifier,
                    },
                    DYNAMODB_COL_NAMESPACE: {
                        "S": namespace,
                    },
                },
            )
            if ITEM in response:
                return response[ITEM]
            else:
                raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}")
        except self.dynamodb.exceptions.ResourceNotFoundException as e:
            raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}") from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

    def _put_dynamo_item(self, item: Dict[str, Any], condition_expression: str) -> None:
        try:
            self.dynamodb.put_item(TableName=self.dynamodb_table_name, Item=item, ConditionExpression=condition_expression)
        except self.dynamodb.exceptions.ConditionalCheckFailedException as e:
            raise ConditionalCheckFailedException(f"Condition expression check failed: {condition_expression} - {item}") from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
            self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException,
            self.dynamodb.exceptions.TransactionConflictException,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

    def _delete_dynamo_item(self, namespace: str, identifier: str, condition_expression: str) -> None:
        try:
            self.dynamodb.delete_item(
                TableName=self.dynamodb_table_name,
                Key={
                    DYNAMODB_COL_IDENTIFIER: {
                        "S": identifier,
                    },
                    DYNAMODB_COL_NAMESPACE: {
                        "S": namespace,
                    },
                },
                ConditionExpression=condition_expression,
            )
        except self.dynamodb.exceptions.ConditionalCheckFailedException as e:
            raise ConditionalCheckFailedException(
                f"Condition expression check failed: {condition_expression} - {identifier}"
            ) from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
            self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException,
            self.dynamodb.exceptions.TransactionConflictException,
        ) as e:
            raise GenericDynamoDbError(e.message) from e

    def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[str, Any]) -> Table:
        table_dict = _convert_dynamo_item_to_regular_dict(dynamo_table_item)

        for prop in [_add_property_prefix(prop) for prop in (TABLE_TYPE, METADATA_LOCATION)] + [
            DYNAMODB_COL_IDENTIFIER,
            DYNAMODB_COL_NAMESPACE,
            DYNAMODB_COL_CREATED_AT,
        ]:
            if prop not in table_dict.keys():
                raise NoSuchPropertyException(f"Iceberg required property {prop} is missing: {dynamo_table_item}")

        table_type = table_dict[_add_property_prefix(TABLE_TYPE)]
        identifier = table_dict[DYNAMODB_COL_IDENTIFIER]
        metadata_location = table_dict[_add_property_prefix(METADATA_LOCATION)]
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        if table_type.lower() != ICEBERG:
            raise NoSuchIcebergTableError(
                f"Property table_type is {table_type}, expected {ICEBERG}: {database_name}.{table_name}"
            )

        io = load_file_io(properties=self.properties, location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        return Table(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=self._load_file_io(metadata.properties, metadata_location),
            catalog=self,
        )


def _get_create_table_item(database_name: str, table_name: str, properties: Properties, metadata_location: str) -> Dict[str, Any]:
    current_timestamp_ms = str(round(time() * 1000))
    _dict = {
        DYNAMODB_COL_IDENTIFIER: {
            "S": f"{database_name}.{table_name}",
        },
        DYNAMODB_COL_NAMESPACE: {
            "S": database_name,
        },
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: {
            "N": current_timestamp_ms,
        },
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    _dict[_add_property_prefix(TABLE_TYPE)] = {"S": ICEBERG.upper()}
    _dict[_add_property_prefix(METADATA_LOCATION)] = {"S": metadata_location}
    _dict[_add_property_prefix(PREVIOUS_METADATA_LOCATION)] = {"S": ""}

    return _dict


def _get_rename_table_item(from_dynamo_table_item: Dict[str, Any], to_database_name: str, to_table_name: str) -> Dict[str, Any]:
    _dict = from_dynamo_table_item
    current_timestamp_ms = str(round(time() * 1000))
    _dict[DYNAMODB_COL_IDENTIFIER]["S"] = f"{to_database_name}.{to_table_name}"
    _dict[DYNAMODB_COL_NAMESPACE]["S"] = to_database_name
    _dict[DYNAMODB_COL_VERSION]["S"] = str(uuid.uuid4())
    _dict[DYNAMODB_COL_UPDATED_AT]["N"] = current_timestamp_ms
    return _dict


def _get_create_database_item(database_name: str, properties: Properties) -> Dict[str, Any]:
    current_timestamp_ms = str(round(time() * 1000))
    _dict = {
        DYNAMODB_COL_IDENTIFIER: {
            "S": DYNAMODB_NAMESPACE,
        },
        DYNAMODB_COL_NAMESPACE: {
            "S": database_name,
        },
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: {
            "N": current_timestamp_ms,
        },
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    return _dict


def _get_update_database_item(namespace_item: Dict[str, Any], updated_properties: Properties) -> Dict[str, Any]:
    current_timestamp_ms = str(round(time() * 1000))

    _dict = {
        DYNAMODB_COL_IDENTIFIER: namespace_item[DYNAMODB_COL_IDENTIFIER],
        DYNAMODB_COL_NAMESPACE: namespace_item[DYNAMODB_COL_NAMESPACE],
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: namespace_item[DYNAMODB_COL_CREATED_AT],
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in updated_properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    return _dict


CREATE_CATALOG_ATTRIBUTE_DEFINITIONS = [
    {
        "AttributeName": DYNAMODB_COL_IDENTIFIER,
        "AttributeType": "S",
    },
    {
        "AttributeName": DYNAMODB_COL_NAMESPACE,
        "AttributeType": "S",
    },
]

CREATE_CATALOG_KEY_SCHEMA = [
    {
        "AttributeName": DYNAMODB_COL_IDENTIFIER,
        "KeyType": "HASH",
    },
    {
        "AttributeName": DYNAMODB_COL_NAMESPACE,
        "KeyType": "RANGE",
    },
]


CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES = [
    {
        "IndexName": DYNAMODB_NAMESPACE_GSI,
        "KeySchema": [
            {
                "AttributeName": DYNAMODB_COL_NAMESPACE,
                "KeyType": "HASH",
            },
            {
                "AttributeName": DYNAMODB_COL_IDENTIFIER,
                "KeyType": "RANGE",
            },
        ],
        "Projection": {
            "ProjectionType": "KEYS_ONLY",
        },
    }
]


def _get_namespace_properties(namespace_dict: Dict[str, str]) -> Properties:
    return {_remove_property_prefix(key): val for key, val in namespace_dict.items() if key.startswith(PROPERTY_KEY_PREFIX)}


def _convert_dynamo_item_to_regular_dict(dynamo_json: Dict[str, Any]) -> Dict[str, str]:
    """Convert a dynamo json to a regular json.

    Example of a dynamo json:
    {
        "AlbumTitle": {
            "S": "Songs About Life",
        },
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        }
    }

    Converted to regular json:
    {
        "AlbumTitle": "Songs About Life",
        "Artist": "Acme Band",
        "SongTitle": "Happy Day"
    }

    Only "S" and "N" data types are supported since those are the only ones that Iceberg is utilizing.
    """
    regular_json = {}
    for column_name, val_dict in dynamo_json.items():
        keys = list(val_dict.keys())

        if len(keys) != 1:
            raise ValueError(f"Expecting only 1 key: {keys}")

        data_type = keys[0]
        if data_type not in ("S", "N"):
            raise ValueError("Only S and N data types are supported.")

        values = list(val_dict.values())
        assert len(values) == 1
        column_value = values[0]
        regular_json[column_name] = column_value

    return regular_json


def _add_property_prefix(prop: str) -> str:
    return PROPERTY_KEY_PREFIX + prop


def _remove_property_prefix(prop: str) -> str:
    return prop.lstrip(PROPERTY_KEY_PREFIX)
