def partitions()

in pyiceberg/table/inspect.py [0:0]


    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        """Build a PyArrow table with aggregated statistics per partition.

        Every manifest entry reachable from the selected snapshot is folded
        into one row per distinct partition record. Each row carries record
        and file counts for data files, position-delete files and
        equality-delete files, the total data file size, and the timestamp
        and id of the most recent snapshot that contributed an entry to the
        partition.

        When the table's combined partition struct has at least one field,
        the result additionally starts with a ``partition`` struct column
        and a ``spec_id`` column.

        Args:
            snapshot_id: Identifier handed to ``self._get_snapshot`` to pick
                the snapshot to inspect.

        Returns:
            A ``pa.Table`` with one row per partition.

        Raises:
            ValueError: If a data file reports a content type other than
                data, position deletes or equality deletes.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow

        table_schema = pa.schema(
            [
                pa.field("record_count", pa.int64(), nullable=False),
                pa.field("file_count", pa.int32(), nullable=False),
                pa.field("total_data_file_size_in_bytes", pa.int64(), nullable=False),
                pa.field("position_delete_record_count", pa.int64(), nullable=False),
                pa.field("position_delete_file_count", pa.int32(), nullable=False),
                pa.field("equality_delete_record_count", pa.int64(), nullable=False),
                pa.field("equality_delete_file_count", pa.int32(), nullable=False),
                pa.field("last_updated_at", pa.timestamp(unit="ms"), nullable=True),
                pa.field("last_updated_snapshot_id", pa.int64(), nullable=True),
            ]
        )

        partition_record = self.tbl.metadata.specs_struct()
        has_partitions = len(partition_record.fields) > 0

        if has_partitions:
            pa_record_struct = schema_to_pyarrow(partition_record)
            partitions_schema = pa.schema(
                [
                    pa.field("partition", pa_record_struct, nullable=False),
                    pa.field("spec_id", pa.int32(), nullable=False),
                ]
            )

            # Partition columns come first, followed by the statistics columns.
            table_schema = pa.unify_schemas([partitions_schema, table_schema])

        def update_partitions_map(
            partitions_map: Dict[Tuple[str, Any], Any],
            file: DataFile,
            partition_record_dict: Dict[str, Any],
            snapshot: Optional[Snapshot],
        ) -> None:
            """Fold a single data/delete file into its partition's aggregate row."""
            partition_record_key = _convert_to_hashable_type(partition_record_dict)
            if partition_record_key not in partitions_map:
                partitions_map[partition_record_key] = {
                    "partition": partition_record_dict,
                    "spec_id": file.spec_id,
                    "record_count": 0,
                    "file_count": 0,
                    "total_data_file_size_in_bytes": 0,
                    "position_delete_record_count": 0,
                    "position_delete_file_count": 0,
                    "equality_delete_record_count": 0,
                    "equality_delete_file_count": 0,
                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
                }

            partition_row = partitions_map[partition_record_key]

            if snapshot is not None:
                # Track the most recent contributing snapshot. The comparison
                # must be timestamp against timestamp: snapshot ids are not
                # ordered in time, so comparing an id with a timestamp would
                # pick the "latest" snapshot arbitrarily.
                if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
                    partition_row["last_updated_at"] = snapshot.timestamp_ms
                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id

            if file.content == DataFileContent.DATA:
                partition_row["record_count"] += file.record_count
                partition_row["file_count"] += 1
                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
            elif file.content == DataFileContent.POSITION_DELETES:
                partition_row["position_delete_record_count"] += file.record_count
                partition_row["position_delete_file_count"] += 1
            elif file.content == DataFileContent.EQUALITY_DELETES:
                partition_row["equality_delete_record_count"] += file.record_count
                partition_row["equality_delete_file_count"] += 1
            else:
                raise ValueError(f"Unknown DataFileContent ({file.content})")

        partitions_map: Dict[Tuple[str, Any], Any] = {}
        snapshot = self._get_snapshot(snapshot_id)
        # The spec mapping does not change while iterating; fetch it once
        # instead of once per manifest entry.
        specs = self.tbl.metadata.specs()
        for manifest in snapshot.manifests(self.tbl.io):
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                partition = entry.data_file.partition
                # Name the positional partition values using the fields of
                # the spec the manifest was written with.
                partition_record_dict = {
                    field.name: partition[pos] for pos, field in enumerate(specs[manifest.partition_spec_id].fields)
                }
                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)

        # from_pylist is documented to take a list of dicts, so materialise
        # the dict view first.
        return pa.Table.from_pylist(
            list(partitions_map.values()),
            schema=table_schema,
        )