# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=broad-except,redefined-builtin,redefined-outer-name
from functools import wraps
from typing import (
    Any,
    Callable,
    Dict,
    Literal,
    Optional,
    Tuple,
)

import click
from click import Context

from pyiceberg import __version__
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.utils.properties import property_as_int


def catch_exception() -> Callable:  # type: ignore
    """Build a decorator that reports command failures through the configured output.

    The wrapped callable runs normally. If it raises, the exception is passed to
    the ``output`` object stored on the active click context and the context is
    exited with status 1.

    Returns:
        A decorator that wraps a command callback with this error handling.
    """

    def decorator(func: Callable) -> Callable:  # type: ignore
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any):  # type: ignore
            try:
                return func(*args, **kwargs)
            except click.exceptions.Exit:
                # ctx.exit() terminates by raising Exit, which is a RuntimeError subclass.
                # It is a deliberate exit, not a failure, so it must not be reported.
                raise
            except Exception as e:
                ctx: Optional[Context] = click.get_current_context(silent=True)
                if ctx is None:
                    # Without an active context there is no output to report through;
                    # surface the original error instead of failing on the lookup.
                    raise
                _, output = _catalog_and_output(ctx)
                output.exception(e)
                ctx.exit(1)

        return wrapper

    return decorator


@click.group()
@click.option("--catalog")
@click.option("--verbose", type=click.BOOL)
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
@click.option("--ugi")
@click.option("--uri")
@click.option("--credential")
@click.pass_context
def run(
    ctx: Context,
    catalog: Optional[str],
    verbose: bool,
    output: str,
    ugi: Optional[str],
    uri: Optional[str],
    credential: Optional[str],
) -> None:
    # Root command group. Documented with comments rather than a docstring because
    # click would surface a docstring as the group's --help text.
    #
    # Stores two entries on ctx.obj for the sub-commands: "output" (the formatter)
    # and "catalog" (the loaded catalog). Exits with status 1 when the catalog
    # cannot be loaded or is not a Catalog instance.

    # Forward only the connection overrides that were actually supplied.
    overrides = {
        key: value
        for key, value in (("ugi", ugi), ("uri", uri), ("credential", credential))
        if value
    }

    ctx.ensure_object(dict)
    output_cls = ConsoleOutput if output == "text" else JsonOutput
    ctx.obj["output"] = output_cls(verbose=verbose)

    try:
        ctx.obj["catalog"] = load_catalog(catalog, **overrides)
    except Exception as e:
        ctx.obj["output"].exception(e)
        ctx.exit(1)

    if isinstance(ctx.obj["catalog"], Catalog):
        return

    ctx.obj["output"].exception(
        ValueError("Could not determine catalog type from uri. REST (http/https) and Hive (thrift) is supported")
    )
    ctx.exit(1)


def _catalog_and_output(ctx: Context) -> Tuple[Catalog, Output]:
    """Return the ``catalog`` and ``output`` entries of ``ctx.obj`` as a typed pair."""
    shared = ctx.obj
    return shared["catalog"], shared["output"]


@run.command()
@click.pass_context
@click.argument("parent", required=False)
@catch_exception()
def list(ctx: Context, parent: Optional[str]) -> None:  # pylint: disable=redefined-builtin
    """List tables or namespaces."""
    catalog, output = _catalog_and_output(ctx)

    # Tables are only looked up when a parent namespace was given.
    found = catalog.list_tables(parent) if parent else []
    if not found:
        # Nothing found (or no parent): fall back to namespaces, nested under
        # the parent when there is one and from the root otherwise.
        found = catalog.list_namespaces(parent or ())
    output.identifiers(found)


@run.command()
@click.option("--entity", type=click.Choice(["any", "namespace", "table"]), default="any")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def describe(ctx: Context, entity: Literal["any", "namespace", "table"], identifier: str) -> None:
    """Describe a namespace or a table."""
    # entity: which kind of object to look up; "any" tries a namespace first and then a table.
    # identifier: name of the object, split into parts with Catalog.identifier_to_tuple.
    # Raises NoSuchNamespaceError / NoSuchTableError when the requested object is missing.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    is_namespace = False
    if entity in {"namespace", "any"} and len(identifier_tuple) > 0:
        try:
            namespace_properties = catalog.load_namespace_properties(identifier_tuple)
            output.describe_properties(namespace_properties)
            is_namespace = True
        except NoSuchNamespaceError:
            # A miss is only tolerated for "any" with a multi-part identifier,
            # because only then is the table lookup below still attempted.
            if entity != "any" or len(identifier_tuple) == 1:
                raise

    is_table = False
    if entity in {"table", "any"} and len(identifier_tuple) > 1:
        try:
            catalog_table = catalog.load_table(identifier)
            output.describe_table(catalog_table)
            is_table = True
        except NoSuchTableError:
            # For "any", the combined error below is raised instead.
            if entity != "any":
                raise

    if not (is_namespace or is_table):
        raise NoSuchTableError(f"Table or namespace does not exist: {identifier}")


