in pyiceberg/table/inspect.py [0:0]
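# NOTE: this method assumes the module-level imports of inspect.py, most
# relevantly: `Optional` (typing), `from_bytes` (pyiceberg.conversions),
# and `PrimitiveType` (pyiceberg.types).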
def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
    import pyarrow as pa

    from pyiceberg.io.pyarrow import schema_to_pyarrow

    schema = self.tbl.metadata.schema()

    readable_metrics_struct = []
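
    # Helper: build the per-column readable_metrics struct. Bounds are typed
    # with the column's own Arrow type so they surface as readable values
    # rather than raw bytes.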
    def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
        pa_bound_type = schema_to_pyarrow(bound_type)
        return pa.struct(
            [
                pa.field("column_size", pa.int64(), nullable=True),
                pa.field("value_count", pa.int64(), nullable=True),
                pa.field("null_value_count", pa.int64(), nullable=True),
                pa.field("nan_value_count", pa.int64(), nullable=True),
                pa.field("lower_bound", pa_bound_type, nullable=True),
                pa.field("upper_bound", pa_bound_type, nullable=True),
            ]
        )
    for field in schema.fields:
        readable_metrics_struct.append(
            pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
        )
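
    # specs_struct() combines the fields of every partition spec the table has
    # ever had, so one Arrow struct can hold partitions from any manifest.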
    partition_record = self.tbl.metadata.specs_struct()
    pa_record_struct = schema_to_pyarrow(partition_record)
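
    # The schema below mirrors the layout of Iceberg's `entries` metadata
    # table: per-entry bookkeeping columns plus the nested data_file struct.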
    entries_schema = pa.schema(
        [
            pa.field("status", pa.int8(), nullable=False),
            pa.field("snapshot_id", pa.int64(), nullable=False),
            pa.field("sequence_number", pa.int64(), nullable=False),
            pa.field("file_sequence_number", pa.int64(), nullable=False),
            pa.field(
                "data_file",
                pa.struct(
                    [
                        pa.field("content", pa.int8(), nullable=False),
                        pa.field("file_path", pa.string(), nullable=False),
                        pa.field("file_format", pa.string(), nullable=False),
                        pa.field("partition", pa_record_struct, nullable=False),
                        pa.field("record_count", pa.int64(), nullable=False),
                        pa.field("file_size_in_bytes", pa.int64(), nullable=False),
                        pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
                        pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                        pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                        pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
                        pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                        pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
                        pa.field("key_metadata", pa.binary(), nullable=True),
                        pa.field("split_offsets", pa.list_(pa.int64()), nullable=True),
                        pa.field("equality_ids", pa.list_(pa.int32()), nullable=True),
                        pa.field("sort_order_id", pa.int32(), nullable=True),
                    ]
                ),
                nullable=False,
            ),
            pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
        ]
    )
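
    # Walk every manifest of the target snapshot. discard_deleted=False keeps
    # deleted entries, so `status` can be 0 (EXISTING), 1 (ADDED), or 2 (DELETED).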
    entries = []
    snapshot = self._get_snapshot(snapshot_id)
    for manifest in snapshot.manifests(self.tbl.io):
        for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False):
            column_sizes = entry.data_file.column_sizes or {}
            value_counts = entry.data_file.value_counts or {}
            null_value_counts = entry.data_file.null_value_counts or {}
            nan_value_counts = entry.data_file.nan_value_counts or {}
            lower_bounds = entry.data_file.lower_bounds or {}
            upper_bounds = entry.data_file.upper_bounds or {}
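            # Build the readable_metrics column: per-field counts plus bounds
            # decoded from their binary single-value serialization.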
            readable_metrics = {
                schema.find_column_name(field.field_id): {
                    "column_size": column_sizes.get(field.field_id),
                    "value_count": value_counts.get(field.field_id),
                    "null_value_count": null_value_counts.get(field.field_id),
                    "nan_value_count": nan_value_counts.get(field.field_id),
                    # Decode the serialized bounds into the column's native type
                    "lower_bound": from_bytes(field.field_type, lower_bound)
                    if (lower_bound := lower_bounds.get(field.field_id))
                    else None,
                    "upper_bound": from_bytes(field.field_type, upper_bound)
                    if (upper_bound := upper_bounds.get(field.field_id))
                    else None,
                }
                for field in schema.fields
            }
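            # The partition is a positional Record; key it by the field names
            # of the spec this manifest was written with.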
            partition = entry.data_file.partition
            partition_record_dict = {
                field.name: partition[pos]
                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
            }
            entries.append(
                {
                    "status": entry.status.value,
                    "snapshot_id": entry.snapshot_id,
                    "sequence_number": entry.sequence_number,
                    "file_sequence_number": entry.file_sequence_number,
                    "data_file": {
                        "content": entry.data_file.content,
                        "file_path": entry.data_file.file_path,
                        "file_format": entry.data_file.file_format,
                        "partition": partition_record_dict,
                        "record_count": entry.data_file.record_count,
                        "file_size_in_bytes": entry.data_file.file_size_in_bytes,
                        # Reuse the None-safe dicts computed above; calling
                        # dict() on a missing metrics map would raise TypeError
                        "column_sizes": dict(column_sizes),
                        "value_counts": dict(value_counts),
                        "null_value_counts": dict(null_value_counts),
                        "nan_value_counts": dict(nan_value_counts),
                        "lower_bounds": entry.data_file.lower_bounds,
                        "upper_bounds": entry.data_file.upper_bounds,
                        "key_metadata": entry.data_file.key_metadata,
                        "split_offsets": entry.data_file.split_offsets,
                        "equality_ids": entry.data_file.equality_ids,
                        "sort_order_id": entry.data_file.sort_order_id,
                        # Not declared in entries_schema above, so the Arrow
                        # conversion drops this key
                        "spec_id": entry.data_file.spec_id,
                    },
                    "readable_metrics": readable_metrics,
                }
            )
    return pa.Table.from_pylist(
        entries,
        schema=entries_schema,
    )
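
# A minimal usage sketch (hypothetical catalog and table names, assuming a
# configured catalog; `entries` is reached through `Table.inspect`):
#
#     from pyiceberg.catalog import load_catalog
#
#     catalog = load_catalog("default")
#     tbl = catalog.load_table("examples.events")
#     entries = tbl.inspect.entries()    # pa.Table, one row per manifest entry
#     print(entries.column("status"))    # 0=EXISTING, 1=ADDED, 2=DELETED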