"""AWS Glue Catalog Get Module."""

from __future__ import annotations

import base64
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterator, cast

import boto3
import botocore.exceptions

import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs
from awswrangler.catalog._utils import _catalog_id, _extract_dtypes_from_table_details

if TYPE_CHECKING:
    from mypy_boto3_glue.type_defs import GetPartitionsResponseTypeDef

# Module-level logger, named after this module; used for the debug messages emitted during partition pagination.
_logger: logging.Logger = logging.getLogger(__name__)


def _get_table_input(
    database: str,
    table: str,
    boto3_session: boto3.Session | None,
    catalog_id: str | None = None,
) -> dict[str, Any] | None:
    """Fetch a table from Glue and keep only a fixed set of its top-level keys.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    boto3_session
        Session handed to ``_utils.client`` to build the Glue client.
    catalog_id
        Forwarded to ``_catalog_id`` when building the request arguments.

    Returns
    -------
        A new dictionary holding the whitelisted entries of the ``Table`` element returned by
        ``get_table`` (in response order), or ``None`` if Glue raises ``EntityNotFoundException``.
    """
    # Only these keys of the ``get_table`` response are copied over; everything else is dropped.
    kept_keys = (
        "Name",
        "Description",
        "Owner",
        "LastAccessTime",
        "LastAnalyzedTime",
        "Retention",
        "StorageDescriptor",
        "PartitionKeys",
        "ViewOriginalText",
        "ViewExpandedText",
        "TableType",
        "Parameters",
        "TargetTable",
    )
    client_glue = _utils.client("glue", session=boto3_session)
    request: dict[str, Any] = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    try:
        details = client_glue.get_table(**request)
    except client_glue.exceptions.EntityNotFoundException:
        return None
    return {key: value for key, value in details["Table"].items() if key in kept_keys}


def _append_partitions(partitions_values: dict[str, list[str]], response: "GetPartitionsResponseTypeDef") -> str | None:
    """Merge one ``get_partitions`` page into ``partitions_values`` and return its continuation token.

    Parameters
    ----------
    partitions_values
        Accumulator mapping partition location to partition values. Mutated in place.
    response
        A single ``get_partitions`` response page.

    Returns
    -------
        The page's ``NextToken`` if the page has a ``Partitions`` entry and a token, otherwise ``None``.
    """
    _logger.debug("response: %s", response)
    next_token: str | None = response.get("NextToken", None)
    # A page without a "Partitions" entry ends the pagination, whatever token it carries.
    if "Partitions" not in response:
        return None
    for entry in response["Partitions"]:
        path: str | None = entry["StorageDescriptor"].get("Location")
        if path is None:
            # Partitions without a location cannot be keyed, so they are skipped.
            continue
        partitions_values[path] = entry["Values"]
    return next_token