@run.command()
@click.argument("identifier")
@click.option("--history", is_flag=True)
@click.pass_context
@catch_exception()
def files(ctx: Context, identifier: str, history: bool) -> None:
    """List all the files of the table."""
    catalog, output = _catalog_and_output(ctx)
    # The loaded table and the --history flag are handed straight to the output formatter.
    output.files(catalog.load_table(identifier), history)


@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def schema(ctx: Context, identifier: str) -> None:
    """Get the schema of the table."""
    catalog, output = _catalog_and_output(ctx)
    # Resolve the schema before passing it to the output formatter.
    table_schema = catalog.load_table(identifier).schema()
    output.schema(table_schema)


@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def spec(ctx: Context, identifier: str) -> None:
    """Return the partition spec of the table."""
    catalog, output = _catalog_and_output(ctx)
    # Resolve the spec before passing it to the output formatter.
    partition_spec = catalog.load_table(identifier).spec()
    output.spec(partition_spec)


@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def uuid(ctx: Context, identifier: str) -> None:
    """Return the UUID of the table."""
    catalog, output = _catalog_and_output(ctx)
    loaded = catalog.load_table(identifier)
    # The UUID is read from the table's metadata.
    output.uuid(loaded.metadata.table_uuid)


@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def location(ctx: Context, identifier: str) -> None:
    """Return the location of the table."""
    catalog, output = _catalog_and_output(ctx)
    # The location is emitted as plain text.
    table_location = catalog.load_table(identifier).location()
    output.text(table_location)


@run.command()
@click.pass_context
@catch_exception()
def version(ctx: Context) -> None:
    """Print pyiceberg version."""
    # Only the output formatter is needed here; the catalog entry is not touched.
    formatter = ctx.obj["output"]
    formatter.version(__version__)


# Sub-group of `run` with no behavior of its own; the `namespace` command below
# is attached to it via @create.command().
@run.group()
def create() -> None:
    """Operation to create a namespace."""


@create.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:
    """Create a namespace."""
    catalog, output = _catalog_and_output(ctx)
    catalog.create_namespace(identifier)
    # Reached only when the creation above did not raise.
    confirmation = f"Created namespace: {identifier}"
    output.text(confirmation)


# Sub-group of `run` with no behavior of its own; the `table` and `namespace`
# commands below are attached to it via @drop.command().
@run.group()
def drop() -> None:
    """Operations to drop a namespace or table."""


@drop.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a table."""
    catalog, output = _catalog_and_output(ctx)
    catalog.drop_table(identifier)
    # Reached only when the drop above did not raise.
    confirmation = f"Dropped table: {identifier}"
    output.text(confirmation)


@drop.command()  # type: ignore
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a namespace."""
    catalog, output = _catalog_and_output(ctx)
    catalog.drop_namespace(identifier)
    # Reached only when the drop above did not raise.
    confirmation = f"Dropped namespace: {identifier}"
    output.text(confirmation)


@run.command()
@click.argument("from_identifier")
@click.argument("to_identifier")
@click.pass_context
@catch_exception()
def rename(ctx: Context, from_identifier: str, to_identifier: str) -> None:
    """Rename a table."""
    catalog, output = _catalog_and_output(ctx)
    catalog.rename_table(from_identifier, to_identifier)
    # Reached only when the rename above did not raise.
    confirmation = f"Renamed table from {from_identifier} to {to_identifier}"
    output.text(confirmation)


# Sub-group of `run` with no behavior of its own; the `get`, `set` and `remove`
# groups below are attached to it via @properties.group().
@run.group()
def properties() -> None:
    """Properties on tables/namespaces."""


# Sub-group of `properties`; the "namespace" and "table" commands below are
# attached to it via @get.command(...).
@properties.group()
def get() -> None:
    """Fetch properties on tables/namespaces."""


