"""AWS Glue Catalog Module."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Literal

import boto3

from awswrangler import _utils, exceptions, typing
from awswrangler._config import apply_configs
from awswrangler.catalog._definitions import (
    _csv_table_definition,
    _json_table_definition,
    _orc_table_definition,
    _parquet_table_definition,
)
from awswrangler.catalog._delete import delete_all_partitions, delete_table_if_exists
from awswrangler.catalog._get import _get_table_input
from awswrangler.catalog._utils import _catalog_id, sanitize_column_name, sanitize_table_name

if TYPE_CHECKING:
    from mypy_boto3_glue import GlueClient

_logger: logging.Logger = logging.getLogger(__name__)


def _update_if_necessary(
    dic: dict[str, str | dict[str, str]], key: str, value: str | dict[str, str] | None, mode: str
) -> str:
    if value is not None:
        if key not in dic or dic[key] != value:
            dic[key] = value
            if mode in ("append", "overwrite_partitions"):
                return "update"
    return mode


def _create_table(  # noqa: PLR0912,PLR0915
    database: str,
    table: str,
    description: str | None,
    parameters: dict[str, str] | None,
    mode: str,
    catalog_versioning: bool,
    boto3_session: boto3.Session | None,
    table_input: dict[str, Any],
    table_exist: bool,
    partitions_types: dict[str, str] | None,
    columns_comments: dict[str, str] | None,
    columns_parameters: dict[str, dict[str, str]] | None,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None,
    catalog_id: str | None,
) -> None:
    # Description
    mode = _update_if_necessary(dic=table_input, key="Description", value=description, mode=mode)

    if "Parameters" not in table_input:
        table_input["Parameters"] = {}

    # Parameters
    parameters = parameters if parameters else {}
    for k, v in parameters.items():
        mode = _update_if_necessary(dic=table_input["Parameters"], key=k, value=v, mode=mode)

    # Projection
    projection_params = athena_partition_projection_settings if athena_partition_projection_settings else {}
    if athena_partition_projection_settings:
        table_input["Parameters"]["projection.enabled"] = "true"
        partitions_types = partitions_types if partitions_types else {}
        projection_types = projection_params.get("projection_types", {})
        projection_ranges = projection_params.get("projection_ranges", {})
        projection_values = projection_params.get("projection_values", {})
        projection_intervals = projection_params.get("projection_intervals", {})
        projection_digits = projection_params.get("projection_digits", {})
        projection_formats = projection_params.get("projection_formats", {})
        projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()}
        projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()}
        projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()}
        projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()}
        projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()}
        projection_formats = {sanitize_column_name(k): v for k, v in projection_formats.items()}
        projection_storage_location_template = projection_params.get("projection_storage_location_template")
        for k, v in projection_types.items():
            dtype: str | None = partitions_types.get(k)
            if dtype is None and projection_storage_location_template is None:
                raise exceptions.InvalidArgumentCombination(
                    f"Column {k} appears as projected column but not as partitioned column."
                )
            if dtype == "date":
                table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd"
            elif dtype == "timestamp":
                table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd HH:mm:ss"
                table_input["Parameters"][f"projection.{k}.interval.unit"] = "SECONDS"
                table_input["Parameters"][f"projection.{k}.interval"] = "1"
        for k, v in projection_types.items():
            mode = _update_if_necessary(dic=table_input["Parameters"], key=f"projection.{k}.type", value=v, mode=mode)
        for k, v in projection_ranges.items():
            mode = _update_if_necessary(dic=table_input["Parameters"], key=f"projection.{k}.range", value=v, mode=mode)
        for k, v in projection_values.items():
            mode = _update_if_necessary(dic=table_input["Parameters"], key=f"projection.{k}.values", value=v, mode=mode)
        for k, v in projection_intervals.items():
            mode = _update_if_necessary(
                dic=table_input["Parameters"], key=f"projection.{k}.interval", value=str(v), mode=mode
            )
        for k, v in projection_digits.items():
            mode = _update_if_necessary(
                dic=table_input["Parameters"], key=f"projection.{k}.digits", value=str(v), mode=mode
            )
        for k, v in projection_formats.items():
            mode = _update_if_necessary(
                dic=table_input["Parameters"], key=f"projection.{k}.format", value=str(v), mode=mode
            )
        mode = _update_if_necessary(
            table_input["Parameters"],
            key="storage.location.template",
            value=projection_storage_location_template,
            mode=mode,
        )
    else:
        table_input["Parameters"]["projection.enabled"] = "false"

    # Column comments
    columns_comments = columns_comments if columns_comments else {}
    columns_comments = {sanitize_column_name(k): v for k, v in columns_comments.items()}
    if columns_comments:
        for col in table_input["StorageDescriptor"]["Columns"]:
            name: str = col["Name"]
            if name in columns_comments:
                mode = _update_if_necessary(dic=col, key="Comment", value=columns_comments[name], mode=mode)
        for par in table_input["PartitionKeys"]:
            name = par["Name"]
            if name in columns_comments:
                mode = _update_if_necessary(dic=par, key="Comment", value=columns_comments[name], mode=mode)

    # Column parameters
    columns_parameters = columns_parameters if columns_parameters else {}
    columns_parameters = {sanitize_column_name(k): v for k, v in columns_parameters.items()}
    if columns_parameters:
        for col in table_input["StorageDescriptor"]["Columns"]:
            name: str = col["Name"]  # type: ignore[no-redef]
            if name in columns_parameters:
                mode = _update_if_necessary(dic=col, key="Parameters", value=columns_parameters[name], mode=mode)
        for par in table_input["PartitionKeys"]:
            name = par["Name"]
            if name in columns_parameters:
                mode = _update_if_necessary(dic=par, key="Parameters", value=columns_parameters[name], mode=mode)

    _logger.debug("table_input: %s", table_input)

    client_glue = _utils.client(service_name="glue", session=boto3_session)
    skip_archive: bool = not catalog_versioning
    if mode not in ("overwrite", "append", "overwrite_partitions", "update"):
        raise exceptions.InvalidArgument(
            f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'."
        )
    args: dict[str, Any] = _catalog_id(
        catalog_id=catalog_id,
        DatabaseName=database,
        TableInput=table_input,
    )
    if table_exist:
        _logger.debug("Updating table (%s)...", mode)
        args["SkipArchive"] = skip_archive
        if mode == "overwrite":
            delete_all_partitions(table=table, database=database, catalog_id=catalog_id, boto3_session=boto3_session)
        if mode in ["overwrite", "update"]:
            client_glue.update_table(**args)
    else:
        try:
            _logger.debug("Creating table (%s)...", mode)
            client_glue.create_table(**args)
        except client_glue.exceptions.AlreadyExistsException:
            if mode == "overwrite":
                _utils.try_it(
                    f=_overwrite_table,
                    ex=client_glue.exceptions.AlreadyExistsException,
                    client_glue=client_glue,
                    catalog_id=catalog_id,
                    database=database,
                    table=table,
                    table_input=table_input,
                    boto3_session=boto3_session,
                )
    _logger.debug("Leaving table as is (%s)...", mode)


def _overwrite_table(
    client_glue: "GlueClient",
    catalog_id: str | None,
    database: str,
    table: str,
    table_input: dict[str, Any],
    boto3_session: boto3.Session,
) -> None:
    delete_table_if_exists(
        database=database,
        table=table,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
    )
    args: dict[str, Any] = _catalog_id(
        catalog_id=catalog_id,
        DatabaseName=database,
        TableInput=table_input,
    )
    client_glue.create_table(**args)


def _upsert_table_parameters(
    parameters: dict[str, str],
    database: str,
    catalog_versioning: bool,
    catalog_id: str | None,
    table_input: dict[str, Any],
    boto3_session: boto3.Session | None,
) -> dict[str, str]:
    pars: dict[str, str] = table_input["Parameters"]
    update: bool = False
    for k, v in parameters.items():
        if k not in pars or v != pars[k]:
            pars[k] = v
            update = True
    if update is True:
        _overwrite_table_parameters(
            parameters=pars,
            database=database,
            catalog_id=catalog_id,
            boto3_session=boto3_session,
            table_input=table_input,
            catalog_versioning=catalog_versioning,
        )
    return pars


def _overwrite_table_parameters(
    parameters: dict[str, str],
    database: str,
    catalog_versioning: bool,
    catalog_id: str | None,
    table_input: dict[str, Any],
    boto3_session: boto3.Session | None,
) -> dict[str, str]:
    table_input["Parameters"] = parameters
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    skip_archive: bool = not catalog_versioning
    client_glue.update_table(
        **_catalog_id(
            catalog_id=catalog_id,
            DatabaseName=database,
            TableInput=table_input,
            SkipArchive=skip_archive,
        )
    )
    return parameters


def _update_table_input(table_input: dict[str, Any], columns_types: dict[str, str], allow_reorder: bool = True) -> bool:
    column_updated = False

    catalog_cols: dict[str, str] = {x["Name"]: x["Type"] for x in table_input["StorageDescriptor"]["Columns"]}

    if not allow_reorder:
        for catalog_key, frame_key in zip(catalog_cols, columns_types):
            if catalog_key != frame_key:
                raise exceptions.InvalidArgumentValue(f"Column {frame_key} is out of order.")

    for c, t in columns_types.items():
        if c not in catalog_cols:
            _logger.debug("New column %s with type %s.", c, t)
            table_input["StorageDescriptor"]["Columns"].append({"Name": c, "Type": t})
            column_updated = True
        elif t != catalog_cols[c]:  # Data type change detected!
            raise exceptions.InvalidArgumentValue(
                f"Data type change detected on column {c} (Old type: {catalog_cols[c]} / New type {t})."
            )

    return column_updated


def _create_parquet_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None,
    partitions_types: dict[str, str] | None,
    bucketing_info: typing.BucketingInfoTuple | None,
    catalog_id: str | None,
    compression: str | None,
    description: str | None,
    parameters: dict[str, str] | None,
    columns_comments: dict[str, str] | None,
    columns_parameters: dict[str, dict[str, str]] | None,
    mode: str,
    catalog_versioning: bool,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None,
    boto3_session: boto3.Session | None,
    catalog_table_input: dict[str, Any] | None,
) -> None:
    table = sanitize_table_name(table=table)
    columns_types = {sanitize_column_name(k): v for k, v in columns_types.items()}
    partitions_types = {} if partitions_types is None else partitions_types
    _logger.debug("catalog_table_input: %s", catalog_table_input)

    table_input: dict[str, Any]
    if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
        table_input = catalog_table_input

        is_table_updated = _update_table_input(table_input, columns_types)
        if is_table_updated:
            mode = "update"
    else:
        table_input = _parquet_table_definition(
            table=table,
            path=path,
            columns_types=columns_types,
            table_type=table_type,
            partitions_types=partitions_types,
            bucketing_info=bucketing_info,
            compression=compression,
        )
    table_exist: bool = catalog_table_input is not None
    _logger.debug("table_exist: %s", table_exist)
    _create_table(
        database=database,
        table=table,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        boto3_session=boto3_session,
        table_input=table_input,
        table_exist=table_exist,
        partitions_types=partitions_types,
        athena_partition_projection_settings=athena_partition_projection_settings,
        catalog_id=catalog_id,
    )


def _create_orc_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None,
    partitions_types: dict[str, str] | None,
    bucketing_info: typing.BucketingInfoTuple | None,
    catalog_id: str | None,
    compression: str | None,
    description: str | None,
    parameters: dict[str, str] | None,
    columns_comments: dict[str, str] | None,
    columns_parameters: dict[str, dict[str, str]] | None,
    mode: str,
    catalog_versioning: bool,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None,
    boto3_session: boto3.Session | None,
    catalog_table_input: dict[str, Any] | None,
) -> None:
    table = sanitize_table_name(table=table)
    columns_types = {sanitize_column_name(k): v for k, v in columns_types.items()}
    partitions_types = {} if partitions_types is None else partitions_types
    _logger.debug("catalog_table_input: %s", catalog_table_input)

    table_input: dict[str, Any]
    if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
        table_input = catalog_table_input

        is_table_updated = _update_table_input(table_input, columns_types)
        if is_table_updated:
            mode = "update"
    else:
        table_input = _orc_table_definition(
            table=table,
            path=path,
            columns_types=columns_types,
            table_type=table_type,
            partitions_types=partitions_types,
            bucketing_info=bucketing_info,
            compression=compression,
        )
    table_exist: bool = catalog_table_input is not None
    _logger.debug("table_exist: %s", table_exist)
    _create_table(
        database=database,
        table=table,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        boto3_session=boto3_session,
        table_input=table_input,
        table_exist=table_exist,
        partitions_types=partitions_types,
        athena_partition_projection_settings=athena_partition_projection_settings,
        catalog_id=catalog_id,
    )


def _create_csv_table(
    database: str,
    table: str,
    path: str | None,
    columns_types: dict[str, str],
    table_type: str | None,
    partitions_types: dict[str, str] | None,
    bucketing_info: typing.BucketingInfoTuple | None,
    description: str | None,
    compression: str | None,
    parameters: dict[str, str] | None,
    columns_comments: dict[str, str] | None,
    columns_parameters: dict[str, dict[str, str]] | None,
    mode: str,
    catalog_versioning: bool,
    schema_evolution: bool,
    sep: str,
    skip_header_line_count: int | None,
    serde_library: str | None,
    serde_parameters: dict[str, str] | None,
    boto3_session: boto3.Session | None,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None,
    catalog_table_input: dict[str, Any] | None,
    catalog_id: str | None,
) -> None:
    table = sanitize_table_name(table=table)
    columns_types = {sanitize_column_name(k): v for k, v in columns_types.items()}
    partitions_types = {} if partitions_types is None else partitions_types
    _logger.debug("catalog_table_input: %s", catalog_table_input)

    if schema_evolution is False:
        _utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode)

    table_input: dict[str, Any]
    if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
        table_input = catalog_table_input

        is_table_updated = _update_table_input(table_input, columns_types, allow_reorder=False)
        if is_table_updated:
            mode = "update"

    else:
        table_input = _csv_table_definition(
            table=table,
            path=path,
            columns_types=columns_types,
            table_type=table_type,
            partitions_types=partitions_types,
            bucketing_info=bucketing_info,
            compression=compression,
            sep=sep,
            skip_header_line_count=skip_header_line_count,
            serde_library=serde_library,
            serde_parameters=serde_parameters,
        )
    table_exist: bool = catalog_table_input is not None
    _logger.debug("table_exist: %s", table_exist)
    _create_table(
        database=database,
        table=table,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        boto3_session=boto3_session,
        table_input=table_input,
        table_exist=table_exist,
        partitions_types=partitions_types,
        athena_partition_projection_settings=athena_partition_projection_settings,
        catalog_id=catalog_id,
    )


def _create_json_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None,
    partitions_types: dict[str, str] | None,
    bucketing_info: typing.BucketingInfoTuple | None,
    description: str | None,
    compression: str | None,
    parameters: dict[str, str] | None,
    columns_comments: dict[str, str] | None,
    columns_parameters: dict[str, dict[str, str]] | None,
    mode: str,
    catalog_versioning: bool,
    schema_evolution: bool,
    serde_library: str | None,
    serde_parameters: dict[str, str] | None,
    boto3_session: boto3.Session | None,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None,
    catalog_table_input: dict[str, Any] | None,
    catalog_id: str | None,
) -> None:
    table = sanitize_table_name(table=table)
    columns_types = {sanitize_column_name(k): v for k, v in columns_types.items()}
    partitions_types = {} if partitions_types is None else partitions_types
    _logger.debug("catalog_table_input: %s", catalog_table_input)

    table_input: dict[str, Any]
    if schema_evolution is False:
        _utils.check_schema_changes(columns_types=columns_types, table_input=catalog_table_input, mode=mode)
    if (catalog_table_input is not None) and (mode in ("append", "overwrite_partitions")):
        table_input = catalog_table_input

        is_table_updated = _update_table_input(table_input, columns_types)
        if is_table_updated:
            mode = "update"

    else:
        table_input = _json_table_definition(
            table=table,
            path=path,
            columns_types=columns_types,
            table_type=table_type,
            partitions_types=partitions_types,
            bucketing_info=bucketing_info,
            compression=compression,
            serde_library=serde_library,
            serde_parameters=serde_parameters,
        )
    table_exist: bool = catalog_table_input is not None
    _logger.debug("table_exist: %s", table_exist)
    _create_table(
        database=database,
        table=table,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        boto3_session=boto3_session,
        table_input=table_input,
        table_exist=table_exist,
        partitions_types=partitions_types,
        athena_partition_projection_settings=athena_partition_projection_settings,
        catalog_id=catalog_id,
    )


@apply_configs
def upsert_table_parameters(
    parameters: dict[str, str],
    database: str,
    table: str,
    catalog_versioning: bool = False,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str]:
    """Insert or Update the received parameters.

    Parameters
    ----------
    parameters
        e.g. {"source": "mysql", "destination":  "datalake"}
    database
        Database name.
    table
        Table name.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    boto3_session
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
       All parameters after the upsert.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.upsert_table_parameters(
    ...     parameters={"source": "mysql", "destination":  "datalake"},
    ...     database="...",
    ...     table="...",
    ... )

    """
    table_input: dict[str, str] | None = _get_table_input(
        database=database,
        table=table,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
    )
    if table_input is None:
        raise exceptions.InvalidArgumentValue(f"Table {database}.{table} does not exist.")
    return _upsert_table_parameters(
        parameters=parameters,
        database=database,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
        table_input=table_input,
        catalog_versioning=catalog_versioning,
    )


@apply_configs
def overwrite_table_parameters(
    parameters: dict[str, str],
    database: str,
    table: str,
    catalog_versioning: bool = False,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str]:
    """Overwrite all existing parameters.

    Parameters
    ----------
    parameters
        e.g. {"source": "mysql", "destination":  "datalake"}
    database
        Database name.
    table
        Table name.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
       All parameters after the overwrite (The same received).

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.overwrite_table_parameters(
    ...     parameters={"source": "mysql", "destination":  "datalake"},
    ...     database="...",
    ...     table="...",
    ... )

    """
    table_input: dict[str, Any] | None = _get_table_input(
        database=database,
        table=table,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    if table_input is None:
        raise exceptions.InvalidTable(f"Table {table} does not exist on database {database}.")
    return _overwrite_table_parameters(
        parameters=parameters,
        database=database,
        catalog_id=catalog_id,
        table_input=table_input,
        boto3_session=boto3_session,
        catalog_versioning=catalog_versioning,
    )


@apply_configs
def create_database(
    name: str,
    description: str | None = None,
    catalog_id: str | None = None,
    exist_ok: bool = False,
    database_input_args: dict[str, Any] | None = None,
    boto3_session: boto3.Session | None = None,
) -> None:
    """Create a database in AWS Glue Catalog.

    Parameters
    ----------
    name
        Database name.
    description
        A description for the Database.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    exist_ok
        If set to ``True`` will not raise an Exception if a Database with the same already exists.
        In this case the description will be updated if it is different from the current one.
    database_input_args
        Additional metadata to pass to database creation. Supported arguments listed here:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_database(
    ...     name='awswrangler_test'
    ... )
    """
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    args: dict[str, Any] = {"Name": name, **database_input_args} if database_input_args else {"Name": name}
    if description is not None:
        args["Description"] = description

    try:
        r = client_glue.get_database(**_catalog_id(catalog_id=catalog_id, Name=name))
        if not exist_ok:
            raise exceptions.AlreadyExists(f"Database {name} already exists and <exist_ok> is set to False.")
        for k, v in args.items():
            if v != r["Database"].get(k, ""):
                client_glue.update_database(**_catalog_id(catalog_id=catalog_id, Name=name, DatabaseInput=args))
                break
    except client_glue.exceptions.EntityNotFoundException:
        client_glue.create_database(**_catalog_id(catalog_id=catalog_id, DatabaseInput=args))


@apply_configs
def create_parquet_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None = None,
    partitions_types: dict[str, str] | None = None,
    bucketing_info: typing.BucketingInfoTuple | None = None,
    catalog_id: str | None = None,
    compression: str | None = None,
    description: str | None = None,
    parameters: dict[str, str] | None = None,
    columns_comments: dict[str, str] | None = None,
    columns_parameters: dict[str, dict[str, str]] | None = None,
    mode: Literal["overwrite", "append"] = "overwrite",
    catalog_versioning: bool = False,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None = None,
    boto3_session: boto3.Session | None = None,
) -> None:
    """Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.

    'https://docs.aws.amazon.com/athena/latest/ug/data-types.html'

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    path
        Amazon S3 path (e.g. s3://bucket/prefix/).
    columns_types
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
    table_type
        The type of the Glue Table. Set to ``EXTERNAL_TABLE`` if ``None``.
    partitions_types
        Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
    bucketing_info
        Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
        second element.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    compression
        Compression style (``None``, ``snappy``, ``gzip``, etc).
    description
        Table description
    parameters
        Key/value pairs to tag the table.
    columns_comments
        Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
    columns_parameters
        Columns names and the related parameters (e.g. {'col0': {'par0': 'Param 0', 'par1': 'Param 1'}}).
    mode
        'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    athena_partition_projection_settings
        Parameters of the Athena Partition Projection (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html).
        AthenaPartitionProjectionSettings is a `TypedDict`, meaning the passed parameter can be instantiated either as an
        instance of AthenaPartitionProjectionSettings or as a regular Python dict.

        Following projection parameters are supported:

        .. list-table:: Projection Parameters
           :header-rows: 1

           * - Name
             - Type
             - Description
           * - projection_types
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections types.
               Valid types: "enum", "integer", "date", "injected"
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'enum', 'col2_name': 'integer'})
           * - projection_ranges
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections ranges.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
           * - projection_values
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections values.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
           * - projection_intervals
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections intervals.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '5'})
           * - projection_digits
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections digits.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '2'})
           * - projection_formats
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections formats.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
           * - projection_storage_location_template
             - Optional[str]
             - Value which is allows Athena to properly map partition values if the S3 file locations do not follow
               a typical `.../column=value/...` pattern.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
               (e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_parquet_table(
    ...     database='default',
    ...     table='my_table',
    ...     path='s3://bucket/prefix/',
    ...     columns_types={'col0': 'bigint', 'col1': 'double'},
    ...     partitions_types={'col2': 'date'},
    ...     compression='snappy',
    ...     description='My own table!',
    ...     parameters={'source': 'postgresql'},
    ...     columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
    ... )

    """
    catalog_table_input: dict[str, Any] | None = _get_table_input(
        database=database,
        table=table,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
    )
    _create_parquet_table(
        database=database,
        table=table,
        path=path,
        columns_types=columns_types,
        table_type=table_type,
        partitions_types=partitions_types,
        bucketing_info=bucketing_info,
        catalog_id=catalog_id,
        compression=compression,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        athena_partition_projection_settings=athena_partition_projection_settings,
        boto3_session=boto3_session,
        catalog_table_input=catalog_table_input,
    )


@apply_configs
def create_orc_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None = None,
    partitions_types: dict[str, str] | None = None,
    bucketing_info: typing.BucketingInfoTuple | None = None,
    catalog_id: str | None = None,
    compression: str | None = None,
    description: str | None = None,
    parameters: dict[str, str] | None = None,
    columns_comments: dict[str, str] | None = None,
    columns_parameters: dict[str, dict[str, str]] | None = None,
    mode: Literal["overwrite", "append"] = "overwrite",
    catalog_versioning: bool = False,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None = None,
    boto3_session: boto3.Session | None = None,
) -> None:
    """Create a ORC Table (Metadata Only) in the AWS Glue Catalog.

    'https://docs.aws.amazon.com/athena/latest/ug/data-types.html'

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    path
        Amazon S3 path (e.g. s3://bucket/prefix/).
    columns_types
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
    table_type
        The type of the Glue Table. Set to EXTERNAL_TABLE if None.
    partitions_types
        Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
    bucketing_info
        Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
        second element.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    compression
        Compression style (``None``, ``snappy``, ``gzip``, etc).
    description
        Table description
    parameters
        Key/value pairs to tag the table.
    columns_comments
        Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
    columns_parameters
        Columns names and the related parameters (e.g. {'col0': {'par0': 'Param 0', 'par1': 'Param 1'}}).
    mode
        'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    athena_partition_projection_settings
        Parameters of the Athena Partition Projection (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html).
        AthenaPartitionProjectionSettings is a `TypedDict`, meaning the passed parameter can be instantiated either as an
        instance of AthenaPartitionProjectionSettings or as a regular Python dict.

        Following projection parameters are supported:

        .. list-table:: Projection Parameters
           :header-rows: 1

           * - Name
             - Type
             - Description
           * - projection_types
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections types.
               Valid types: "enum", "integer", "date", "injected"
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'enum', 'col2_name': 'integer'})
           * - projection_ranges
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections ranges.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
           * - projection_values
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections values.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
           * - projection_intervals
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections intervals.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '5'})
           * - projection_digits
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections digits.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '2'})
           * - projection_formats
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections formats.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
           * - projection_storage_location_template
             - Optional[str]
             - Value which is allows Athena to properly map partition values if the S3 file locations do not follow
               a typical `.../column=value/...` pattern.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
               (e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_parquet_table(
    ...     database='default',
    ...     table='my_table',
    ...     path='s3://bucket/prefix/',
    ...     columns_types={'col0': 'bigint', 'col1': 'double'},
    ...     partitions_types={'col2': 'date'},
    ...     compression='snappy',
    ...     description='My own table!',
    ...     parameters={'source': 'postgresql'},
    ...     columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
    ... )

    """
    catalog_table_input: dict[str, Any] | None = _get_table_input(
        database=database,
        table=table,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
    )
    _create_orc_table(
        database=database,
        table=table,
        path=path,
        columns_types=columns_types,
        table_type=table_type,
        partitions_types=partitions_types,
        bucketing_info=bucketing_info,
        catalog_id=catalog_id,
        compression=compression,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        athena_partition_projection_settings=athena_partition_projection_settings,
        boto3_session=boto3_session,
        catalog_table_input=catalog_table_input,
    )


@apply_configs
def create_csv_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None = None,
    partitions_types: dict[str, str] | None = None,
    bucketing_info: typing.BucketingInfoTuple | None = None,
    compression: str | None = None,
    description: str | None = None,
    parameters: dict[str, str] | None = None,
    columns_comments: dict[str, str] | None = None,
    columns_parameters: dict[str, dict[str, str]] | None = None,
    mode: Literal["overwrite", "append"] = "overwrite",
    catalog_versioning: bool = False,
    schema_evolution: bool = False,
    sep: str = ",",
    skip_header_line_count: int | None = None,
    serde_library: str | None = None,
    serde_parameters: dict[str, str] | None = None,
    boto3_session: boto3.Session | None = None,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None = None,
    catalog_id: str | None = None,
) -> None:
    r"""Create a CSV Table (Metadata Only) in the AWS Glue Catalog.

    'https://docs.aws.amazon.com/athena/latest/ug/data-types.html'

    Note
    ----
    Athena requires the columns in the underlying CSV files in S3 to be in the same order
    as the columns in the Glue data catalog.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    path
        Amazon S3 path (e.g. s3://bucket/prefix/).
    columns_types
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
    table_type
        The type of the Glue Table. Set to EXTERNAL_TABLE if None.
    partitions_types
        Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
    bucketing_info
        Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
        second element.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    compression
        Compression style (``None``, ``gzip``, etc).
    description
        Table description
    parameters
        Key/value pairs to tag the table.
    columns_comments
        Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
    columns_parameters
        Columns names and the related parameters (e.g. {'col0': {'par0': 'Param 0', 'par1': 'Param 1'}}).
    mode
        'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    schema_evolution
        If True allows schema evolution (new or missing columns), otherwise a exception will be raised.
        (Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
        Related tutorial:
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
    sep
        String of length 1. Field delimiter for the output file.
    skip_header_line_count
        Number of Lines to skip regarding to the header.
    serde_library
        Specifies the SerDe Serialization library which will be used. You need to provide the Class library name
        as a string.
        If no library is provided the default is `org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe`.
    serde_parameters
        Dictionary of initialization parameters for the SerDe.
        The default is `{"field.delim": sep, "escape.delim": "\\"}`.
    athena_partition_projection_settings
        Parameters of the Athena Partition Projection (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html).
        AthenaPartitionProjectionSettings is a `TypedDict`, meaning the passed parameter can be instantiated either as an
        instance of AthenaPartitionProjectionSettings or as a regular Python dict.

        Following projection parameters are supported:

        .. list-table:: Projection Parameters
           :header-rows: 1

           * - Name
             - Type
             - Description
           * - projection_types
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections types.
               Valid types: "enum", "integer", "date", "injected"
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'enum', 'col2_name': 'integer'})
           * - projection_ranges
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections ranges.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
           * - projection_values
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections values.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
           * - projection_intervals
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections intervals.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '5'})
           * - projection_digits
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections digits.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '2'})
           * - projection_formats
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections formats.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
           * - projection_storage_location_template
             - Optional[str]
             - Value which is allows Athena to properly map partition values if the S3 file locations do not follow
               a typical `.../column=value/...` pattern.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
               (e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_csv_table(
    ...     database='default',
    ...     table='my_table',
    ...     path='s3://bucket/prefix/',
    ...     columns_types={'col0': 'bigint', 'col1': 'double'},
    ...     partitions_types={'col2': 'date'},
    ...     compression='gzip',
    ...     description='My own table!',
    ...     parameters={'source': 'postgresql'},
    ...     columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
    ... )

    """
    catalog_table_input: dict[str, Any] | None = _get_table_input(
        database=database,
        table=table,
        boto3_session=boto3_session,
        catalog_id=catalog_id,
    )
    _create_csv_table(
        database=database,
        table=table,
        path=path,
        columns_types=columns_types,
        table_type=table_type,
        partitions_types=partitions_types,
        bucketing_info=bucketing_info,
        catalog_id=catalog_id,
        compression=compression,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        schema_evolution=schema_evolution,
        athena_partition_projection_settings=athena_partition_projection_settings,
        boto3_session=boto3_session,
        catalog_table_input=catalog_table_input,
        sep=sep,
        skip_header_line_count=skip_header_line_count,
        serde_library=serde_library,
        serde_parameters=serde_parameters,
    )


@apply_configs
def create_json_table(
    database: str,
    table: str,
    path: str,
    columns_types: dict[str, str],
    table_type: str | None = None,
    partitions_types: dict[str, str] | None = None,
    bucketing_info: typing.BucketingInfoTuple | None = None,
    compression: str | None = None,
    description: str | None = None,
    parameters: dict[str, str] | None = None,
    columns_comments: dict[str, str] | None = None,
    columns_parameters: dict[str, dict[str, str]] | None = None,
    mode: Literal["overwrite", "append"] = "overwrite",
    catalog_versioning: bool = False,
    schema_evolution: bool = False,
    serde_library: str | None = None,
    serde_parameters: dict[str, str] | None = None,
    boto3_session: boto3.Session | None = None,
    athena_partition_projection_settings: typing.AthenaPartitionProjectionSettings | None = None,
    catalog_id: str | None = None,
) -> None:
    r"""Create a JSON Table (Metadata Only) in the AWS Glue Catalog.

    'https://docs.aws.amazon.com/athena/latest/ug/data-types.html'

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    path
        Amazon S3 path (e.g. s3://bucket/prefix/).
    columns_types
        Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
    table_type
        The type of the Glue Table. Set to EXTERNAL_TABLE if None.
    partitions_types
        Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
    bucketing_info
        Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
        second element.
        Only `str`, `int` and `bool` are supported as column data types for bucketing.
    compression
        Compression style (``None``, ``gzip``, etc).
    description
        Table description
    parameters
        Key/value pairs to tag the table.
    columns_comments
        Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}).
    columns_parameters
        Columns names and the related parameters (e.g. {'col0': {'par0': 'Param 0', 'par1': 'Param 1'}}).
    mode
        'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
    catalog_versioning
        If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
    schema_evolution
        If True allows schema evolution (new or missing columns), otherwise a exception will be raised.
        (Only considered if dataset=True and mode in ("append", "overwrite_partitions"))
        Related tutorial:
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/014%20-%20Schema%20Evolution.html
    serde_library
        Specifies the SerDe Serialization library which will be used. You need to provide the Class library name
        as a string.
        If no library is provided the default is `org.openx.data.jsonserde.JsonSerDe`.
    serde_parameters
        Dictionary of initialization parameters for the SerDe.
        The default is `{"field.delim": sep, "escape.delim": "\\"}`.
    athena_partition_projection_settings
        Parameters of the Athena Partition Projection (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html).
        AthenaPartitionProjectionSettings is a `TypedDict`, meaning the passed parameter can be instantiated either as an
        instance of AthenaPartitionProjectionSettings or as a regular Python dict.

        Following projection parameters are supported:

        .. list-table:: Projection Parameters
           :header-rows: 1

           * - Name
             - Type
             - Description
           * - projection_types
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections types.
               Valid types: "enum", "integer", "date", "injected"
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'enum', 'col2_name': 'integer'})
           * - projection_ranges
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections ranges.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
           * - projection_values
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections values.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
           * - projection_intervals
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections intervals.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '5'})
           * - projection_digits
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections digits.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_name': '1', 'col2_name': '2'})
           * - projection_formats
             - Optional[Dict[str, str]]
             - Dictionary of partitions names and Athena projections formats.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
               (e.g. {'col_date': 'yyyy-MM-dd', 'col2_timestamp': 'yyyy-MM-dd HH:mm:ss'})
           * - projection_storage_location_template
             - Optional[str]
             - Value which is allows Athena to properly map partition values if the S3 file locations do not follow
               a typical `.../column=value/...` pattern.
               https://docs.aws.amazon.com/athena/latest/ug/partition-projection-setting-up.html
               (e.g. s3://bucket/table_root/a=${a}/${b}/some_static_subdirectory/${c}/)
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.create_json_table(
    ...     database='default',
    ...     table='my_table',
    ...     path='s3://bucket/prefix/',
    ...     columns_types={'col0': 'bigint', 'col1': 'double'},
    ...     partitions_types={'col2': 'date'},
    ...     description='My very own JSON table!',
    ...     parameters={'source': 'postgresql'},
    ...     columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
    ... )

    """
    catalog_table_input: dict[str, Any] | None = _get_table_input(
        database=database, table=table, boto3_session=boto3_session, catalog_id=catalog_id
    )
    _create_json_table(
        database=database,
        table=table,
        path=path,
        columns_types=columns_types,
        table_type=table_type,
        partitions_types=partitions_types,
        bucketing_info=bucketing_info,
        catalog_id=catalog_id,
        compression=compression,
        description=description,
        parameters=parameters,
        columns_comments=columns_comments,
        columns_parameters=columns_parameters,
        mode=mode,
        catalog_versioning=catalog_versioning,
        schema_evolution=schema_evolution,
        athena_partition_projection_settings=athena_partition_projection_settings,
        boto3_session=boto3_session,
        catalog_table_input=catalog_table_input,
        serde_library=serde_library,
        serde_parameters=serde_parameters,
    )
