pyiceberg/cli/output.py (174 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from abc import ABC, abstractmethod
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from uuid import UUID
from rich.console import Console
from rich.table import Table as RichTable
from rich.tree import Tree
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties
class Output(ABC):
"""Output interface for exporting."""
@abstractmethod
def exception(self, ex: Exception) -> None: ...
@abstractmethod
def identifiers(self, identifiers: List[Identifier]) -> None: ...
@abstractmethod
def describe_table(self, table: Table) -> None: ...
@abstractmethod
def files(self, table: Table, history: bool) -> None: ...
@abstractmethod
def describe_properties(self, properties: Properties) -> None: ...
@abstractmethod
def text(self, response: str) -> None: ...
@abstractmethod
def schema(self, schema: Schema) -> None: ...
@abstractmethod
def spec(self, spec: PartitionSpec) -> None: ...
@abstractmethod
def uuid(self, uuid: Optional[UUID]) -> None: ...
@abstractmethod
def version(self, version: str) -> None: ...
@abstractmethod
def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: ...
class ConsoleOutput(Output):
"""Writes to the console."""
verbose: bool
def __init__(self, **properties: Any) -> None:
self.verbose = properties.get("verbose", False)
@property
def _table(self) -> RichTable:
return RichTable.grid(padding=(0, 2))
def exception(self, ex: Exception) -> None:
if self.verbose:
Console(stderr=True).print_exception()
else:
Console(stderr=True).print(ex)
def identifiers(self, identifiers: List[Identifier]) -> None:
table = self._table
for identifier in identifiers:
table.add_row(".".join(identifier))
Console().print(table)
def describe_table(self, table: Table) -> None:
metadata = table.metadata
table_properties = self._table
for key, value in metadata.properties.items():
table_properties.add_row(key, value)
schema_tree = Tree(f"Schema, id={table.metadata.current_schema_id}")
for field in table.schema().fields:
schema_tree.add(str(field))
snapshot_tree = Tree("Snapshots")
for snapshot in metadata.snapshots:
snapshot_tree.add(f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}: {snapshot.manifest_list}")
output_table = self._table
output_table.add_row("Table format version", str(metadata.format_version))
output_table.add_row("Metadata location", table.metadata_location)
output_table.add_row("Table UUID", str(table.metadata.table_uuid))
output_table.add_row("Last Updated", str(metadata.last_updated_ms))
output_table.add_row("Partition spec", str(table.spec()))
output_table.add_row("Sort order", str(table.sort_order()))
output_table.add_row("Current schema", schema_tree)
output_table.add_row("Current snapshot", str(table.current_snapshot()))
output_table.add_row("Snapshots", snapshot_tree)
output_table.add_row("Properties", table_properties)
Console().print(output_table)
def files(self, table: Table, history: bool) -> None:
if history:
snapshots = table.metadata.snapshots
else:
if snapshot := table.current_snapshot():
snapshots = [snapshot]
else:
snapshots = []
snapshot_tree = Tree(f"Snapshots: {'.'.join(table.name())}")
io = table.io
for snapshot in snapshots:
list_tree = snapshot_tree.add(
f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}: {snapshot.manifest_list}"
)
manifest_list = snapshot.manifests(io)
for manifest in manifest_list:
manifest_tree = list_tree.add(f"Manifest: {manifest.manifest_path}")
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=False):
manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
Console().print(snapshot_tree)
def describe_properties(self, properties: Properties) -> None:
output_table = self._table
for k, v in properties.items():
output_table.add_row(k, v)
Console().print(output_table)
def text(self, response: str) -> None:
Console(soft_wrap=True).print(response)
def schema(self, schema: Schema) -> None:
output_table = self._table
for field in schema.fields:
output_table.add_row(field.name, str(field.field_type), field.doc or "")
Console().print(output_table)
def spec(self, spec: PartitionSpec) -> None:
Console().print(str(spec))
def uuid(self, uuid: Optional[UUID]) -> None:
Console().print(str(uuid) if uuid else "missing")
def version(self, version: str) -> None:
Console().print(version)
def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
refs_table = RichTable(title="Snapshot Refs")
refs_table.add_column("Ref")
refs_table.add_column("Type")
refs_table.add_column("Max ref age ms")
refs_table.add_column("Min snapshots to keep")
refs_table.add_column("Max snapshot age ms")
for name, type, ref_detail in ref_details:
refs_table.add_row(
name, type, ref_detail["max_ref_age_ms"], ref_detail["min_snapshots_to_keep"], ref_detail["max_snapshot_age_ms"]
)
Console().print(refs_table)
class JsonOutput(Output):
"""Writes json to stdout."""
verbose: bool
def __init__(self, **properties: Any) -> None:
self.verbose = properties.get("verbose", False)
def _out(self, d: Any) -> None:
print(json.dumps(d))
def exception(self, ex: Exception) -> None:
self._out({"type": ex.__class__.__name__, "message": str(ex)})
def identifiers(self, identifiers: List[Identifier]) -> None:
self._out([".".join(identifier) for identifier in identifiers])
def describe_table(self, table: Table) -> None:
class FauxTable(IcebergBaseModel):
"""Just to encode it using Pydantic."""
identifier: Identifier
metadata_location: str
metadata: TableMetadata
print(
FauxTable(
identifier=table.name(), metadata=table.metadata, metadata_location=table.metadata_location
).model_dump_json()
)
def describe_properties(self, properties: Properties) -> None:
self._out(properties)
def text(self, response: str) -> None:
print(json.dumps(response))
def schema(self, schema: Schema) -> None:
print(schema.model_dump_json())
def files(self, table: Table, history: bool) -> None:
pass
def spec(self, spec: PartitionSpec) -> None:
print(spec.model_dump_json())
def uuid(self, uuid: Optional[UUID]) -> None:
self._out({"uuid": str(uuid) if uuid else "missing"})
def version(self, version: str) -> None:
self._out({"version": version})
def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
self._out(
[
{"name": name, "type": type, detail_key: detail_val}
for name, type, detail in refs
for detail_key, detail_val in detail.items()
]
)