in pyiceberg/table/inspect.py
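# Names used below (Optional, Dict, Tuple, Any, DataFile, DataFileContent,
# Snapshot, _convert_to_hashable_type) come from module-level imports in the
# surrounding file, which this excerpt omits; a likely set (an assumption):
#     from typing import Any, Dict, Optional, Tuple
#     from pyiceberg.manifest import DataFile, DataFileContent
#     from pyiceberg.table.snapshots import Snapshot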
def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
    import pyarrow as pa

    from pyiceberg.io.pyarrow import schema_to_pyarrow
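
    # Aggregate columns of the partitions metadata table; position and
    # equality deletes are counted separately from plain data files.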
    table_schema = pa.schema(
        [
            pa.field("record_count", pa.int64(), nullable=False),
            pa.field("file_count", pa.int32(), nullable=False),
            pa.field("total_data_file_size_in_bytes", pa.int64(), nullable=False),
            pa.field("position_delete_record_count", pa.int64(), nullable=False),
            pa.field("position_delete_file_count", pa.int32(), nullable=False),
            pa.field("equality_delete_record_count", pa.int64(), nullable=False),
            pa.field("equality_delete_file_count", pa.int32(), nullable=False),
            pa.field("last_updated_at", pa.timestamp(unit="ms"), nullable=True),
            pa.field("last_updated_snapshot_id", pa.int64(), nullable=True),
        ]
    )
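
    # specs_struct() combines the partition fields across the table's specs;
    # the "partition" and "spec_id" columns are only added when the table is
    # actually partitioned.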
    partition_record = self.tbl.metadata.specs_struct()
    has_partitions = len(partition_record.fields) > 0

    if has_partitions:
        pa_record_struct = schema_to_pyarrow(partition_record)
        partitions_schema = pa.schema(
            [
                pa.field("partition", pa_record_struct, nullable=False),
                pa.field("spec_id", pa.int32(), nullable=False),
            ]
        )
        table_schema = pa.unify_schemas([partitions_schema, table_schema])
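
    # Accumulates one counter row per distinct partition value, keyed by a
    # hashable form of the partition dict.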
    def update_partitions_map(
        partitions_map: Dict[Tuple[str, Any], Any],
        file: DataFile,
        partition_record_dict: Dict[str, Any],
        snapshot: Optional[Snapshot],
    ) -> None:
        partition_record_key = _convert_to_hashable_type(partition_record_dict)
        if partition_record_key not in partitions_map:
            partitions_map[partition_record_key] = {
                "partition": partition_record_dict,
                "spec_id": file.spec_id,
                "record_count": 0,
                "file_count": 0,
                "total_data_file_size_in_bytes": 0,
                "position_delete_record_count": 0,
                "position_delete_file_count": 0,
                "equality_delete_record_count": 0,
                "equality_delete_file_count": 0,
                "last_updated_at": snapshot.timestamp_ms if snapshot else None,
                "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
            }
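
        # Fold this file's stats into the row, keeping last_updated_* pointed
        # at the most recent snapshot seen for the partition.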
        partition_row = partitions_map[partition_record_key]

        if snapshot is not None:
            if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
                partition_row["last_updated_at"] = snapshot.timestamp_ms
                partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

        if file.content == DataFileContent.DATA:
            partition_row["record_count"] += file.record_count
            partition_row["file_count"] += 1
            partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
        elif file.content == DataFileContent.POSITION_DELETES:
            partition_row["position_delete_record_count"] += file.record_count
            partition_row["position_delete_file_count"] += 1
        elif file.content == DataFileContent.EQUALITY_DELETES:
            partition_row["equality_delete_record_count"] += file.record_count
            partition_row["equality_delete_file_count"] += 1
        else:
            raise ValueError(f"Unknown DataFileContent ({file.content})")
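
    # Walk every manifest entry reachable from the requested (or current)
    # snapshot and fold each data file into the per-partition aggregates.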
    partitions_map: Dict[Tuple[str, Any], Any] = {}
    snapshot = self._get_snapshot(snapshot_id)
    for manifest in snapshot.manifests(self.tbl.io):
        for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
            partition = entry.data_file.partition
            partition_record_dict = {
                field.name: partition[pos]
                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
            }
            entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
            update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
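
    # One output row per distinct partition value.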
    return pa.Table.from_pylist(
        partitions_map.values(),
        schema=table_schema,
    )
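
# Usage sketch (not part of this file), assuming a catalog-configured
# environment; the catalog and table names below are hypothetical:
#
#     from pyiceberg.catalog import load_catalog
#
#     catalog = load_catalog("default")
#     tbl = catalog.load_table("db.events")
#     tbl.inspect.partitions().to_pandas()  # one row per partition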