def _get_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Page through Glue ``get_partitions`` and collect every partition of a table.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        Optional filter sent as the ``Expression`` request argument.
    catalog_id
        Forwarded to ``_catalog_id`` when building the request arguments.
    boto3_session
        Session handed to ``_utils.client`` to build the Glue client.

    Returns
    -------
        Dictionary mapping each partition location to its list of partition values.
    """
    client_glue = _utils.client("glue", session=boto3_session)

    request: dict[str, Any] = _catalog_id(
        catalog_id=catalog_id,
        DatabaseName=database,
        TableName=table,
        MaxResults=1_000,
        Segment={"SegmentNumber": 0, "TotalSegments": 1},
        ExcludeColumnSchema=True,
    )
    if expression is not None:
        request["Expression"] = expression

    collected: dict[str, list[str]] = {}
    _logger.debug("Starting pagination...")

    while True:
        page = client_glue.get_partitions(**request)
        next_token = _append_partitions(partitions_values=collected, response=page)
        if next_token is None:
            break
        # A non-None token is exactly the page's "NextToken", so it can be reused directly.
        request["NextToken"] = next_token

    _logger.debug("Pagination done.")
    return collected


@apply_configs
def get_table_types(
    database: str,
    table: str,
    catalog_id: str | None = None,
    filter_iceberg_current: bool = False,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str] | None:
    """Get all columns and their data types from a table.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    filter_iceberg_current
        If True, returns only current iceberg fields (fields marked with iceberg.field.current: true).
        Otherwise, returns all fields. False by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        If the table exists, a dictionary like {'col name': 'col data type'}. Otherwise None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.get_table_types(database='default', table='my_table')
    {'col0': 'int', 'col1': 'double'}

    """
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    try:
        table_details = client_glue.get_table(**request)
    except client_glue.exceptions.EntityNotFoundException:
        # A missing table is reported as None rather than as an error.
        return None
    return _extract_dtypes_from_table_details(response=table_details, filter_iceberg_current=filter_iceberg_current)


def get_databases(
    catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> Iterator[dict[str, Any]]:
    """Lazily iterate over the databases of a Glue Data Catalog.

    Parameters
    ----------
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of Databases, one dictionary per entry of each page's ``DatabaseList``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> dbs = wr.catalog.get_databases()

    """
    client_glue = _utils.client("glue", session=boto3_session)
    pages = client_glue.get_paginator("get_databases").paginate(**_catalog_id(catalog_id=catalog_id))
    for page in pages:
        yield from (cast(Dict[str, Any], entry) for entry in page["DatabaseList"])


@apply_configs
def databases(
    limit: int = 100, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> pd.DataFrame:
    """Get a Pandas DataFrame listing databases.

    Parameters
    ----------
    limit
        Max number of databases to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame with one row per database and the columns ``Database`` and ``Description``
        (an empty string when a database has no description).

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_dbs = wr.catalog.databases()

    """
    # islice stops consuming the underlying iterator once ``limit`` databases have been taken.
    selected = list(
        itertools.islice(get_databases(catalog_id=catalog_id, boto3_session=boto3_session), limit)
    )
    return pd.DataFrame(
        data={
            "Database": [entry["Name"] for entry in selected],
            "Description": [entry.get("Description", "") for entry in selected],
        }
    )


@apply_configs
def get_tables(
    catalog_id: str | None = None,
    database: str | None = None,
    name_contains: str | None = None,
    name_prefix: str | None = None,
    name_suffix: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> Iterator[dict[str, Any]]:
    """Get an iterator of tables.

    Note
    ----
    Please, do not filter using name_contains and name_prefix/name_suffix at the same time.
    Only name_prefix and name_suffix can be combined together.

    Parameters
    ----------
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    database
        Database name. If ``None``, every database returned by ``get_databases`` is scanned.
    name_contains
        Select by a specific string on table name
    name_prefix
        Select by a specific prefix on table name
    name_suffix
        Select by a specific suffix on table name
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of tables.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If ``name_contains`` is combined with ``name_prefix`` and/or ``name_suffix``.
        Because this is a generator function, the check runs when iteration starts.

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables = wr.catalog.get_tables()

    """
    # Reject name_contains together with EITHER prefix or suffix. Requiring all three
    # before raising would let a prefix/suffix be silently ignored.
    if (name_contains is not None) and ((name_prefix is not None) or (name_suffix is not None)):
        raise exceptions.InvalidArgumentCombination(
            "Please, do not filter using name_contains and "
            "name_prefix/name_suffix at the same time. Only "
            "name_prefix and name_suffix can be combined together."
        )
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    paginator = client_glue.get_paginator("get_tables")
    args: dict[str, str] = {}
    if name_contains is not None:
        args["Expression"] = f"*{name_contains}*"
    elif (name_prefix is not None) and (name_suffix is not None):
        args["Expression"] = f"{name_prefix}*{name_suffix}"
    elif name_prefix is not None:
        args["Expression"] = f"{name_prefix}*"
    elif name_suffix is not None:
        args["Expression"] = f"*{name_suffix}"
    if database is not None:
        dbs: list[str] = [database]
    else:
        # Forward the caller's session so the database listing uses the same credentials/region
        # as the table listing below.
        dbs = [x["Name"] for x in get_databases(catalog_id=catalog_id, boto3_session=boto3_session)]
    for db in dbs:
        args["DatabaseName"] = db
        response_iterator = paginator.paginate(**_catalog_id(catalog_id=catalog_id, **args))
        try:
            for page in response_iterator:
                for tbl in page["TableList"]:
                    yield cast(Dict[str, Any], tbl)
        except client_glue.exceptions.EntityNotFoundException:
            # A database that Glue reports as not found is skipped and the scan continues with the next one.
            continue


@apply_configs
def tables(
    limit: int = 100,
    catalog_id: str | None = None,
    database: str | None = None,
    search_text: str | None = None,
    name_contains: str | None = None,
    name_prefix: str | None = None,
    name_suffix: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Get a DataFrame with tables filtered by a search term, prefix, suffix.

    When ``search_text`` is ``None`` the listing comes from ``get_tables`` and the name filters are
    applied by that function. Otherwise the listing comes from ``search_tables`` and the database
    and name filters are applied here, on the full search result, before truncating to ``limit``.

    Parameters
    ----------
    limit
        Max number of tables to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    database
        Database name.
    search_text
        Select only tables with the given string in table's properties.
    name_contains
        Select by a specific string on table name
    name_prefix
        Select by a specific prefix on table name
    name_suffix
        Select by a specific suffix on table name
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame with the columns ``Database``, ``Table``, ``Description``, ``TableType``,
        ``Columns`` and ``Partitions`` (the last two are comma-separated name lists).

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_tables = wr.catalog.tables()

    """
    if search_text is None:
        selected: list[dict[str, Any]] = list(
            itertools.islice(
                get_tables(
                    catalog_id=catalog_id,
                    database=database,
                    name_contains=name_contains,
                    name_prefix=name_prefix,
                    name_suffix=name_suffix,
                    boto3_session=boto3_session,
                ),
                limit,
            )
        )
    else:
        # Each active filter is applied as its own full pass, in this fixed order.
        filters = []
        if database is not None:
            filters.append(lambda entry: entry["DatabaseName"] == database)
        if name_contains is not None:
            filters.append(lambda entry: name_contains in entry["Name"])
        if name_prefix is not None:
            filters.append(lambda entry: entry["Name"].startswith(name_prefix))
        if name_suffix is not None:
            filters.append(lambda entry: entry["Name"].endswith(name_suffix))

        selected = list(search_tables(text=search_text, catalog_id=catalog_id, boto3_session=boto3_session))
        for accept in filters:
            selected = [entry for entry in selected if accept(entry)]
        selected = selected[:limit]

    frame_data: dict[str, list[str]] = {
        "Database": [],
        "Table": [],
        "Description": [],
        "TableType": [],
        "Columns": [],
        "Partitions": [],
    }
    for entry in selected:
        row: dict[str, str] = {
            "Database": entry["DatabaseName"],
            "Table": entry["Name"],
            "Description": entry.get("Description", ""),
            "TableType": entry.get("TableType", ""),
        }
        try:
            row["Columns"] = ", ".join([col["Name"] for col in entry["StorageDescriptor"]["Columns"]])
        except KeyError:
            # No storage descriptor / column list (or a column without a name): leave the cell empty.
            row["Columns"] = ""
        # Joining an empty list yields "", which is what a table without partition keys gets.
        row["Partitions"] = ", ".join([key["Name"] for key in entry.get("PartitionKeys", [])])
        for column_name, value in row.items():
            frame_data[column_name].append(value)
    return pd.DataFrame(data=frame_data)


