# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import abc
import collections.abc
import json
from typing import Any, Dict, Mapping, Optional, Tuple, Union, get_args, get_origin

from . import sdkType, utils


class Datum:
    def __init__(self, value: Any, type: Optional[str]):
        self.value: Any = value
        self.type: Optional[str] = type

    @property
    def python_value(self) -> Any:
        if self.value is None or self.type is None:
            return None
        elif self.type in ("bytes", "string", "int", "double"):
            return self.value
        elif self.type == "json":
            return json.loads(self.value)
        elif self.type == "collection_string":
            return [v for v in self.value.string]
        elif self.type == "collection_bytes":
            return [v for v in self.value.bytes]
        elif self.type == "collection_double":
            return [v for v in self.value.double]
        elif self.type == "collection_sint64":
            return [v for v in self.value.sint64]
        else:
            return self.value

    @property
    def python_type(self) -> type:
        return type(self.python_value)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False

        return self.value == other.value and self.type == other.type

    def __hash__(self):
        return hash((type(self), (self.value, self.type)))

    def __repr__(self):
        val_repr = repr(self.value)
        if len(val_repr) > 10:
            val_repr = val_repr[:10] + "..."
        return "<Datum {} {}>".format(self.type, val_repr)


class _ConverterMeta(abc.ABCMeta):

    _bindings: Dict[str, type] = {}

    def __new__(
        mcls, name, bases, dct, *, binding: Optional[str], trigger: Optional[str] = None
    ):
        cls = super().__new__(mcls, name, bases, dct)
        cls._trigger = trigger  # type: ignore
        if binding is None:
            return cls

        if binding in mcls._bindings:
            raise RuntimeError(
                f"cannot register a converter for {binding!r} binding: "
                f"another converter for this binding has already been "
                f"registered"
            )

        mcls._bindings[binding] = cls
        if trigger is not None:
            mcls._bindings[trigger] = cls

        return cls

    @classmethod
    def get(cls, binding_name):
        return cls._bindings.get(binding_name)

    @classmethod
    def get_raw_bindings(cls, indexed_function, input_types):
        return utils.get_raw_bindings(indexed_function, input_types)

    @classmethod
    def check_supported_type(cls, annotation: type) -> bool:
        if annotation is None:
            return False

        # The annotation is a class/type (not an object) - not iterable
        if (isinstance(annotation, type)
                and issubclass(annotation, sdkType.SdkType)):
            return True

        # An iterable who only has one inner type and is a subclass of SdkType
        return cls._is_iterable_supported_type(annotation)

    @classmethod
    def _is_iterable_supported_type(cls, annotation: type) -> bool:
        # Check base type from type hint. Ex: List from List[SdkType]
        base_type = get_origin(annotation)
        if (base_type is None
                or not issubclass(base_type, collections.abc.Iterable)):
            return False

        inner_types = get_args(annotation)
        if inner_types is None or len(inner_types) != 1:
            return False

        inner_type = inner_types[0]

        return (isinstance(inner_type, type)
                and issubclass(inner_type, sdkType.SdkType))

    def has_trigger_support(cls) -> bool:
        return cls._trigger is not None  # type: ignore


class _BaseConverter(metaclass=_ConverterMeta, binding=None):

    @classmethod
    def _decode_typed_data(
        cls,
        data: Datum,
        *,
        python_type: Union[type, Tuple[type, ...]],
        context: str = "data",
    ) -> Any:
        if data is None:
            return None

        data_type = data.type
        if (data_type == "model_binding_data"
                or data_type == "collection_model_binding_data"):
            result = data.value
        elif data_type is None:
            return None
        else:
            raise ValueError(f"unsupported type of {context}: {data_type}")

        if not isinstance(result, python_type):
            if isinstance(python_type, (tuple, list, dict)):
                raise ValueError(
                    f"unexpected value type in {context}: "
                    f"{type(result).__name__}, expected one of: "
                    f'{", ".join(t.__name__ for t in python_type)}'
                )
            else:
                try:
                    # Try coercing into the requested type
                    result = python_type(result)
                except (TypeError, ValueError) as e:
                    raise ValueError(
                        f"cannot convert value of {context} into "
                        f"{python_type.__name__}: {e}"
                    ) from None

        return result

    @classmethod
    def _decode_trigger_metadata_field(
        cls,
        trigger_metadata: Mapping[str, Datum],
        field: str,
        *,
        python_type: Union[type, Tuple[type, ...]],
    ) -> Any:
        data = trigger_metadata.get(field)
        if data is None:
            return None
        else:
            return cls._decode_typed_data(
                data,
                python_type=python_type,
                context=f"field {field!r} in trigger metadata",
            )


class InConverter(_BaseConverter, binding=None):

    @classmethod
    @abc.abstractmethod
    def check_input_type_annotation(cls, pytype: type) -> bool:
        pass

    @classmethod
    @abc.abstractmethod
    def decode(cls, data: Datum, *, trigger_metadata) -> Any:
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def has_implicit_output(cls) -> bool:
        return False


class OutConverter(_BaseConverter, binding=None):

    @classmethod
    @abc.abstractmethod
    def check_output_type_annotation(cls, pytype: type) -> bool:
        pass

    @classmethod
    @abc.abstractmethod
    def encode(cls, obj: Any, *, expected_type: Optional[type]) -> Optional[Datum]:
        raise NotImplementedError


def get_binding_registry():
    return _ConverterMeta
