from typing import Any, Dict, List, Optional, Union

from samtranslator.model import GeneratedProperty, Resource
from samtranslator.model.exceptions import ExpectedType, InvalidResourceException
from samtranslator.model.intrinsics import fnSub, ref
from samtranslator.model.types import PassThrough
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.utils.types import Intrinsicable
from samtranslator.validator.value_validator import sam_expect

# Name of the OpenAPI extension field under which the authorizer settings are placed in a security scheme
APIGATEWAY_AUTHORIZER_KEY = "x-amazon-apigateway-authorizer"


class ApiGatewayV2HttpApi(Resource):
    """Model of the ``AWS::ApiGatewayV2::Api`` resource type."""

    resource_type = "AWS::ApiGatewayV2::Api"

    # Every supported property is a generated property; a separate instance is created per name
    property_types = {
        property_name: GeneratedProperty()
        for property_name in (
            "Body",
            "BodyS3Location",
            "Description",
            "FailOnWarnings",
            "DisableExecuteApiEndpoint",
            "BasePath",
            "Tags",
            "CorsConfiguration",
        )
    }

    # "http_api_id" resolves to a reference to this resource's logical id
    runtime_attrs = {"http_api_id": lambda self: ref(self.logical_id)}

    def assign_tags(self, tags: Dict[str, Any]) -> None:
        """Override of the default 'assign_tags' function in Resource class that assigns nothing.

        Tags are already defined in Body so they do not need to be assigned here.

        :param tags: Tags to be assigned to the resource (ignored)

        """


class ApiGatewayV2Stage(Resource):
    """Model of the ``AWS::ApiGatewayV2::Stage`` resource type."""

    resource_type = "AWS::ApiGatewayV2::Stage"

    # Every supported property is a generated property; a separate instance is created per name
    property_types = {
        stage_property: GeneratedProperty()
        for stage_property in (
            "AccessLogSettings",
            "DefaultRouteSettings",
            "RouteSettings",
            "ClientCertificateId",
            "Description",
            "ApiId",
            "StageName",
            "Tags",
            "StageVariables",
            "AutoDeploy",
        )
    }

    # "stage_name" resolves to a reference to this resource's logical id
    runtime_attrs = {"stage_name": lambda self: ref(self.logical_id)}
    Tags: Optional[PassThrough]

    def assign_tags(self, tags: Dict[str, Any]) -> None:
        """Override of the default 'assign_tags' function in Resource class.

        Stores the given tags in the ``Tags`` property. Nothing is stored when
        ``tags`` is None or when ``Tags`` is not one of the declared property types.

        :param tags: Tags to be assigned to the resource

        """
        if tags is None:
            return
        if "Tags" not in self.property_types:
            return
        self.Tags = tags


class ApiGatewayV2DomainName(Resource):
    """Model of the ``AWS::ApiGatewayV2::DomainName`` resource type."""

    resource_type = "AWS::ApiGatewayV2::DomainName"

    # Every supported property is a generated property; a separate instance is created per name
    property_types = {
        domain_property: GeneratedProperty()
        for domain_property in (
            "DomainName",
            "DomainNameConfigurations",
            "MutualTlsAuthentication",
            "Tags",
        )
    }

    DomainName: Intrinsicable[str]
    DomainNameConfigurations: Optional[List[Dict[str, Any]]]
    MutualTlsAuthentication: Optional[Dict[str, Any]]
    Tags: Optional[PassThrough]

    def assign_tags(self, tags: Dict[str, Any]) -> None:
        """Override of the default 'assign_tags' function in Resource class.

        Stores the given tags in the ``Tags`` property, unless ``tags`` is None
        or ``Tags`` is not one of the declared property types.

        :param tags: Tags to be assigned to the resource

        """
        if tags is None or "Tags" not in self.property_types:
            return
        self.Tags = tags


class ApiGatewayV2ApiMapping(Resource):
    """Model of the ``AWS::ApiGatewayV2::ApiMapping`` resource type."""

    resource_type = "AWS::ApiGatewayV2::ApiMapping"

    # Every supported property is a generated property; a separate instance is created per name
    property_types = {
        mapping_property: GeneratedProperty()
        for mapping_property in ("ApiId", "ApiMappingKey", "DomainName", "Stage")
    }


# https://docs.aws.amazon.com/apigatewayv2/latest/api-reference/apis-apiid-authorizers-authorizerid.html#apis-apiid-authorizers-authorizerid-model-jwtconfiguration
# Shape of an authorizer's JWT configuration: string keys mapped to either a string or a list of strings.
# Change to TypedDict when we don't have to support Python 3.7
JwtConfiguration = Dict[str, Union[str, List[str]]]