@get.command("namespace")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_namespace(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a namespace."""
    # identifier: name of the namespace, split into parts with Catalog.identifier_to_tuple.
    # property_name: optional; when omitted, all properties are described.
    # Raises NoSuchPropertyException when property_name is given but not set.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    # No truthiness assert on the result: a namespace without any properties is
    # valid and must not fail with a message-less AssertionError.
    namespace_properties = catalog.load_namespace_properties(identifier_tuple)

    if not property_name:
        output.describe_properties(namespace_properties)
        return

    property_value = namespace_properties.get(property_name)
    if property_value is None:
        raise NoSuchPropertyException(f"Could not find property {property_name} on namespace {identifier}")
    # Compared against None rather than truthiness so an empty-string value is still printed.
    output.text(property_value)


@get.command("table")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_table(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a table."""
    # identifier: name of the table, split into parts with Catalog.identifier_to_tuple.
    # property_name: optional; when omitted, all properties are described.
    # Raises NoSuchPropertyException when property_name is given but not set.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    table_properties = catalog.load_table(identifier_tuple).metadata.properties

    if not property_name:
        output.describe_properties(table_properties)
        return

    property_value = table_properties.get(property_name)
    if property_value is None:
        raise NoSuchPropertyException(f"Could not find property {property_name} on table {identifier}")
    # Compared against None rather than truthiness so an empty-string value is still printed.
    output.text(property_value)


# Sub-group of `properties`; the `namespace` and `table` commands below are
# attached to it via @set.command().
@properties.group()
def set() -> None:
    """Set a property on tables/namespaces."""


@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a namespace."""
    catalog, output = _catalog_and_output(ctx)
    # A single-entry update mapping: only the named property is written.
    changes = {property_name: property_value}
    catalog.update_namespace_properties(identifier, updates=changes)
    output.text(f"Updated {property_name} on {identifier}")


@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a table."""
    catalog, output = _catalog_and_output(ctx)
    # The load result is discarded; any load error propagates before the
    # message below is printed.
    catalog.load_table(Catalog.identifier_to_tuple(identifier))
    output.text(f"Setting {property_name}={property_value} on {identifier}")
    # Table writes are not implemented, so the command always ends in an error.
    raise NotImplementedError("Writing is WIP")


# Sub-group of `properties`; the `namespace` and `table` commands below are
# attached to it via @remove.command().
@properties.group()
def remove() -> None:
    """Remove a property from tables/namespaces."""


@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a namespace."""
    catalog, output = _catalog_and_output(ctx)
    summary = catalog.update_namespace_properties(identifier, removals={property_name})

    # Anything other than exactly this one property being reported as removed
    # is treated as "the property did not exist".
    if summary.removed != [property_name]:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
    output.text(f"Property {property_name} removed from {identifier}")


@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a table."""
    # Raises NoSuchPropertyException when the property is not set on the table,
    # and NotImplementedError otherwise because table writes are not implemented.
    catalog, _ = _catalog_and_output(ctx)
    catalog_table = catalog.load_table(identifier)
    if property_name not in catalog_table.metadata.properties:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
    # Raise rather than calling output.exception() and ctx.exit() here: ctx.exit()
    # raises click's Exit, which the surrounding catch_exception wrapper would
    # catch and print as a second error. The wrapper already reports the
    # exception and exits with status 1.
    raise NotImplementedError("Writing is WIP")


@run.command()
@click.argument("identifier")
@click.option("--type", required=False)
@click.option("--verbose", type=click.BOOL)
@click.pass_context
@catch_exception()
def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
    """List all the refs in the provided table."""
    catalog, output = _catalog_and_output(ctx)
    table = catalog.load_table(identifier)
    refs = table.refs()

    # The filter is case-insensitive; it is validated only after the table has loaded.
    ref_kind = type.lower() if type else type
    if ref_kind and ref_kind not in {"branch", "tag"}:
        raise ValueError(f"Type must be either branch or tag, got: {ref_kind}")

    relevant_refs = []
    for ref_name, ref in refs.items():
        # With no filter every ref is kept; otherwise only refs of the requested kind.
        if not ref_kind or ref.snapshot_ref_type == ref_kind:
            relevant_refs.append((ref_name, ref.snapshot_ref_type, _retention_properties(ref, table.properties)))

    output.describe_refs(relevant_refs)


def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
    """Build the retention settings of a ref as display strings.

    Args:
        ref: The snapshot ref to describe.
        table_properties: Table properties supplying the fallback values for branches.

    Returns:
        A dict with the keys ``min_snapshots_to_keep``, ``max_snapshot_age_ms``
        and ``max_ref_age_ms``, in that order. The first two are "N/A" for
        anything that is not a branch; ``max_ref_age_ms`` is "forever" when the
        ref does not set it.
    """
    if ref.snapshot_ref_type == "branch":
        # Table-level values stand in for whatever the branch itself leaves unset.
        fallback_min_snapshots = property_as_int(
            table_properties,
            TableProperties.MIN_SNAPSHOTS_TO_KEEP,
            TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
        )
        fallback_max_age_ms = property_as_int(
            table_properties,
            TableProperties.MAX_SNAPSHOT_AGE_MS,
            TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT,
        )
        min_snapshots = str(ref.min_snapshots_to_keep or fallback_min_snapshots)
        max_snapshot_age = str(ref.max_snapshot_age_ms or fallback_max_age_ms)
    else:
        min_snapshots = max_snapshot_age = "N/A"

    return {
        "min_snapshots_to_keep": min_snapshots,
        "max_snapshot_age_ms": max_snapshot_age,
        "max_ref_age_ms": str(ref.max_ref_age_ms or "forever"),
    }