def search_tables(
    text: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> Iterator[dict[str, Any]]:
    """Get an iterator of tables matching a search string.

    Parameters
    ----------
    text
        Select only tables with the given string in table's properties.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of tables. Further pages are requested only as the iterator is consumed.

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables = wr.catalog.search_tables(text='my_property')

    """
    client_glue = _utils.client("glue", session=boto3_session)
    request: dict[str, Any] = _catalog_id(catalog_id=catalog_id, SearchText=text)
    while True:
        page = client_glue.search_tables(**request)
        for entry in page["TableList"]:
            yield cast(Dict[str, Any], entry)
        if "NextToken" not in page:
            break
        # Carry the continuation token into the next request.
        request["NextToken"] = page["NextToken"]


@apply_configs
def table(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Get table details as Pandas DataFrame.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame with one row per column and the columns ``Column Name``, ``Type``,
        ``Partition`` and ``Comment``. Regular columns come first (``Partition`` False),
        followed by the partition keys (``Partition`` True).

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_table = wr.catalog.table(database='default', table='my_table')

    """
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    details = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))["Table"]

    # Both groups are optional in the response; a missing group simply contributes no rows.
    regular_columns = details["StorageDescriptor"].get("Columns", {}) if "StorageDescriptor" in details else []
    partition_columns = details["PartitionKeys"] if "PartitionKeys" in details else []

    frame_data: dict[str, list[str | bool]] = {"Column Name": [], "Type": [], "Partition": [], "Comment": []}
    for columns, is_partition in ((regular_columns, False), (partition_columns, True)):
        for column in columns:
            frame_data["Column Name"].append(column["Name"])
            frame_data["Type"].append(column["Type"])
            frame_data["Partition"].append(is_partition)
            frame_data["Comment"].append(column.get("Comment", ""))
    return pd.DataFrame(data=frame_data)


@apply_configs
def get_table_location(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Get table's location on Glue catalog.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Table's location.

    Raises
    ------
    exceptions.InvalidTable
        If the ``get_table`` response has no ``Table`` / ``StorageDescriptor`` / ``Location`` entry.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.get_table_location(database='default', table='my_table')
    's3://bucket/prefix/'

    """
    client_glue = _utils.client("glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    details = client_glue.get_table(**request)
    try:
        location = details["Table"]["StorageDescriptor"]["Location"]
    except KeyError as ex:
        # Surface a missing location as a domain error naming the table, keeping the KeyError as cause.
        raise exceptions.InvalidTable(f"{database}.{table}") from ex
    return location


def get_connection(
    name: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> dict[str, Any]:
    """Get Glue connection details.

    The connection is requested with ``HidePassword=False``. If its properties contain an
    ``ENCRYPTED_PASSWORD`` entry, that value is base64-decoded, decrypted with KMS and stored
    (decoded as UTF-8) under the ``PASSWORD`` property of the returned dictionary.

    Parameters
    ----------
    name
        Connection name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        The ``Connection`` element of the API Response for:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_connection

    Examples
    --------
    >>> import awswrangler as wr
    >>> res = wr.catalog.get_connection(name='my_connection')

    """
    client_glue = _utils.client("glue", session=boto3_session)

    # NOTE(review): presumably ``try_it`` retries the call on throttling, up to 3 attempts - confirm in _utils.
    response = _utils.try_it(
        f=client_glue.get_connection,
        ex=botocore.exceptions.ClientError,
        ex_code="ThrottlingException",
        max_num_tries=3,
        **_catalog_id(catalog_id=catalog_id, Name=name, HidePassword=False),
    )
    connection = response["Connection"]
    properties = connection["ConnectionProperties"]

    if "ENCRYPTED_PASSWORD" in properties:
        client_kms = _utils.client(service_name="kms", session=boto3_session)
        ciphertext = base64.b64decode(properties["ENCRYPTED_PASSWORD"])
        plaintext = client_kms.decrypt(CiphertextBlob=ciphertext)["Plaintext"]
        # ``properties`` is the dict inside ``connection``, so this updates the returned value.
        properties["PASSWORD"] = plaintext.decode("utf-8")
    return cast(Dict[str, Any], connection)


@apply_configs
def get_parquet_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    This function forwards its arguments unchanged to the module's shared partition lookup;
    no Parquet-specific handling is performed here.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_parquet_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_parquet_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    partitions: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions


@apply_configs
def get_csv_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    This function forwards its arguments unchanged to the module's shared partition lookup;
    no CSV-specific handling is performed here.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_csv_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_csv_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    partitions: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions


@apply_configs
def get_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    Public entry point that forwards its arguments unchanged to the module's shared partition lookup.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    partitions: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions


def get_table_parameters(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str]:
    """Get all parameters.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Dictionary of parameters. Empty if the table has no ``Parameters`` entry.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_table_parameters(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    # "Parameters" is an optional member of the table description; treat its absence as
    # "no parameters" instead of failing with a KeyError.
    parameters: dict[str, str] = response["Table"].get("Parameters", {})
    return parameters


def get_table_description(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str | None:
    """Get table description.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        The table's ``Description`` if it has one, otherwise ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> desc = wr.catalog.get_table_description(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    table_details = client_glue.get_table(**request)["Table"]
    description: str | None = table_details.get("Description")
    return description


@apply_configs
def get_columns_comments(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str | None]:
    """Get all columns comments.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Columns comments. e.g. {"col1": "foo boo bar", "col2": None}.
        Regular columns come first, followed by partition keys. A table without a storage
        descriptor or without partition keys simply contributes no entries for that group.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_columns_comments(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    table_details = response["Table"]
    # "StorageDescriptor", its "Columns" and "PartitionKeys" are all optional in the response;
    # tolerate their absence (as ``table`` and ``tables`` in this module do) rather than raising KeyError.
    columns = table_details.get("StorageDescriptor", {}).get("Columns", [])
    partition_keys = table_details.get("PartitionKeys", [])
    comments: dict[str, str | None] = {field["Name"]: field.get("Comment") for field in [*columns, *partition_keys]}
    return comments


@apply_configs
def get_columns_parameters(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, dict[str, str] | None]:
    """Get all columns parameters.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Columns parameters, keyed by column name; the value is ``None`` for a column that has no
        ``Parameters`` entry. Regular columns come first, followed by partition keys. A table
        without a storage descriptor or without partition keys contributes no entries for that group.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_columns_parameters(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    table_details = response["Table"]
    # "StorageDescriptor", its "Columns" and "PartitionKeys" are all optional in the response;
    # tolerate their absence (as ``table`` and ``tables`` in this module do) rather than raising KeyError.
    columns = table_details.get("StorageDescriptor", {}).get("Columns", [])
    partition_keys = table_details.get("PartitionKeys", [])
    parameters: dict[str, dict[str, str] | None] = {
        field["Name"]: field.get("Parameters") for field in [*columns, *partition_keys]
    }
    return parameters


@apply_configs
def get_table_versions(
    database: str, table: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> list[dict[str, Any]]:
    """Get all versions.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        List with the ``TableVersions`` entries of every page returned by:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_table_versions

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables_versions = wr.catalog.get_table_versions(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    pages = client_glue.get_paginator("get_table_versions").paginate(
        **_catalog_id(DatabaseName=database, TableName=table, catalog_id=catalog_id)
    )
    # Flatten all pages into a single list, keeping page order.
    return [cast(Dict[str, Any], version) for page in pages for version in page["TableVersions"]]


@apply_configs
def get_table_number_of_versions(
    database: str, table: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> int:
    """Get total number of versions.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Total number of versions, summed over every page of ``get_table_versions``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> num = wr.catalog.get_table_number_of_versions(database="...", table="...")

    """
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    pages = client_glue.get_paginator("get_table_versions").paginate(
        **_catalog_id(DatabaseName=database, TableName=table, catalog_id=catalog_id)
    )
    return sum(len(page["TableVersions"]) for page in pages)