class ApiGatewayV2Authorizer:
    def __init__(  # type: ignore[no-untyped-def] # noqa: PLR0913
        self,
        api_logical_id=None,
        name=None,
        authorization_scopes=None,
        jwt_configuration=None,
        id_source=None,
        function_arn=None,
        function_invoke_role=None,
        identity=None,
        authorizer_payload_format_version=None,
        enable_simple_responses=None,
        is_aws_iam_authorizer=False,
        enable_function_default_permissions=None,
    ):
        """
        Creates an authorizer for use in V2 Http Apis
        """
        self.api_logical_id = api_logical_id
        self.name = name
        self.authorization_scopes = authorization_scopes
        self.jwt_configuration: Optional[JwtConfiguration] = self._get_jwt_configuration(
            jwt_configuration, api_logical_id
        )
        self.id_source = id_source
        self.function_arn = function_arn
        self.function_invoke_role = function_invoke_role
        self.identity = identity
        self.authorizer_payload_format_version = authorizer_payload_format_version
        self.enable_simple_responses = enable_simple_responses
        self.is_aws_iam_authorizer = is_aws_iam_authorizer
        self.enable_function_default_permissions = enable_function_default_permissions

        self._validate_input_parameters()

        authorizer_type = self._get_auth_type()

        # Validate necessary parameters exist
        if authorizer_type == "JWT":
            self._validate_jwt_authorizer()

        if authorizer_type == "REQUEST":
            self._validate_lambda_authorizer()

        if enable_function_default_permissions is not None:
            sam_expect(
                enable_function_default_permissions,
                api_logical_id,
                f"Authorizers.{name}.EnableFunctionDefaultPermissions",
            ).to_be_a_bool()

    def _get_auth_type(self) -> str:
        if self.is_aws_iam_authorizer:
            return "AWS_IAM"
        if self.jwt_configuration:
            return "JWT"
        return "REQUEST"

    def _validate_input_parameters(self) -> None:
        authorizer_type = self._get_auth_type()

        if self.authorization_scopes is not None and not isinstance(self.authorization_scopes, list):
            raise InvalidResourceException(self.api_logical_id, "AuthorizationScopes must be a list.")

        if self.authorization_scopes is not None and not authorizer_type == "JWT":
            raise InvalidResourceException(
                self.api_logical_id, "AuthorizationScopes must be defined only for OAuth2 Authorizer."
            )

        if self.jwt_configuration is not None and not authorizer_type == "JWT":
            raise InvalidResourceException(
                self.api_logical_id, "JwtConfiguration must be defined only for OAuth2 Authorizer."
            )

        if self.id_source is not None and not authorizer_type == "JWT":
            raise InvalidResourceException(
                self.api_logical_id, "IdentitySource must be defined only for OAuth2 Authorizer."
            )

        if self.function_arn is not None and not authorizer_type == "REQUEST":
            raise InvalidResourceException(
                self.api_logical_id, "FunctionArn must be defined only for Lambda Authorizer."
            )

        if self.function_invoke_role is not None and not authorizer_type == "REQUEST":
            raise InvalidResourceException(
                self.api_logical_id, "FunctionInvokeRole must be defined only for Lambda Authorizer."
            )

        if self.identity is not None and not authorizer_type == "REQUEST":
            raise InvalidResourceException(self.api_logical_id, "Identity must be defined only for Lambda Authorizer.")

        if self.authorizer_payload_format_version is not None and not authorizer_type == "REQUEST":
            raise InvalidResourceException(
                self.api_logical_id, "AuthorizerPayloadFormatVersion must be defined only for Lambda Authorizer."
            )

        if self.enable_simple_responses is not None and not authorizer_type == "REQUEST":
            raise InvalidResourceException(
                self.api_logical_id, "EnableSimpleResponses must be defined only for Lambda Authorizer."
            )

        if self.enable_function_default_permissions is not None and authorizer_type != "REQUEST":
            raise InvalidResourceException(
                self.api_logical_id, "EnableFunctionDefaultPermissions must be defined only for Lambda Authorizer."
            )

    def _validate_jwt_authorizer(self) -> None:
        if not self.jwt_configuration:
            raise InvalidResourceException(
                self.api_logical_id, f"{self.name} OAuth2 Authorizer must define 'JwtConfiguration'."
            )
        if not self.id_source:
            raise InvalidResourceException(
                self.api_logical_id, f"{self.name} OAuth2 Authorizer must define 'IdentitySource'."
            )

    def _validate_lambda_authorizer(self) -> None:
        if not self.function_arn:
            raise InvalidResourceException(
                self.api_logical_id, f"{self.name} Lambda Authorizer must define 'FunctionArn'."
            )
        if not self.authorizer_payload_format_version:
            raise InvalidResourceException(
                self.api_logical_id, f"{self.name} Lambda Authorizer must define 'AuthorizerPayloadFormatVersion'."
            )

    def generate_openapi(self) -> Dict[str, Any]:
        """
        Generates OAS for the securitySchemes section
        """
        authorizer_type = self._get_auth_type()
        openapi: Dict[str, Any]

        if authorizer_type == "AWS_IAM":
            openapi = {
                "type": "apiKey",
                "name": "Authorization",
                "in": "header",
                "x-amazon-apigateway-authtype": "awsSigv4",
            }

        elif authorizer_type == "JWT":
            openapi = {
                "type": "oauth2",
                APIGATEWAY_AUTHORIZER_KEY: {
                    "jwtConfiguration": self.jwt_configuration,
                    "identitySource": self.id_source,
                    "type": "jwt",
                },
            }

        elif authorizer_type == "REQUEST":
            openapi = {
                "type": "apiKey",
                "name": "Unused",
                "in": "header",
                APIGATEWAY_AUTHORIZER_KEY: {"type": "request"},
            }

            # Generate the lambda arn
            partition = ArnGenerator.get_partition_name()
            resource = "lambda:path/2015-03-31/functions/${__FunctionArn__}/invocations"
            authorizer_uri = fnSub(
                ArnGenerator.generate_arn(
                    partition=partition, service="apigateway", resource=resource, include_account_id=False
                ),
                {"__FunctionArn__": self.function_arn},
            )
            openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerUri"] = authorizer_uri

            # Set authorizerCredentials if present
            function_invoke_role = self._get_function_invoke_role()
            if function_invoke_role:
                openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerCredentials"] = function_invoke_role

            # Set identitySource if present
            if self.identity:
                sam_expect(self.identity, self.api_logical_id, f"Auth.Authorizers.{self.name}.Identity").to_be_a_map()
                # Set authorizerResultTtlInSeconds if present
                reauthorize_every = self.identity.get("ReauthorizeEvery")
                if reauthorize_every is not None:
                    openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerResultTtlInSeconds"] = reauthorize_every

                # Set identitySource if present
                openapi[APIGATEWAY_AUTHORIZER_KEY]["identitySource"] = self._get_identity_source(self.identity)

            # Set authorizerPayloadFormatVersion. It's a required parameter
            openapi[APIGATEWAY_AUTHORIZER_KEY][
                "authorizerPayloadFormatVersion"
            ] = self.authorizer_payload_format_version

            # Set enableSimpleResponses if present
            if self.enable_simple_responses:
                openapi[APIGATEWAY_AUTHORIZER_KEY]["enableSimpleResponses"] = self.enable_simple_responses

        else:
            raise ValueError(f"Unexpected authorizer_type: {authorizer_type}")
        return openapi

    def _get_function_invoke_role(self) -> Optional[PassThrough]:
        if not self.function_invoke_role or self.function_invoke_role == "NONE":
            return None

        return self.function_invoke_role

    def _get_identity_source(self, auth_identity: Dict[str, Any]) -> List[str]:
        """
        Generate the list of identitySource using authorizer's Identity config by flatting them.
        For the format of identitySource, see:
        https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-swagger-extensions-authorizer.html

        It will add API GW prefix to each item:
        - prefix "$request.header." to all values in "Headers"
        - prefix "$request.querystring." to all values in "QueryStrings"
        - prefix "$stageVariables." to all values in "StageVariables"
        - prefix "$context." to all values in "Context"
        """
        identity_source: List[str] = []

        identity_property_path = f"Authorizers.{self.name}.Identity"

        for prefix, property_name in [
            ("$request.header.", "Headers"),
            ("$request.querystring.", "QueryStrings"),
            ("$stageVariables.", "StageVariables"),
            ("$context.", "Context"),
        ]:
            property_values = auth_identity.get(property_name)
            if property_values:
                sam_expect(
                    property_values, self.api_logical_id, f"{identity_property_path}.{property_name}"
                ).to_be_a_list_of(ExpectedType.STRING)
                identity_source += [prefix + value for value in property_values]

        return identity_source

    @staticmethod
    def _get_jwt_configuration(
        props: Optional[Dict[str, Union[str, List[str]]]], api_logical_id: str
    ) -> Optional[JwtConfiguration]:
        """Make sure that JWT configuration dict keys are lower case.

        ApiGatewayV2Authorizer doesn't create `AWS::ApiGatewayV2::Authorizer` but generates
        Open Api which will be appended to the API's Open Api definition body.
        For Open Api JWT configuration keys should be in lower case.
        But for `AWS::ApiGatewayV2::Authorizer` the same keys are capitalized,
        the way it's usually done in CloudFormation resources.
        Users get often confused when passing capitalized key to `AWS::Serverless::HttpApi` doesn't work.
        There exist a comment about that in the documentation
        https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-httpapi-oauth2authorizer.html#sam-httpapi-oauth2authorizer-jwtconfiguration
        but the comment doesn't prevent users from making the error.

        Parameters
        ----------
        props: jwt configuration dict with the keys either lower case or capitalized
        api_logical_id: logical id of the Serverless Api resource with the jwt configuration

        Returns
        -------
            jwt configuration dict with low case keys
        """
        if not props:
            return None
        sam_expect(props, api_logical_id, "JwtConfiguration").to_be_a_map()
        return {k.lower(): v for k, v in props.items()}
