"""Amazon S3 Read ORC Module (PRIVATE)."""

from __future__ import annotations

import datetime
import itertools
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
)

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.dataset
from typing_extensions import Literal

from awswrangler import _data_types, _utils, exceptions
from awswrangler._arrow import _add_table_partitions
from awswrangler._config import apply_configs
from awswrangler._distributed import engine
from awswrangler._executor import _BaseExecutor, _get_executor
from awswrangler.s3._fs import open_s3_object
from awswrangler.s3._list import _path2list
from awswrangler.s3._read import (
    _apply_partition_filter,
    _check_version_id,
    _extract_partitions_dtypes_from_table_details,
    _get_num_output_blocks,
    _get_path_ignore_suffix,
    _get_path_root,
    _get_paths_for_glue_table,
    _TableMetadataReader,
)
from awswrangler.typing import RaySettings, _ReadTableMetadataReturnValue

if TYPE_CHECKING:
    from mypy_boto3_s3 import S3Client
    from pyarrow.orc import ORCFile

FULL_READ_S3_BLOCK_SIZE = 20_971_520  # 20 MB (20 * 2**20)
METADATA_READ_S3_BLOCK_SIZE = 131_072  # 128 KB (128 * 2**10)

_logger: logging.Logger = logging.getLogger(__name__)


def _pyarrow_orc_file_wrapper(source: Any) -> "ORCFile" | None:
    from pyarrow.orc import ORCFile

    try:
        return ORCFile(source=source)
    except pyarrow.ArrowInvalid as ex:
        if str(ex) == "ORC file size is 0 bytes":
            _logger.warning("Ignoring empty file...")
            return None
        raise


@engine.dispatch_on_engine
def _read_orc_metadata_file(
    s3_client: "S3Client" | None,
    path: str,
    s3_additional_kwargs: dict[str, str] | None,
    use_threads: bool | int,
    version_id: str | None = None,
) -> pa.Schema | None:
    with open_s3_object(
        path=path,
        mode="rb",
        version_id=version_id,
        use_threads=use_threads,
        s3_client=s3_client,
        s3_block_size=METADATA_READ_S3_BLOCK_SIZE,
        s3_additional_kwargs=s3_additional_kwargs,
    ) as f:
        orc_file: "ORCFile" | None = _pyarrow_orc_file_wrapper(source=f)
        if orc_file:
            return orc_file.schema
        return None


class _ORCTableMetadataReader(_TableMetadataReader):
    def _read_metadata_file(
        self,
        s3_client: "S3Client" | None,
        path: str,
        s3_additional_kwargs: dict[str, str] | None,
        use_threads: bool | int,
        version_id: str | None = None,
        coerce_int96_timestamp_unit: str | None = None,
    ) -> pa.Schema | None:
        return _read_orc_metadata_file(
            s3_client=s3_client,
            path=path,
            s3_additional_kwargs=s3_additional_kwargs,
            use_threads=use_threads,
            version_id=version_id,
        )


def _read_orc_file(
    s3_client: "S3Client" | None,
    path: str,
    path_root: str | None,
    columns: list[str] | None,
    s3_additional_kwargs: dict[str, str] | None,
    use_threads: bool | int,
    version_id: str | None = None,
) -> pa.Table:
    # -1 fetches the whole object in a single request; with a column projection, 20 MB blocks are used instead.
    s3_block_size: int = FULL_READ_S3_BLOCK_SIZE if columns else -1
    with open_s3_object(
        path=path,
        mode="rb",
        version_id=version_id,
        use_threads=use_threads,
        s3_block_size=s3_block_size,
        s3_additional_kwargs=s3_additional_kwargs,
        s3_client=s3_client,
    ) as f:
        orc_file: "ORCFile" | None = _pyarrow_orc_file_wrapper(
            source=f,
        )
        if orc_file is None:
            raise exceptions.InvalidFile(f"Invalid ORC file: {path}")
        return _add_table_partitions(
            table=orc_file.read(columns=columns),
            path=path,
            path_root=path_root,
        )


@engine.dispatch_on_engine
def _read_orc(
    paths: list[str],
    path_root: str | None,
    schema: pa.Schema | None,
    columns: list[str] | None,
    use_threads: bool | int,
    override_num_blocks: int,
    version_ids: dict[str, str] | None,
    s3_client: "S3Client" | None,
    s3_additional_kwargs: dict[str, Any] | None,
    arrow_kwargs: dict[str, Any],
) -> pd.DataFrame:
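    # NOTE: `schema` and `override_num_blocks` are accepted to match the engine-dispatched
    # signature (see `engine.dispatch_on_engine`) and are not used by this default implementation.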
    executor: _BaseExecutor = _get_executor(use_threads=use_threads)
    tables = executor.map(
        _read_orc_file,
        s3_client,
        paths,
        itertools.repeat(path_root),
        itertools.repeat(columns),
        itertools.repeat(s3_additional_kwargs),
        itertools.repeat(use_threads),
        [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths],
    )
    return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)


@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session", "version_id", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_orc(
    path: str | list[str],
    path_root: str | None = None,
    dataset: bool = False,
    path_suffix: str | list[str] | None = None,
    path_ignore_suffix: str | list[str] | None = None,
    ignore_empty: bool = True,
    partition_filter: Callable[[dict[str, str]], bool] | None = None,
    columns: list[str] | None = None,
    validate_schema: bool = False,
    last_modified_begin: datetime.datetime | None = None,
    last_modified_end: datetime.datetime | None = None,
    version_id: str | dict[str, str] | None = None,
    dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
    use_threads: bool | int = True,
    ray_args: RaySettings | None = None,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
    pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Read ORC file(s) from an S3 prefix or list of S3 objects paths.

    The concept of `dataset` enables more complex features like partitioning
    and catalog integration (AWS Glue Catalog).

    This function accepts Unix shell-style wildcards in the path argument:
    * (matches everything), ? (matches any single character),
    [seq] (matches any character in seq), [!seq] (matches any character not in seq).
    If you want to use a path that includes Unix shell-style wildcard characters (`*, ?, []`),
    you can use `glob.escape(path)` before passing the argument to this function.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Note
    ----
    Filtering by `last_modified_begin` and `last_modified_end` is applied after listing all S3 files.

    Parameters
    ----------
    path
        S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
    path_root
        Root path of the dataset. If dataset=`True`, it is used as a starting point to load partition columns.
    dataset
        If `True`, read an ORC dataset instead of individual file(s), loading all related partitions as columns.
    path_suffix
        Suffix or List of suffixes to be read (e.g. [".gz.orc", ".snappy.orc"]).
        If None, reads all files. (default)
    path_ignore_suffix
        Suffix or List of suffixes to be ignored (e.g. [".csv", "_SUCCESS"]).
        If None, reads all files. (default)
    ignore_empty
        Ignore files with 0 bytes.
    partition_filter
        Callback function filter to apply on PARTITION columns (PUSH-DOWN filter).
        This function must receive a single argument (Dict[str, str]) where keys are partition
        names and values are partition values. Partition values must be strings and the function
        must return a bool, True to read the partition or False to ignore it.
        Ignored if `dataset=False`.
        E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
    columns
        List of columns to read from the file(s).
    validate_schema
        Check that the schema is consistent across individual files.
    last_modified_begin
        Filter S3 objects by Last modified date.
        Filter is only applied after listing all objects.
    last_modified_end
        Filter S3 objects by Last modified date.
        Filter is only applied after listing all objects.
    version_id
        Version id of the object or mapping of object path to version id.
        (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
    dtype_backend
        Which dtype_backend to use. When "numpy_nullable" is set, nullable dtypes are used for
        all dtypes that have a nullable implementation; when "pyarrow" is set, pyarrow-backed
        dtypes are used for all dtypes.

        The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    ray_args
        Ray and Modin settings. Only used for distributed computing when Ray and Modin are installed.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.
    pyarrow_additional_kwargs
        Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
        e.g. pyarrow_additional_kwargs={'split_blocks': True}.

    Returns
    -------
        Pandas DataFrame.

    Examples
    --------
    Reading all ORC files under a prefix

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(path='s3://bucket/prefix/')

    Reading all ORC files from a list

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(path=['s3://bucket/filename0.orc', 's3://bucket/filename1.orc'])

    Reading ORC Dataset with PUSH-DOWN filter over partitions

    >>> import awswrangler as wr
    >>> my_filter = lambda x: True if x["city"].startswith("new") else False
    >>> df = wr.s3.read_orc(path, dataset=True, partition_filter=my_filter)
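
    Reading a subset of columns from specific object versions
    (bucket, keys, column names, and version ids below are placeholders)

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc(
    ...     path=['s3://bucket/filename0.orc', 's3://bucket/filename1.orc'],
    ...     columns=['col0', 'col1'],
    ...     version_id={'s3://bucket/filename0.orc': '121212', 's3://bucket/filename1.orc': '343434'},
    ... )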

    """
    s3_client = _utils.client(service_name="s3", session=boto3_session)
    paths: list[str] = _path2list(
        path=path,
        s3_client=s3_client,
        suffix=path_suffix,
        ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
        last_modified_begin=last_modified_begin,
        last_modified_end=last_modified_end,
        ignore_empty=ignore_empty,
        s3_additional_kwargs=s3_additional_kwargs,
    )
    if not path_root:
        path_root = _get_path_root(path=path, dataset=dataset)
    if path_root and partition_filter:
        paths = _apply_partition_filter(path_root=path_root, paths=paths, filter_func=partition_filter)
    if len(paths) < 1:
        raise exceptions.NoFilesFound(f"No files found on: {path}.")

    version_ids = _check_version_id(paths=paths, version_id=version_id)

    # Create PyArrow schema based on file metadata, columns filter, and partitions
    schema: pa.Schema | None = None
    if validate_schema:
        metadata_reader = _ORCTableMetadataReader()
        schema = metadata_reader.validate_schemas(
            paths=paths,
            path_root=path_root,
            columns=columns,
            validate_schema=validate_schema,
            s3_client=s3_client,
            version_ids=version_ids,
            use_threads=use_threads,
            s3_additional_kwargs=s3_additional_kwargs,
        )

    arrow_kwargs = _data_types.pyarrow2pandas_defaults(
        use_threads=use_threads, kwargs=pyarrow_additional_kwargs, dtype_backend=dtype_backend
    )

    return _read_orc(
        paths,
        path_root=path_root,
        schema=schema,
        columns=columns,
        use_threads=use_threads,
        override_num_blocks=_get_num_output_blocks(ray_args),
        s3_client=s3_client,
        s3_additional_kwargs=s3_additional_kwargs,
        arrow_kwargs=arrow_kwargs,
        version_ids=version_ids,
    )


@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session", "s3_additional_kwargs", "dtype_backend"],
)
@apply_configs
def read_orc_table(
    table: str,
    database: str,
    filename_suffix: str | list[str] | None = None,
    filename_ignore_suffix: str | list[str] | None = None,
    catalog_id: str | None = None,
    partition_filter: Callable[[dict[str, str]], bool] | None = None,
    columns: list[str] | None = None,
    validate_schema: bool = True,
    dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
    use_threads: bool | int = True,
    ray_args: RaySettings | None = None,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
    pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame:
    """Read Apache ORC table registered in the AWS Glue Catalog.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Parameters
    ----------
    table
        AWS Glue Catalog table name.
    database
        AWS Glue Catalog database name.
    filename_suffix
        Suffix or List of suffixes to be read (e.g. [".gz.orc", ".snappy.orc"]).
        If None, read all files. (default)
    filename_ignore_suffix
        Suffix or List of suffixes for S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]).
        If None, read all files. (default)
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If none is provided, the AWS account ID is used by default.
    partition_filter
        Callback function filter to apply on PARTITION columns (PUSH-DOWN filter).
        This function must receive a single argument (Dict[str, str]) where keys are partition
        names and values are partition values. Partition values must be strings and the function
        must return a bool, True to read the partition or False to ignore it.
        E.g. ``lambda x: True if x["year"] == "2020" and x["month"] == "1" else False``
        https://aws-sdk-pandas.readthedocs.io/en/3.11.0/tutorials/023%20-%20Flexible%20Partitions%20Filter.html
    columns
        List of columns to read from the file(s).
    validate_schema
        Check that the schema is consistent across individual files.
    dtype_backend
        Which dtype_backend to use. When "numpy_nullable" is set, nullable dtypes are used for
        all dtypes that have a nullable implementation; when "pyarrow" is set, pyarrow-backed
        dtypes are used for all dtypes.

        The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    ray_args
        Ray and Modin settings. Only used for distributed computing when Ray and Modin are installed.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.
    pyarrow_additional_kwargs
        Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
        e.g. pyarrow_additional_kwargs={'split_blocks': True}.

    Returns
    -------
        Pandas DataFrame.

    Examples
    --------
    Reading ORC Table

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc_table(database='...', table='...')

    Reading ORC Dataset with PUSH-DOWN filter over partitions

    >>> import awswrangler as wr
    >>> my_filter = lambda x: True if x["city"].startswith("new") else False
    >>> df = wr.s3.read_orc_table(database='...', table='...', partition_filter=my_filter)
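
    Reading a subset of columns with the pyarrow dtype backend
    (database, table, and column names are placeholders; requires Pandas 2.0 or above)

    >>> import awswrangler as wr
    >>> df = wr.s3.read_orc_table(
    ...     database='...',
    ...     table='...',
    ...     columns=['col0', 'col1'],
    ...     dtype_backend='pyarrow',
    ... )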

    """
    paths: str | list[str]
    path_root: str | None

    paths, path_root, res = _get_paths_for_glue_table(
        table=table,
        database=database,
        filename_suffix=filename_suffix,
        filename_ignore_suffix=filename_ignore_suffix,
        catalog_id=catalog_id,
        partition_filter=partition_filter,
        boto3_session=boto3_session,
        s3_additional_kwargs=s3_additional_kwargs,
    )

    df = read_orc(
        path=paths,
        path_root=path_root,
        dataset=True,
        path_suffix=filename_suffix if path_root is None else None,
        path_ignore_suffix=filename_ignore_suffix if path_root is None else None,
        columns=columns,
        validate_schema=validate_schema,
        dtype_backend=dtype_backend,
        use_threads=use_threads,
        ray_args=ray_args,
        boto3_session=boto3_session,
        s3_additional_kwargs=s3_additional_kwargs,
        pyarrow_additional_kwargs=pyarrow_additional_kwargs,
    )

    return _data_types.cast_pandas_with_athena_types(
        df=df,
        dtype=_extract_partitions_dtypes_from_table_details(response=res),
        dtype_backend=dtype_backend,
    )


@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session"],
)
@apply_configs
def read_orc_metadata(
    path: str | list[str],
    dataset: bool = False,
    version_id: str | dict[str, str] | None = None,
    path_suffix: str | None = None,
    path_ignore_suffix: str | list[str] | None = None,
    ignore_empty: bool = True,
    ignore_null: bool = False,
    dtype: dict[str, str] | None = None,
    sampling: float = 1.0,
    use_threads: bool | int = True,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, Any] | None = None,
) -> _ReadTableMetadataReturnValue:
    """Read Apache ORC file(s) metadata from an S3 prefix or list of S3 objects paths.

    The concept of `dataset` enables more complex features like partitioning
    and catalog integration (AWS Glue Catalog).

    This function accepts Unix shell-style wildcards in the path argument:
    * (matches everything), ? (matches any single character),
    [seq] (matches any character in seq), [!seq] (matches any character not in seq).
    If you want to use a path that includes Unix shell-style wildcard characters (`*, ?, []`),
    you can use `glob.escape(path)` before passing the argument to this function.

    Note
    ----
    If `use_threads=True`, the number of threads is obtained from os.cpu_count().

    Parameters
    ----------
    path
        S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
    dataset
        If `True`, read an ORC dataset instead of individual file(s), loading all related partitions as columns.
    version_id
        Version id of the object or mapping of object path to version id.
        (e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
    path_suffix
        Suffix to be read (e.g. ".gz.orc").
        If None, reads all files. (default)
    path_ignore_suffix
        Suffix or List of suffixes to be ignored (e.g. [".csv", "_SUCCESS"]).
        If None, reads all files. (default)
    ignore_empty
        Ignore files with 0 bytes.
    ignore_null
        Ignore columns with null type.
    dtype
        Dictionary of columns names and Athena/Glue types to cast.
        Use when you have columns with undetermined data types as partitions columns.
        (e.g. {'col name': 'bigint', 'col2 name': 'int'})
    sampling
        Ratio of files metadata to inspect.
        Must be `0.0 < sampling <= 1.0`.
        The higher, the more accurate.
        The lower, the faster.
    use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If integer is provided, specified number is used.
    boto3_session
        Boto3 Session. The default boto3 session is used if None is received.
    s3_additional_kwargs
        Forward to S3 botocore requests.

    Returns
    -------
        columns_types: Dictionary with keys as column names and values as
        data types (e.g. {'col0': 'bigint', 'col1': 'double'}). /
        partitions_types: Dictionary with keys as partition names
        and values as data types (e.g. {'col2': 'date'}).

    Examples
    --------
    Reading all ORC files (with partitions) metadata under a prefix

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(path='s3://bucket/prefix/', dataset=True)

    Reading all ORC files metadata from a list

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(path=[
    ...     's3://bucket/filename0.orc',
    ...     's3://bucket/filename1.orc',
    ... ])
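
    Sampling a quarter of the files' metadata and casting partition columns with undetermined types
    (prefix, column name, and type below are placeholders)

    >>> import awswrangler as wr
    >>> columns_types, partitions_types = wr.s3.read_orc_metadata(
    ...     path='s3://bucket/prefix/',
    ...     dataset=True,
    ...     sampling=0.25,
    ...     dtype={'col2': 'date'},
    ... )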

    """
    reader = _ORCTableMetadataReader()
    columns_types, partitions_types, _ = reader.read_table_metadata(
        path=path,
        version_id=version_id,
        path_suffix=path_suffix,
        path_ignore_suffix=path_ignore_suffix,
        ignore_empty=ignore_empty,
        ignore_null=ignore_null,
        dtype=dtype,
        sampling=sampling,
        dataset=dataset,
        use_threads=use_threads,
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=boto3_session,
    )

    return _ReadTableMetadataReturnValue(columns_types, partitions_types)
