# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.singleton import _convert_to_hashable_type

if TYPE_CHECKING:
    import pyarrow as pa

    from pyiceberg.table import Table


class InspectTable:
    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

        try:
            import pyarrow as pa  # noqa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

    def _get_snapshot(self, snapshot_id: Optional[int] = None) -> Snapshot:
        if snapshot_id is not None:
            if snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id):
                return snapshot
            else:
                raise ValueError(f"Cannot find snapshot with ID {snapshot_id}")

        if snapshot := self.tbl.metadata.current_snapshot():
            return snapshot
        else:
            raise ValueError("Cannot get a snapshot as the table does not have any.")

    def snapshots(self) -> "pa.Table":
        import pyarrow as pa

        snapshots_schema = pa.schema(
            [
                pa.field("committed_at", pa.timestamp(unit="ms"), nullable=False),
                pa.field("snapshot_id", pa.int64(), nullable=False),
                pa.field("parent_id", pa.int64(), nullable=True),
                pa.field("operation", pa.string(), nullable=True),
                pa.field("manifest_list", pa.string(), nullable=False),
                pa.field("summary", pa.map_(pa.string(), pa.string()), nullable=True),
            ]
        )
        snapshots = []
        for snapshot in self.tbl.metadata.snapshots:
            if summary := snapshot.summary:
                operation = summary.operation.value
                additional_properties = snapshot.summary.additional_properties
            else:
                operation = None
                additional_properties = None

            snapshots.append(
                {
                    "committed_at": datetime.fromtimestamp(snapshot.timestamp_ms / 1000.0, tz=timezone.utc),
                    "snapshot_id": snapshot.snapshot_id,
                    "parent_id": snapshot.parent_snapshot_id,
                    "operation": str(operation),
                    "manifest_list": snapshot.manifest_list,
                    "summary": additional_properties,
                }
            )

        return pa.Table.from_pylist(
            snapshots,
            schema=snapshots_schema,
        )

    def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        schema = self.tbl.metadata.schema()

        readable_metrics_struct = []

        def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
            pa_bound_type = schema_to_pyarrow(bound_type)
            return pa.struct(
                [
                    pa.field("column_size", pa.int64(), nullable=True),
                    pa.field("value_count", pa.int64(), nullable=True),
                    pa.field("null_value_count", pa.int64(), nullable=True),
                    pa.field("nan_value_count", pa.int64(), nullable=True),
                    pa.field("lower_bound", pa_bound_type, nullable=True),
                    pa.field("upper_bound", pa_bound_type, nullable=True),
                ]
            )

        for field in self.tbl.metadata.schema().fields:
            readable_metrics_struct.append(
                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
            )

        partition_record = self.tbl.metadata.specs_struct()
        pa_record_struct = schema_to_pyarrow(partition_record)

        entries_schema = pa.schema(
            [
                pa.field("status", pa.int8(), nullable=False),
                pa.field("snapshot_id", pa.int64(), nullable=False),
                pa.field("sequence_number", pa.int64(), nullable=False),
                pa.field("file_sequence_number", pa.int64(), nullable=False),
                pa.field(
                    "data_file",
                    pa.struct(
                        [
                            pa.field("content", pa.int8(), nullable=False),
                            pa.field("file_path", pa.string(), nullable=False),
                            pa.field("file_format", pa.string(), nullable=False),
                            pa.field("partition", pa_record_struct, nullable=False),
                            pa.field("record_count", pa.int64(), nullable=False),
                            pa.field("file_size_in_bytes", pa.int64(), nullable=False),
                            pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
                            pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                            pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                            pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                            pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                            pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                            pa.field("key_metadata", pa.binary(), nullable=True),
                            pa.field("split_offsets", pa.list_(pa.int64()), nullable=True),
                            pa.field("equality_ids", pa.list_(pa.int32()), nullable=True),
                            pa.field("sort_order_id", pa.int32(), nullable=True),
                        ]
                    ),
                    nullable=False,
                ),
                pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
            ]
        )

        entries = []
        snapshot = self._get_snapshot(snapshot_id)
        for manifest in snapshot.manifests(self.tbl.io):
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False):
                column_sizes = entry.data_file.column_sizes or {}
                value_counts = entry.data_file.value_counts or {}
                null_value_counts = entry.data_file.null_value_counts or {}
                nan_value_counts = entry.data_file.nan_value_counts or {}
                lower_bounds = entry.data_file.lower_bounds or {}
                upper_bounds = entry.data_file.upper_bounds or {}
                readable_metrics = {
                    schema.find_column_name(field.field_id): {
                        "column_size": column_sizes.get(field.field_id),
                        "value_count": value_counts.get(field.field_id),
                        "null_value_count": null_value_counts.get(field.field_id),
                        "nan_value_count": nan_value_counts.get(field.field_id),
                        # Makes them readable
                        "lower_bound": from_bytes(field.field_type, lower_bound)
                        if (lower_bound := lower_bounds.get(field.field_id))
                        else None,
                        "upper_bound": from_bytes(field.field_type, upper_bound)
                        if (upper_bound := upper_bounds.get(field.field_id))
                        else None,
                    }
                    for field in self.tbl.metadata.schema().fields
                }

                partition = entry.data_file.partition
                partition_record_dict = {
                    field.name: partition[pos]
                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
                }

                entries.append(
                    {
                        "status": entry.status.value,
                        "snapshot_id": entry.snapshot_id,
                        "sequence_number": entry.sequence_number,
                        "file_sequence_number": entry.file_sequence_number,
                        "data_file": {
                            "content": entry.data_file.content,
                            "file_path": entry.data_file.file_path,
                            "file_format": entry.data_file.file_format,
                            "partition": partition_record_dict,
                            "record_count": entry.data_file.record_count,
                            "file_size_in_bytes": entry.data_file.file_size_in_bytes,
                            "column_sizes": dict(entry.data_file.column_sizes),
                            "value_counts": dict(entry.data_file.value_counts or {}),
                            "null_value_counts": dict(entry.data_file.null_value_counts or {}),
                            "nan_value_counts": dict(entry.data_file.nan_value_counts or {}),
                            "lower_bounds": entry.data_file.lower_bounds,
                            "upper_bounds": entry.data_file.upper_bounds,
                            "key_metadata": entry.data_file.key_metadata,
                            "split_offsets": entry.data_file.split_offsets,
                            "equality_ids": entry.data_file.equality_ids,
                            "sort_order_id": entry.data_file.sort_order_id,
                            "spec_id": entry.data_file.spec_id,
                        },
                        "readable_metrics": readable_metrics,
                    }
                )

        return pa.Table.from_pylist(
            entries,
            schema=entries_schema,
        )

    def refs(self) -> "pa.Table":
        import pyarrow as pa

        ref_schema = pa.schema(
            [
                pa.field("name", pa.string(), nullable=False),
                pa.field("type", pa.dictionary(pa.int32(), pa.string()), nullable=False),
                pa.field("snapshot_id", pa.int64(), nullable=False),
                pa.field("max_reference_age_in_ms", pa.int64(), nullable=True),
                pa.field("min_snapshots_to_keep", pa.int32(), nullable=True),
                pa.field("max_snapshot_age_in_ms", pa.int64(), nullable=True),
            ]
        )

        ref_results = []
        for ref in self.tbl.metadata.refs:
            if snapshot_ref := self.tbl.metadata.refs.get(ref):
                ref_results.append(
                    {
                        "name": ref,
                        "type": snapshot_ref.snapshot_ref_type.upper(),
                        "snapshot_id": snapshot_ref.snapshot_id,
                        "max_reference_age_in_ms": snapshot_ref.max_ref_age_ms,
                        "min_snapshots_to_keep": snapshot_ref.min_snapshots_to_keep,
                        "max_snapshot_age_in_ms": snapshot_ref.max_snapshot_age_ms,
                    }
                )

        return pa.Table.from_pylist(ref_results, schema=ref_schema)

    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        table_schema = pa.schema(
            [
                pa.field("record_count", pa.int64(), nullable=False),
                pa.field("file_count", pa.int32(), nullable=False),
                pa.field("total_data_file_size_in_bytes", pa.int64(), nullable=False),
                pa.field("position_delete_record_count", pa.int64(), nullable=False),
                pa.field("position_delete_file_count", pa.int32(), nullable=False),
                pa.field("equality_delete_record_count", pa.int64(), nullable=False),
                pa.field("equality_delete_file_count", pa.int32(), nullable=False),
                pa.field("last_updated_at", pa.timestamp(unit="ms"), nullable=True),
                pa.field("last_updated_snapshot_id", pa.int64(), nullable=True),
            ]
        )

        partition_record = self.tbl.metadata.specs_struct()
        has_partitions = len(partition_record.fields) > 0

        if has_partitions:
            pa_record_struct = schema_to_pyarrow(partition_record)
            partitions_schema = pa.schema(
                [
                    pa.field("partition", pa_record_struct, nullable=False),
                    pa.field("spec_id", pa.int32(), nullable=False),
                ]
            )

            table_schema = pa.unify_schemas([partitions_schema, table_schema])

        def update_partitions_map(
            partitions_map: Dict[Tuple[str, Any], Any],
            file: DataFile,
            partition_record_dict: Dict[str, Any],
            snapshot: Optional[Snapshot],
        ) -> None:
            partition_record_key = _convert_to_hashable_type(partition_record_dict)
            if partition_record_key not in partitions_map:
                partitions_map[partition_record_key] = {
                    "partition": partition_record_dict,
                    "spec_id": file.spec_id,
                    "record_count": 0,
                    "file_count": 0,
                    "total_data_file_size_in_bytes": 0,
                    "position_delete_record_count": 0,
                    "position_delete_file_count": 0,
                    "equality_delete_record_count": 0,
                    "equality_delete_file_count": 0,
                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
                }

            partition_row = partitions_map[partition_record_key]

            if snapshot is not None:
                if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
                    partition_row["last_updated_at"] = snapshot.timestamp_ms
                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

            if file.content == DataFileContent.DATA:
                partition_row["record_count"] += file.record_count
                partition_row["file_count"] += 1
                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
            elif file.content == DataFileContent.POSITION_DELETES:
                partition_row["position_delete_record_count"] += file.record_count
                partition_row["position_delete_file_count"] += 1
            elif file.content == DataFileContent.EQUALITY_DELETES:
                partition_row["equality_delete_record_count"] += file.record_count
                partition_row["equality_delete_file_count"] += 1
            else:
                raise ValueError(f"Unknown DataFileContent ({file.content})")

        partitions_map: Dict[Tuple[str, Any], Any] = {}
        snapshot = self._get_snapshot(snapshot_id)
        for manifest in snapshot.manifests(self.tbl.io):
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                partition = entry.data_file.partition
                partition_record_dict = {
                    field.name: partition[pos]
                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
                }
                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)

        return pa.Table.from_pylist(
            partitions_map.values(),
            schema=table_schema,
        )

    def _get_manifests_schema(self) -> "pa.Schema":
        import pyarrow as pa

        partition_summary_schema = pa.struct(
            [
                pa.field("contains_null", pa.bool_(), nullable=False),
                pa.field("contains_nan", pa.bool_(), nullable=True),
                pa.field("lower_bound", pa.string(), nullable=True),
                pa.field("upper_bound", pa.string(), nullable=True),
            ]
        )

        manifest_schema = pa.schema(
            [
                pa.field("content", pa.int8(), nullable=False),
                pa.field("path", pa.string(), nullable=False),
                pa.field("length", pa.int64(), nullable=False),
                pa.field("partition_spec_id", pa.int32(), nullable=False),
                pa.field("added_snapshot_id", pa.int64(), nullable=False),
                pa.field("added_data_files_count", pa.int32(), nullable=False),
                pa.field("existing_data_files_count", pa.int32(), nullable=False),
                pa.field("deleted_data_files_count", pa.int32(), nullable=False),
                pa.field("added_delete_files_count", pa.int32(), nullable=False),
                pa.field("existing_delete_files_count", pa.int32(), nullable=False),
                pa.field("deleted_delete_files_count", pa.int32(), nullable=False),
                pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
            ]
        )
        return manifest_schema

    def _get_all_manifests_schema(self) -> "pa.Schema":
        import pyarrow as pa

        all_manifests_schema = self._get_manifests_schema()
        all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
        return all_manifests_schema

    def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
        import pyarrow as pa

        def _partition_summaries_to_rows(
            spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
        ) -> List[Dict[str, Any]]:
            rows = []
            for i, field_summary in enumerate(partition_summaries):
                field = spec.fields[i]
                partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
                lower_bound = (
                    (
                        field.transform.to_human_string(
                            partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
                        )
                    )
                    if field_summary.lower_bound
                    else None
                )
                upper_bound = (
                    (
                        field.transform.to_human_string(
                            partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
                        )
                    )
                    if field_summary.upper_bound
                    else None
                )
                rows.append(
                    {
                        "contains_null": field_summary.contains_null,
                        "contains_nan": field_summary.contains_nan,
                        "lower_bound": lower_bound,
                        "upper_bound": upper_bound,
                    }
                )
            return rows

        specs = self.tbl.metadata.specs()
        manifests = []
        if snapshot:
            for manifest in snapshot.manifests(self.tbl.io):
                is_data_file = manifest.content == ManifestContent.DATA
                is_delete_file = manifest.content == ManifestContent.DELETES
                manifest_row = {
                    "content": manifest.content,
                    "path": manifest.manifest_path,
                    "length": manifest.manifest_length,
                    "partition_spec_id": manifest.partition_spec_id,
                    "added_snapshot_id": manifest.added_snapshot_id,
                    "added_data_files_count": manifest.added_files_count if is_data_file else 0,
                    "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
                    "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
                    "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
                    "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
                    "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
                    "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
                    if manifest.partitions
                    else [],
                }
                if is_all_manifests_table:
                    manifest_row["reference_snapshot_id"] = snapshot.snapshot_id
                manifests.append(manifest_row)

        return pa.Table.from_pylist(
            manifests,
            schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
        )

    def manifests(self) -> "pa.Table":
        return self._generate_manifests_table(self.tbl.current_snapshot())

    def metadata_log_entries(self) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.table.snapshots import MetadataLogEntry

        table_schema = pa.schema(
            [
                pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
                pa.field("file", pa.string(), nullable=False),
                pa.field("latest_snapshot_id", pa.int64(), nullable=True),
                pa.field("latest_schema_id", pa.int32(), nullable=True),
                pa.field("latest_sequence_number", pa.int64(), nullable=True),
            ]
        )

        def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
            latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
            return {
                "timestamp": metadata_entry.timestamp_ms,
                "file": metadata_entry.metadata_file,
                "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
                "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
                "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
            }

        # similar to MetadataLogEntriesTable in Java
        # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
        metadata_log_entries = self.tbl.metadata.metadata_log + [
            MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)
        ]

        return pa.Table.from_pylist(
            [metadata_log_entry_to_row(entry) for entry in metadata_log_entries],
            schema=table_schema,
        )

    def history(self) -> "pa.Table":
        import pyarrow as pa

        history_schema = pa.schema(
            [
                pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
                pa.field("snapshot_id", pa.int64(), nullable=False),
                pa.field("parent_id", pa.int64(), nullable=True),
                pa.field("is_current_ancestor", pa.bool_(), nullable=False),
            ]
        )

        ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}

        history = []
        metadata = self.tbl.metadata

        for snapshot_entry in metadata.snapshot_log:
            snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)

            history.append(
                {
                    "made_current_at": datetime.fromtimestamp(snapshot_entry.timestamp_ms / 1000.0, tz=timezone.utc),
                    "snapshot_id": snapshot_entry.snapshot_id,
                    "parent_id": snapshot.parent_snapshot_id if snapshot else None,
                    "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
                }
            )

        return pa.Table.from_pylist(history, schema=history_schema)

    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        schema = self.tbl.metadata.schema()
        readable_metrics_struct = []

        def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
            pa_bound_type = schema_to_pyarrow(bound_type)
            return pa.struct(
                [
                    pa.field("column_size", pa.int64(), nullable=True),
                    pa.field("value_count", pa.int64(), nullable=True),
                    pa.field("null_value_count", pa.int64(), nullable=True),
                    pa.field("nan_value_count", pa.int64(), nullable=True),
                    pa.field("lower_bound", pa_bound_type, nullable=True),
                    pa.field("upper_bound", pa_bound_type, nullable=True),
                ]
            )

        for field in self.tbl.metadata.schema().fields:
            readable_metrics_struct.append(
                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
            )

        files_schema = pa.schema(
            [
                pa.field("content", pa.int8(), nullable=False),
                pa.field("file_path", pa.string(), nullable=False),
                pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
                pa.field("spec_id", pa.int32(), nullable=False),
                pa.field("record_count", pa.int64(), nullable=False),
                pa.field("file_size_in_bytes", pa.int64(), nullable=False),
                pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                pa.field("key_metadata", pa.binary(), nullable=True),
                pa.field("split_offsets", pa.list_(pa.int64()), nullable=True),
                pa.field("equality_ids", pa.list_(pa.int32()), nullable=True),
                pa.field("sort_order_id", pa.int32(), nullable=True),
                pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
            ]
        )

        files: list[dict[str, Any]] = []

        if not snapshot_id and not self.tbl.metadata.current_snapshot():
            return pa.Table.from_pylist(
                files,
                schema=files_schema,
            )
        snapshot = self._get_snapshot(snapshot_id)

        io = self.tbl.io
        for manifest_list in snapshot.manifests(io):
            for manifest_entry in manifest_list.fetch_manifest_entry(io):
                data_file = manifest_entry.data_file
                if data_file_filter and data_file.content not in data_file_filter:
                    continue
                column_sizes = data_file.column_sizes or {}
                value_counts = data_file.value_counts or {}
                null_value_counts = data_file.null_value_counts or {}
                nan_value_counts = data_file.nan_value_counts or {}
                lower_bounds = data_file.lower_bounds or {}
                upper_bounds = data_file.upper_bounds or {}
                readable_metrics = {
                    schema.find_column_name(field.field_id): {
                        "column_size": column_sizes.get(field.field_id),
                        "value_count": value_counts.get(field.field_id),
                        "null_value_count": null_value_counts.get(field.field_id),
                        "nan_value_count": nan_value_counts.get(field.field_id),
                        "lower_bound": from_bytes(field.field_type, lower_bound)
                        if (lower_bound := lower_bounds.get(field.field_id))
                        else None,
                        "upper_bound": from_bytes(field.field_type, upper_bound)
                        if (upper_bound := upper_bounds.get(field.field_id))
                        else None,
                    }
                    for field in self.tbl.metadata.schema().fields
                }
                files.append(
                    {
                        "content": data_file.content,
                        "file_path": data_file.file_path,
                        "file_format": data_file.file_format,
                        "spec_id": data_file.spec_id,
                        "record_count": data_file.record_count,
                        "file_size_in_bytes": data_file.file_size_in_bytes,
                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
                        "null_value_counts": dict(data_file.null_value_counts)
                        if data_file.null_value_counts is not None
                        else None,
                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
                        "key_metadata": data_file.key_metadata,
                        "split_offsets": data_file.split_offsets,
                        "equality_ids": data_file.equality_ids,
                        "sort_order_id": data_file.sort_order_id,
                        "readable_metrics": readable_metrics,
                    }
                )

        return pa.Table.from_pylist(
            files,
            schema=files_schema,
        )

    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        return self._files(snapshot_id)

    def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        return self._files(snapshot_id, {DataFileContent.DATA})

    def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

    def all_manifests(self) -> "pa.Table":
        import pyarrow as pa

        snapshots = self.tbl.snapshots()
        if not snapshots:
            return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

        executor = ExecutorFactory.get_or_create()
        manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
        )
        return pa.concat_tables(manifests_by_snapshots)
