pyiceberg/table/inspect.py (559 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.singleton import _convert_to_hashable_type if TYPE_CHECKING: import pyarrow as pa from pyiceberg.table import Table class InspectTable: tbl: Table def __init__(self, tbl: Table) -> None: self.tbl = tbl try: import pyarrow as pa # noqa except ModuleNotFoundError as e: raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e def _get_snapshot(self, snapshot_id: Optional[int] = None) -> Snapshot: if snapshot_id is not None: if snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id): return snapshot else: raise ValueError(f"Cannot find snapshot with ID {snapshot_id}") if snapshot := self.tbl.metadata.current_snapshot(): return snapshot else: raise ValueError("Cannot get a snapshot as the table does not have any.") def snapshots(self) -> "pa.Table": import pyarrow as pa snapshots_schema = pa.schema( [ pa.field("committed_at", pa.timestamp(unit="ms"), nullable=False), pa.field("snapshot_id", pa.int64(), nullable=False), pa.field("parent_id", pa.int64(), nullable=True), pa.field("operation", pa.string(), nullable=True), pa.field("manifest_list", pa.string(), nullable=False), pa.field("summary", pa.map_(pa.string(), pa.string()), nullable=True), ] ) snapshots = [] for snapshot in self.tbl.metadata.snapshots: if summary := snapshot.summary: operation = summary.operation.value additional_properties = snapshot.summary.additional_properties else: operation = None additional_properties = None snapshots.append( { "committed_at": datetime.fromtimestamp(snapshot.timestamp_ms / 1000.0, tz=timezone.utc), "snapshot_id": snapshot.snapshot_id, "parent_id": snapshot.parent_snapshot_id, "operation": str(operation), "manifest_list": snapshot.manifest_list, "summary": additional_properties, } ) return pa.Table.from_pylist( snapshots, schema=snapshots_schema, ) def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow schema = self.tbl.metadata.schema() readable_metrics_struct = [] def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa_bound_type = schema_to_pyarrow(bound_type) return pa.struct( [ pa.field("column_size", pa.int64(), nullable=True), pa.field("value_count", pa.int64(), nullable=True), pa.field("null_value_count", pa.int64(), nullable=True), pa.field("nan_value_count", pa.int64(), nullable=True), pa.field("lower_bound", pa_bound_type, nullable=True), pa.field("upper_bound", pa_bound_type, nullable=True), ] ) for field in self.tbl.metadata.schema().fields: readable_metrics_struct.append( pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False) ) partition_record = self.tbl.metadata.specs_struct() pa_record_struct = schema_to_pyarrow(partition_record) entries_schema = pa.schema( [ pa.field("status", pa.int8(), nullable=False), pa.field("snapshot_id", pa.int64(), nullable=False), pa.field("sequence_number", pa.int64(), nullable=False), pa.field("file_sequence_number", pa.int64(), nullable=False), pa.field( "data_file", pa.struct( [ pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.string(), nullable=False), pa.field("partition", pa_record_struct, nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), pa.field("key_metadata", pa.binary(), nullable=True), pa.field("split_offsets", pa.list_(pa.int64()), nullable=True), pa.field("equality_ids", pa.list_(pa.int32()), nullable=True), pa.field("sort_order_id", pa.int32(), nullable=True), ] ), nullable=False, ), pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) entries = [] snapshot = self._get_snapshot(snapshot_id) for manifest in snapshot.manifests(self.tbl.io): for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False): column_sizes = entry.data_file.column_sizes or {} value_counts = entry.data_file.value_counts or {} null_value_counts = entry.data_file.null_value_counts or {} nan_value_counts = entry.data_file.nan_value_counts or {} lower_bounds = entry.data_file.lower_bounds or {} upper_bounds = entry.data_file.upper_bounds or {} readable_metrics = { schema.find_column_name(field.field_id): { "column_size": column_sizes.get(field.field_id), "value_count": value_counts.get(field.field_id), "null_value_count": null_value_counts.get(field.field_id), "nan_value_count": nan_value_counts.get(field.field_id), # Makes them readable "lower_bound": from_bytes(field.field_type, lower_bound) if (lower_bound := lower_bounds.get(field.field_id)) else None, "upper_bound": from_bytes(field.field_type, upper_bound) if (upper_bound := upper_bounds.get(field.field_id)) else None, } for field in self.tbl.metadata.schema().fields } partition = entry.data_file.partition partition_record_dict = { field.name: partition[pos] for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) } entries.append( { "status": entry.status.value, "snapshot_id": entry.snapshot_id, "sequence_number": entry.sequence_number, "file_sequence_number": entry.file_sequence_number, "data_file": { "content": entry.data_file.content, "file_path": entry.data_file.file_path, "file_format": entry.data_file.file_format, "partition": partition_record_dict, "record_count": entry.data_file.record_count, "file_size_in_bytes": entry.data_file.file_size_in_bytes, "column_sizes": dict(entry.data_file.column_sizes), "value_counts": dict(entry.data_file.value_counts or {}), "null_value_counts": dict(entry.data_file.null_value_counts or {}), "nan_value_counts": dict(entry.data_file.nan_value_counts or {}), "lower_bounds": entry.data_file.lower_bounds, "upper_bounds": entry.data_file.upper_bounds, "key_metadata": entry.data_file.key_metadata, "split_offsets": entry.data_file.split_offsets, "equality_ids": entry.data_file.equality_ids, "sort_order_id": entry.data_file.sort_order_id, "spec_id": entry.data_file.spec_id, }, "readable_metrics": readable_metrics, } ) return pa.Table.from_pylist( entries, schema=entries_schema, ) def refs(self) -> "pa.Table": import pyarrow as pa ref_schema = pa.schema( [ pa.field("name", pa.string(), nullable=False), pa.field("type", pa.dictionary(pa.int32(), pa.string()), nullable=False), pa.field("snapshot_id", pa.int64(), nullable=False), pa.field("max_reference_age_in_ms", pa.int64(), nullable=True), pa.field("min_snapshots_to_keep", pa.int32(), nullable=True), pa.field("max_snapshot_age_in_ms", pa.int64(), nullable=True), ] ) ref_results = [] for ref in self.tbl.metadata.refs: if snapshot_ref := self.tbl.metadata.refs.get(ref): ref_results.append( { "name": ref, "type": snapshot_ref.snapshot_ref_type.upper(), "snapshot_id": snapshot_ref.snapshot_id, "max_reference_age_in_ms": snapshot_ref.max_ref_age_ms, "min_snapshots_to_keep": snapshot_ref.min_snapshots_to_keep, "max_snapshot_age_in_ms": snapshot_ref.max_snapshot_age_ms, } ) return pa.Table.from_pylist(ref_results, schema=ref_schema) def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow table_schema = pa.schema( [ pa.field("record_count", pa.int64(), nullable=False), pa.field("file_count", pa.int32(), nullable=False), pa.field("total_data_file_size_in_bytes", pa.int64(), nullable=False), pa.field("position_delete_record_count", pa.int64(), nullable=False), pa.field("position_delete_file_count", pa.int32(), nullable=False), pa.field("equality_delete_record_count", pa.int64(), nullable=False), pa.field("equality_delete_file_count", pa.int32(), nullable=False), pa.field("last_updated_at", pa.timestamp(unit="ms"), nullable=True), pa.field("last_updated_snapshot_id", pa.int64(), nullable=True), ] ) partition_record = self.tbl.metadata.specs_struct() has_partitions = len(partition_record.fields) > 0 if has_partitions: pa_record_struct = schema_to_pyarrow(partition_record) partitions_schema = pa.schema( [ pa.field("partition", pa_record_struct, nullable=False), pa.field("spec_id", pa.int32(), nullable=False), ] ) table_schema = pa.unify_schemas([partitions_schema, table_schema]) def update_partitions_map( partitions_map: Dict[Tuple[str, Any], Any], file: DataFile, partition_record_dict: Dict[str, Any], snapshot: Optional[Snapshot], ) -> None: partition_record_key = _convert_to_hashable_type(partition_record_dict) if partition_record_key not in partitions_map: partitions_map[partition_record_key] = { "partition": partition_record_dict, "spec_id": file.spec_id, "record_count": 0, "file_count": 0, "total_data_file_size_in_bytes": 0, "position_delete_record_count": 0, "position_delete_file_count": 0, "equality_delete_record_count": 0, "equality_delete_file_count": 0, "last_updated_at": snapshot.timestamp_ms if snapshot else None, "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None, } partition_row = partitions_map[partition_record_key] if snapshot is not None: if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms: partition_row["last_updated_at"] = snapshot.timestamp_ms partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id if file.content == DataFileContent.DATA: partition_row["record_count"] += file.record_count partition_row["file_count"] += 1 partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes elif file.content == DataFileContent.POSITION_DELETES: partition_row["position_delete_record_count"] += file.record_count partition_row["position_delete_file_count"] += 1 elif file.content == DataFileContent.EQUALITY_DELETES: partition_row["equality_delete_record_count"] += file.record_count partition_row["equality_delete_file_count"] += 1 else: raise ValueError(f"Unknown DataFileContent ({file.content})") partitions_map: Dict[Tuple[str, Any], Any] = {} snapshot = self._get_snapshot(snapshot_id) for manifest in snapshot.manifests(self.tbl.io): for entry in manifest.fetch_manifest_entry(io=self.tbl.io): partition = entry.data_file.partition partition_record_dict = { field.name: partition[pos] for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) } entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot) return pa.Table.from_pylist( partitions_map.values(), schema=table_schema, ) def _get_manifests_schema(self) -> "pa.Schema": import pyarrow as pa partition_summary_schema = pa.struct( [ pa.field("contains_null", pa.bool_(), nullable=False), pa.field("contains_nan", pa.bool_(), nullable=True), pa.field("lower_bound", pa.string(), nullable=True), pa.field("upper_bound", pa.string(), nullable=True), ] ) manifest_schema = pa.schema( [ pa.field("content", pa.int8(), nullable=False), pa.field("path", pa.string(), nullable=False), pa.field("length", pa.int64(), nullable=False), pa.field("partition_spec_id", pa.int32(), nullable=False), pa.field("added_snapshot_id", pa.int64(), nullable=False), pa.field("added_data_files_count", pa.int32(), nullable=False), pa.field("existing_data_files_count", pa.int32(), nullable=False), pa.field("deleted_data_files_count", pa.int32(), nullable=False), pa.field("added_delete_files_count", pa.int32(), nullable=False), pa.field("existing_delete_files_count", pa.int32(), nullable=False), pa.field("deleted_delete_files_count", pa.int32(), nullable=False), pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False), ] ) return manifest_schema def _get_all_manifests_schema(self) -> "pa.Schema": import pyarrow as pa all_manifests_schema = self._get_manifests_schema() all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": import pyarrow as pa def _partition_summaries_to_rows( spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary] ) -> List[Dict[str, Any]]: rows = [] for i, field_summary in enumerate(partition_summaries): field = spec.fields[i] partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type lower_bound = ( ( field.transform.to_human_string( partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound) ) ) if field_summary.lower_bound else None ) upper_bound = ( ( field.transform.to_human_string( partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound) ) ) if field_summary.upper_bound else None ) rows.append( { "contains_null": field_summary.contains_null, "contains_nan": field_summary.contains_nan, "lower_bound": lower_bound, "upper_bound": upper_bound, } ) return rows specs = self.tbl.metadata.specs() manifests = [] if snapshot: for manifest in snapshot.manifests(self.tbl.io): is_data_file = manifest.content == ManifestContent.DATA is_delete_file = manifest.content == ManifestContent.DELETES manifest_row = { "content": manifest.content, "path": manifest.manifest_path, "length": manifest.manifest_length, "partition_spec_id": manifest.partition_spec_id, "added_snapshot_id": manifest.added_snapshot_id, "added_data_files_count": manifest.added_files_count if is_data_file else 0, "existing_data_files_count": manifest.existing_files_count if is_data_file else 0, "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0, "added_delete_files_count": manifest.added_files_count if is_delete_file else 0, "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0, "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0, "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions) if manifest.partitions else [], } if is_all_manifests_table: manifest_row["reference_snapshot_id"] = snapshot.snapshot_id manifests.append(manifest_row) return pa.Table.from_pylist( manifests, schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), ) def manifests(self) -> "pa.Table": return self._generate_manifests_table(self.tbl.current_snapshot()) def metadata_log_entries(self) -> "pa.Table": import pyarrow as pa from pyiceberg.table.snapshots import MetadataLogEntry table_schema = pa.schema( [ pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False), pa.field("file", pa.string(), nullable=False), pa.field("latest_snapshot_id", pa.int64(), nullable=True), pa.field("latest_schema_id", pa.int32(), nullable=True), pa.field("latest_sequence_number", pa.int64(), nullable=True), ] ) def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]: latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms) return { "timestamp": metadata_entry.timestamp_ms, "file": metadata_entry.metadata_file, "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None, "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None, "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None, } # similar to MetadataLogEntriesTable in Java # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66 metadata_log_entries = self.tbl.metadata.metadata_log + [ MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms) ] return pa.Table.from_pylist( [metadata_log_entry_to_row(entry) for entry in metadata_log_entries], schema=table_schema, ) def history(self) -> "pa.Table": import pyarrow as pa history_schema = pa.schema( [ pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False), pa.field("snapshot_id", pa.int64(), nullable=False), pa.field("parent_id", pa.int64(), nullable=True), pa.field("is_current_ancestor", pa.bool_(), nullable=False), ] ) ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)} history = [] metadata = self.tbl.metadata for snapshot_entry in metadata.snapshot_log: snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id) history.append( { "made_current_at": datetime.fromtimestamp(snapshot_entry.timestamp_ms / 1000.0, tz=timezone.utc), "snapshot_id": snapshot_entry.snapshot_id, "parent_id": snapshot.parent_snapshot_id if snapshot else None, "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids, } ) return pa.Table.from_pylist(history, schema=history_schema) def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow schema = self.tbl.metadata.schema() readable_metrics_struct = [] def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa_bound_type = schema_to_pyarrow(bound_type) return pa.struct( [ pa.field("column_size", pa.int64(), nullable=True), pa.field("value_count", pa.int64(), nullable=True), pa.field("null_value_count", pa.int64(), nullable=True), pa.field("nan_value_count", pa.int64(), nullable=True), pa.field("lower_bound", pa_bound_type, nullable=True), pa.field("upper_bound", pa_bound_type, nullable=True), ] ) for field in self.tbl.metadata.schema().fields: readable_metrics_struct.append( pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False) ) files_schema = pa.schema( [ pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False), pa.field("spec_id", pa.int32(), nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), pa.field("key_metadata", pa.binary(), nullable=True), pa.field("split_offsets", pa.list_(pa.int64()), nullable=True), pa.field("equality_ids", pa.list_(pa.int32()), nullable=True), pa.field("sort_order_id", pa.int32(), nullable=True), pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) files: list[dict[str, Any]] = [] if not snapshot_id and not self.tbl.metadata.current_snapshot(): return pa.Table.from_pylist( files, schema=files_schema, ) snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io for manifest_list in snapshot.manifests(io): for manifest_entry in manifest_list.fetch_manifest_entry(io): data_file = manifest_entry.data_file if data_file_filter and data_file.content not in data_file_filter: continue column_sizes = data_file.column_sizes or {} value_counts = data_file.value_counts or {} null_value_counts = data_file.null_value_counts or {} nan_value_counts = data_file.nan_value_counts or {} lower_bounds = data_file.lower_bounds or {} upper_bounds = data_file.upper_bounds or {} readable_metrics = { schema.find_column_name(field.field_id): { "column_size": column_sizes.get(field.field_id), "value_count": value_counts.get(field.field_id), "null_value_count": null_value_counts.get(field.field_id), "nan_value_count": nan_value_counts.get(field.field_id), "lower_bound": from_bytes(field.field_type, lower_bound) if (lower_bound := lower_bounds.get(field.field_id)) else None, "upper_bound": from_bytes(field.field_type, upper_bound) if (upper_bound := upper_bounds.get(field.field_id)) else None, } for field in self.tbl.metadata.schema().fields } files.append( { "content": data_file.content, "file_path": data_file.file_path, "file_format": data_file.file_format, "spec_id": data_file.spec_id, "record_count": data_file.record_count, "file_size_in_bytes": data_file.file_size_in_bytes, "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None, "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None, "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None, "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None, "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None, "key_metadata": data_file.key_metadata, "split_offsets": data_file.split_offsets, "equality_ids": data_file.equality_ids, "sort_order_id": data_file.sort_order_id, "readable_metrics": readable_metrics, } ) return pa.Table.from_pylist( files, schema=files_schema, ) def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id) def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.DATA}) def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) def all_manifests(self) -> "pa.Table": import pyarrow as pa snapshots = self.tbl.snapshots() if not snapshots: return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) executor = ExecutorFactory.get_or_create() manifests_by_snapshots: Iterator["pa.Table"] = executor.map( lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots)