pyiceberg/cli/console.py (328 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=broad-except,redefined-builtin,redefined-outer-name
from functools import wraps
from typing import (
Any,
Callable,
Dict,
Literal,
Optional,
Tuple,
)
import click
from click import Context
from pyiceberg import __version__
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.utils.properties import property_as_int
def catch_exception() -> Callable:  # type: ignore
    """Return a decorator that reports a command's unhandled exception and exits with status 1.

    The wrapped command runs unchanged and its return value is passed through. If it raises,
    the exception is handed to the ``Output`` stored on the current Click context and the
    process exits via ``ctx.exit(1)``.

    A deliberate ``ctx.exit(...)`` inside the command raises ``click.exceptions.Exit`` (a
    ``RuntimeError`` subclass). That exception is re-raised untouched so it keeps its exit code
    and is not reported as a second, spurious error.
    """

    def decorator(func: Callable) -> Callable:  # type: ignore
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any):  # type: ignore
            try:
                return func(*args, **kwargs)
            except click.exceptions.Exit:
                # Intentional exit requested by the command itself; not an error to report.
                raise
            except Exception as e:
                ctx: Optional[Context] = click.get_current_context(silent=True)
                if ctx is None:
                    # Outside a Click invocation there is no configured output; surface the error as-is.
                    raise
                _, output = _catalog_and_output(ctx)
                output.exception(e)
                ctx.exit(1)

        return wrapper

    return decorator
@click.group()
@click.option("--catalog")
@click.option("--verbose", type=click.BOOL)
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
@click.option("--ugi")
@click.option("--uri")
@click.option("--credential")
@click.pass_context
def run(
    ctx: Context,
    catalog: Optional[str],
    verbose: bool,
    output: str,
    ugi: Optional[str],
    uri: Optional[str],
    credential: Optional[str],
) -> None:
    # Root command group. It resolves the output handler and the catalog once and stores both on
    # ``ctx.obj`` under "output" and "catalog", where subcommands read them via ``_catalog_and_output``.
    # No docstring on purpose: Click would show it as the group's help text.

    # Forward only the connection overrides the user actually supplied (non-empty values).
    properties: Dict[str, str] = {
        key: value
        for key, value in (("ugi", ugi), ("uri", uri), ("credential", credential))
        if value
    }

    ctx.ensure_object(dict)
    output_class = ConsoleOutput if output == "text" else JsonOutput
    handler = output_class(verbose=verbose)
    ctx.obj["output"] = handler

    try:
        ctx.obj["catalog"] = load_catalog(catalog, **properties)
    except Exception as e:
        # Any failure while loading the catalog is reported through the chosen output, exit status 1.
        handler.exception(e)
        ctx.exit(1)

    if not isinstance(ctx.obj["catalog"], Catalog):
        handler.exception(
            ValueError("Could not determine catalog type from uri. REST (http/https) and Hive (thrift) is supported")
        )
        ctx.exit(1)
def _catalog_and_output(ctx: Context) -> Tuple[Catalog, Output]:
    """Return the catalog and the output handler that ``run`` stored on the Click context.

    Mainly a typing aid: ``ctx.obj`` is an untyped dict, and this gives callers typed values.
    Raises ``KeyError`` if either entry is missing.
    """
    state = ctx.obj
    return state["catalog"], state["output"]
@run.command()
@click.pass_context
@click.argument("parent", required=False)
@catch_exception()
def list(ctx: Context, parent: Optional[str]) -> None:  # pylint: disable=redefined-builtin
    """List tables or namespaces."""
    # NOTE: this module-level function shadows the builtin ``list``; avoid calling ``list(...)``
    # as a constructor anywhere else in this module.
    catalog, output = _catalog_and_output(ctx)

    # With a parent, prefer the tables directly under it.
    found = catalog.list_tables(parent) if parent else []
    if not found:
        # No tables (or no parent): list child namespaces of the parent, or the root namespaces.
        found = catalog.list_namespaces(parent or ())

    output.identifiers(found)
@run.command()
@click.option("--entity", type=click.Choice(["any", "namespace", "table"]), default="any")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def describe(ctx: Context, entity: Literal["any", "namespace", "table"], identifier: str) -> None:
    """Describe a namespace or a table."""
    # With --entity=any the identifier is tried as a namespace first and then as a table; if both
    # exist, both descriptions are printed. The annotation now matches the click.Choice values
    # ("any" rather than the previous "name").
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    is_namespace = False
    if entity in {"namespace", "any"} and len(identifier_tuple) > 0:
        try:
            namespace_properties = catalog.load_namespace_properties(identifier_tuple)
            output.describe_properties(namespace_properties)
            is_namespace = True
        except NoSuchNamespaceError:
            # Propagate when a namespace was explicitly requested, or when a single-part
            # identifier rules out a table fallback (tables below require more than one part).
            if entity != "any" or len(identifier_tuple) == 1:
                raise

    is_table = False
    if entity in {"table", "any"} and len(identifier_tuple) > 1:
        try:
            catalog_table = catalog.load_table(identifier)
            output.describe_table(catalog_table)
            is_table = True
        except NoSuchTableError:
            if entity != "any":
                raise

    if not is_namespace and not is_table:
        raise NoSuchTableError(f"Table or namespace does not exist: {identifier}")
@run.command()
@click.argument("identifier")
@click.option("--history", is_flag=True)
@click.pass_context
@catch_exception()
def files(ctx: Context, identifier: str, history: bool) -> None:
    """List all the files of the table."""
    # Rendering (including what --history adds) is delegated entirely to the output handler.
    cat, out = _catalog_and_output(ctx)
    loaded_table = cat.load_table(identifier)
    out.files(loaded_table, history)
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def schema(ctx: Context, identifier: str) -> None:
    """Get the schema of the table."""
    # Load the table, then pass its current schema to the configured output.
    cat, out = _catalog_and_output(ctx)
    current_schema = cat.load_table(identifier).schema()
    out.schema(current_schema)
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def spec(ctx: Context, identifier: str) -> None:
    """Return the partition spec of the table."""
    # Load the table, then pass its partition spec to the configured output.
    cat, out = _catalog_and_output(ctx)
    partition_spec = cat.load_table(identifier).spec()
    out.spec(partition_spec)
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def uuid(ctx: Context, identifier: str) -> None:
    """Return the UUID of the table."""
    # The UUID is read from the loaded table's metadata.
    cat, out = _catalog_and_output(ctx)
    table_uuid = cat.load_table(identifier).metadata.table_uuid
    out.uuid(table_uuid)
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def location(ctx: Context, identifier: str) -> None:
    """Return the location of the table."""
    # Printed as plain text via the configured output.
    cat, out = _catalog_and_output(ctx)
    table_location = cat.load_table(identifier).location()
    out.text(table_location)
@run.command()
@click.pass_context
@catch_exception()
def version(ctx: Context) -> None:
    """Print pyiceberg version."""
    # Only the output handler is needed here, so it is read straight from ctx.obj.
    out: Output = ctx.obj["output"]
    out.version(__version__)
# ``create`` sub-group of ``run``. The body is intentionally empty: Click uses the docstring as the
# group's help text, and the actual subcommands attach via ``@create.command()``.
@run.group()
def create() -> None:
    """Operation to create a namespace."""
@create.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:
    """Create a namespace."""
    # The identifier string is passed to the catalog unparsed; failures surface via @catch_exception.
    target_catalog, out = _catalog_and_output(ctx)
    target_catalog.create_namespace(identifier)
    out.text(f"Created namespace: {identifier}")
# ``drop`` sub-group of ``run``. The body is intentionally empty: Click uses the docstring as the
# group's help text, and the ``table``/``namespace`` subcommands attach via ``@drop.command()``.
@run.group()
def drop() -> None:
    """Operations to drop a namespace or table."""
@drop.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a table."""
    # Reuses the name ``table`` for several Click commands; each is registered by its decorator
    # before the module-level name is rebound.
    target_catalog, out = _catalog_and_output(ctx)
    target_catalog.drop_table(identifier)
    out.text(f"Dropped table: {identifier}")
@drop.command()  # type: ignore
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a namespace."""
    # The identifier string is passed to the catalog unparsed; failures surface via @catch_exception.
    target_catalog, out = _catalog_and_output(ctx)
    target_catalog.drop_namespace(identifier)
    out.text(f"Dropped namespace: {identifier}")
@run.command()
@click.argument("from_identifier")
@click.argument("to_identifier")
@click.pass_context
@catch_exception()
def rename(ctx: Context, from_identifier: str, to_identifier: str) -> None:
    """Rename a table."""
    # Both identifiers are passed to the catalog as given.
    target_catalog, out = _catalog_and_output(ctx)
    target_catalog.rename_table(from_identifier, to_identifier)
    out.text(f"Renamed table from {from_identifier} to {to_identifier}")
# ``properties`` sub-group of ``run``; it in turn holds the ``get``, ``set`` and ``remove`` groups.
# The body is intentionally empty: Click uses the docstring as the group's help text.
@run.group()
def properties() -> None:
    """Properties on tables/namespaces."""
# ``properties get`` sub-group; its ``namespace`` and ``table`` commands attach via ``@get.command(...)``.
# The body is intentionally empty: Click uses the docstring as the group's help text.
@properties.group()
def get() -> None:
    """Fetch properties on tables/namespaces."""
@get.command("namespace")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_namespace(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a namespace."""
    # Without PROPERTY_NAME every namespace property is printed; with it, only that property's value.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)
    namespace_properties = catalog.load_namespace_properties(identifier_tuple)

    if property_name:
        # Test membership rather than truthiness, so a property explicitly set to "" is printed
        # instead of being reported as missing.
        if property_name not in namespace_properties:
            raise NoSuchPropertyException(f"Could not find property {property_name} on namespace {identifier}")
        output.text(namespace_properties[property_name])
    else:
        # An existing namespace may have no properties at all; that is a valid (empty) listing,
        # not an error, so there is no non-empty assertion here.
        output.describe_properties(namespace_properties)
@get.command("table")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_table(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a table."""
    # Without PROPERTY_NAME every table property is printed; with it, only that property's value.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)
    table_properties = catalog.load_table(identifier_tuple).metadata.properties

    if property_name:
        # Test membership rather than truthiness, so a property explicitly set to "" is printed
        # instead of being reported as missing.
        if property_name not in table_properties:
            raise NoSuchPropertyException(f"Could not find property {property_name} on table {identifier}")
        output.text(table_properties[property_name])
    else:
        output.describe_properties(table_properties)
# ``properties set`` sub-group. The body is intentionally empty: Click uses the docstring as the
# group's help text.
# NOTE: this module-level name shadows the builtin ``set``. Set literals such as ``{...}`` are
# unaffected, but ``set(...)`` must not be called as a constructor elsewhere in this module.
@properties.group()
def set() -> None:
    """Set a property on tables/namespaces."""
@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a namespace."""
    # A single-entry update; the summary returned by the catalog is not inspected.
    target_catalog, out = _catalog_and_output(ctx)
    single_update = {property_name: property_value}
    target_catalog.update_namespace_properties(identifier, updates=single_update)
    out.text(f"Updated {property_name} on {identifier}")
@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a table."""
    # Writing table properties is not implemented yet. The table is still loaded first so that a
    # missing table is reported before the not-implemented error.
    target_catalog, out = _catalog_and_output(ctx)
    target_catalog.load_table(Catalog.identifier_to_tuple(identifier))
    out.text(f"Setting {property_name}={property_value} on {identifier}")
    raise NotImplementedError("Writing is WIP")
# ``properties remove`` sub-group; its ``namespace`` and ``table`` commands attach via ``@remove.command()``.
# The body is intentionally empty: Click uses the docstring as the group's help text.
@properties.group()
def remove() -> None:
    """Remove a property from tables/namespaces."""
@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a namespace."""
    # Success is judged from the catalog's update summary: exactly this one property must be
    # listed as removed; anything else is reported as a missing property.
    target_catalog, out = _catalog_and_output(ctx)
    summary = target_catalog.update_namespace_properties(identifier, removals={property_name})
    if summary.removed != [property_name]:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
    out.text(f"Property {property_name} removed from {identifier}")
@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a table."""
    # Removing table properties is not implemented yet; only the property's existence is checked.
    catalog, _ = _catalog_and_output(ctx)
    catalog_table = catalog.load_table(identifier)
    if property_name not in catalog_table.metadata.properties:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
    # Raise and let @catch_exception report it once and exit with status 1. Calling
    # output.exception() + ctx.exit(1) here raises click's Exit (a RuntimeError subclass), which a
    # broad ``except Exception`` around the command can report as an extra error line ("1").
    raise NotImplementedError("Writing is WIP")
@run.command()
@click.argument("identifier")
@click.option("--type", required=False)
@click.option("--verbose", type=click.BOOL)
@click.pass_context
@catch_exception()
def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
    """List all the refs in the provided table."""
    # NOTE(review): --verbose is accepted but not used by this command.
    catalog, output = _catalog_and_output(ctx)
    tbl = catalog.load_table(identifier)
    all_refs = tbl.refs()

    # Normalise the optional --type filter (case-insensitive); a missing/empty value means "no filter".
    ref_type = type.lower() if type else type
    if ref_type and ref_type not in {"branch", "tag"}:
        raise ValueError(f"Type must be either branch or tag, got: {ref_type}")

    # Each entry is (ref name, ref type, retention settings as display strings).
    relevant_refs = []
    for ref_name, ref in all_refs.items():
        if not ref_type or ref.snapshot_ref_type == ref_type:
            relevant_refs.append((ref_name, ref.snapshot_ref_type, _retention_properties(ref, tbl.properties)))

    output.describe_refs(relevant_refs)
def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
    """Return the retention settings that apply to ``ref``, formatted as display strings.

    Branches:
        ``min_snapshots_to_keep`` and ``max_snapshot_age_ms`` come from the ref when set (truthy).
        Otherwise they fall back to the table properties, or to the ``TableProperties`` defaults.
    Any other ref type (e.g. tags):
        Both settings are reported as ``"N/A"``.
    Every ref:
        ``max_ref_age_ms`` is the ref's own value, or ``"forever"`` when unset.
    """
    if ref.snapshot_ref_type == "branch":
        # The table-level fallbacks are resolved for every branch, even when the ref overrides them.
        table_min_snapshots = property_as_int(
            table_properties,
            TableProperties.MIN_SNAPSHOTS_TO_KEEP,
            TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
        )
        table_max_age_ms = property_as_int(
            table_properties,
            TableProperties.MAX_SNAPSHOT_AGE_MS,
            TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT,
        )
        retention = {
            "min_snapshots_to_keep": str(ref.min_snapshots_to_keep or table_min_snapshots),
            "max_snapshot_age_ms": str(ref.max_snapshot_age_ms or table_max_age_ms),
        }
    else:
        retention = {"min_snapshots_to_keep": "N/A", "max_snapshot_age_ms": "N/A"}

    retention["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"
    return retention