samtranslator/model/apigatewayv2.py (251 lines of code) (raw):
from typing import Any, Dict, List, Optional, Union
from samtranslator.model import GeneratedProperty, Resource
from samtranslator.model.exceptions import ExpectedType, InvalidResourceException
from samtranslator.model.intrinsics import fnSub, ref
from samtranslator.model.types import PassThrough
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.utils.types import Intrinsicable
from samtranslator.validator.value_validator import sam_expect
APIGATEWAY_AUTHORIZER_KEY = "x-amazon-apigateway-authorizer"
class ApiGatewayV2HttpApi(Resource):
resource_type = "AWS::ApiGatewayV2::Api"
property_types = {
"Body": GeneratedProperty(),
"BodyS3Location": GeneratedProperty(),
"Description": GeneratedProperty(),
"FailOnWarnings": GeneratedProperty(),
"DisableExecuteApiEndpoint": GeneratedProperty(),
"BasePath": GeneratedProperty(),
"Tags": GeneratedProperty(),
"CorsConfiguration": GeneratedProperty(),
}
runtime_attrs = {"http_api_id": lambda self: ref(self.logical_id)}
def assign_tags(self, tags: Dict[str, Any]) -> None:
"""Overriding default 'assign_tags' function in Resource class
Function to assign tags to the resource
:param tags: Tags to be assigned to the resource
"""
# Tags are already defined in Body so they do not need to be assigned here
return
class ApiGatewayV2Stage(Resource):
resource_type = "AWS::ApiGatewayV2::Stage"
property_types = {
"AccessLogSettings": GeneratedProperty(),
"DefaultRouteSettings": GeneratedProperty(),
"RouteSettings": GeneratedProperty(),
"ClientCertificateId": GeneratedProperty(),
"Description": GeneratedProperty(),
"ApiId": GeneratedProperty(),
"StageName": GeneratedProperty(),
"Tags": GeneratedProperty(),
"StageVariables": GeneratedProperty(),
"AutoDeploy": GeneratedProperty(),
}
runtime_attrs = {"stage_name": lambda self: ref(self.logical_id)}
Tags: Optional[PassThrough]
def assign_tags(self, tags: Dict[str, Any]) -> None:
"""Overriding default 'assign_tags' function in Resource class
Function to assign tags to the resource
:param tags: Tags to be assigned to the resource
"""
if tags is not None and "Tags" in self.property_types:
self.Tags = tags
class ApiGatewayV2DomainName(Resource):
resource_type = "AWS::ApiGatewayV2::DomainName"
property_types = {
"DomainName": GeneratedProperty(),
"DomainNameConfigurations": GeneratedProperty(),
"MutualTlsAuthentication": GeneratedProperty(),
"Tags": GeneratedProperty(),
}
DomainName: Intrinsicable[str]
DomainNameConfigurations: Optional[List[Dict[str, Any]]]
MutualTlsAuthentication: Optional[Dict[str, Any]]
Tags: Optional[PassThrough]
def assign_tags(self, tags: Dict[str, Any]) -> None:
"""Overriding default 'assign_tags' function in Resource class
Function to assign tags to the resource
:param tags: Tags to be assigned to the resource
"""
if tags is not None and "Tags" in self.property_types:
self.Tags = tags
class ApiGatewayV2ApiMapping(Resource):
resource_type = "AWS::ApiGatewayV2::ApiMapping"
property_types = {
"ApiId": GeneratedProperty(),
"ApiMappingKey": GeneratedProperty(),
"DomainName": GeneratedProperty(),
"Stage": GeneratedProperty(),
}
# https://docs.aws.amazon.com/apigatewayv2/latest/api-reference/apis-apiid-authorizers-authorizerid.html#apis-apiid-authorizers-authorizerid-model-jwtconfiguration
# Change to TypedDict when we don't have to support Python 3.7
JwtConfiguration = Dict[str, Union[str, List[str]]]
class ApiGatewayV2Authorizer:
def __init__( # type: ignore[no-untyped-def] # noqa: PLR0913
self,
api_logical_id=None,
name=None,
authorization_scopes=None,
jwt_configuration=None,
id_source=None,
function_arn=None,
function_invoke_role=None,
identity=None,
authorizer_payload_format_version=None,
enable_simple_responses=None,
is_aws_iam_authorizer=False,
enable_function_default_permissions=None,
):
"""
Creates an authorizer for use in V2 Http Apis
"""
self.api_logical_id = api_logical_id
self.name = name
self.authorization_scopes = authorization_scopes
self.jwt_configuration: Optional[JwtConfiguration] = self._get_jwt_configuration(
jwt_configuration, api_logical_id
)
self.id_source = id_source
self.function_arn = function_arn
self.function_invoke_role = function_invoke_role
self.identity = identity
self.authorizer_payload_format_version = authorizer_payload_format_version
self.enable_simple_responses = enable_simple_responses
self.is_aws_iam_authorizer = is_aws_iam_authorizer
self.enable_function_default_permissions = enable_function_default_permissions
self._validate_input_parameters()
authorizer_type = self._get_auth_type()
# Validate necessary parameters exist
if authorizer_type == "JWT":
self._validate_jwt_authorizer()
if authorizer_type == "REQUEST":
self._validate_lambda_authorizer()
if enable_function_default_permissions is not None:
sam_expect(
enable_function_default_permissions,
api_logical_id,
f"Authorizers.{name}.EnableFunctionDefaultPermissions",
).to_be_a_bool()
def _get_auth_type(self) -> str:
if self.is_aws_iam_authorizer:
return "AWS_IAM"
if self.jwt_configuration:
return "JWT"
return "REQUEST"
def _validate_input_parameters(self) -> None:
authorizer_type = self._get_auth_type()
if self.authorization_scopes is not None and not isinstance(self.authorization_scopes, list):
raise InvalidResourceException(self.api_logical_id, "AuthorizationScopes must be a list.")
if self.authorization_scopes is not None and not authorizer_type == "JWT":
raise InvalidResourceException(
self.api_logical_id, "AuthorizationScopes must be defined only for OAuth2 Authorizer."
)
if self.jwt_configuration is not None and not authorizer_type == "JWT":
raise InvalidResourceException(
self.api_logical_id, "JwtConfiguration must be defined only for OAuth2 Authorizer."
)
if self.id_source is not None and not authorizer_type == "JWT":
raise InvalidResourceException(
self.api_logical_id, "IdentitySource must be defined only for OAuth2 Authorizer."
)
if self.function_arn is not None and not authorizer_type == "REQUEST":
raise InvalidResourceException(
self.api_logical_id, "FunctionArn must be defined only for Lambda Authorizer."
)
if self.function_invoke_role is not None and not authorizer_type == "REQUEST":
raise InvalidResourceException(
self.api_logical_id, "FunctionInvokeRole must be defined only for Lambda Authorizer."
)
if self.identity is not None and not authorizer_type == "REQUEST":
raise InvalidResourceException(self.api_logical_id, "Identity must be defined only for Lambda Authorizer.")
if self.authorizer_payload_format_version is not None and not authorizer_type == "REQUEST":
raise InvalidResourceException(
self.api_logical_id, "AuthorizerPayloadFormatVersion must be defined only for Lambda Authorizer."
)
if self.enable_simple_responses is not None and not authorizer_type == "REQUEST":
raise InvalidResourceException(
self.api_logical_id, "EnableSimpleResponses must be defined only for Lambda Authorizer."
)
if self.enable_function_default_permissions is not None and authorizer_type != "REQUEST":
raise InvalidResourceException(
self.api_logical_id, "EnableFunctionDefaultPermissions must be defined only for Lambda Authorizer."
)
def _validate_jwt_authorizer(self) -> None:
if not self.jwt_configuration:
raise InvalidResourceException(
self.api_logical_id, f"{self.name} OAuth2 Authorizer must define 'JwtConfiguration'."
)
if not self.id_source:
raise InvalidResourceException(
self.api_logical_id, f"{self.name} OAuth2 Authorizer must define 'IdentitySource'."
)
def _validate_lambda_authorizer(self) -> None:
if not self.function_arn:
raise InvalidResourceException(
self.api_logical_id, f"{self.name} Lambda Authorizer must define 'FunctionArn'."
)
if not self.authorizer_payload_format_version:
raise InvalidResourceException(
self.api_logical_id, f"{self.name} Lambda Authorizer must define 'AuthorizerPayloadFormatVersion'."
)
def generate_openapi(self) -> Dict[str, Any]:
"""
Generates OAS for the securitySchemes section
"""
authorizer_type = self._get_auth_type()
openapi: Dict[str, Any]
if authorizer_type == "AWS_IAM":
openapi = {
"type": "apiKey",
"name": "Authorization",
"in": "header",
"x-amazon-apigateway-authtype": "awsSigv4",
}
elif authorizer_type == "JWT":
openapi = {
"type": "oauth2",
APIGATEWAY_AUTHORIZER_KEY: {
"jwtConfiguration": self.jwt_configuration,
"identitySource": self.id_source,
"type": "jwt",
},
}
elif authorizer_type == "REQUEST":
openapi = {
"type": "apiKey",
"name": "Unused",
"in": "header",
APIGATEWAY_AUTHORIZER_KEY: {"type": "request"},
}
# Generate the lambda arn
partition = ArnGenerator.get_partition_name()
resource = "lambda:path/2015-03-31/functions/${__FunctionArn__}/invocations"
authorizer_uri = fnSub(
ArnGenerator.generate_arn(
partition=partition, service="apigateway", resource=resource, include_account_id=False
),
{"__FunctionArn__": self.function_arn},
)
openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerUri"] = authorizer_uri
# Set authorizerCredentials if present
function_invoke_role = self._get_function_invoke_role()
if function_invoke_role:
openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerCredentials"] = function_invoke_role
# Set identitySource if present
if self.identity:
sam_expect(self.identity, self.api_logical_id, f"Auth.Authorizers.{self.name}.Identity").to_be_a_map()
# Set authorizerResultTtlInSeconds if present
reauthorize_every = self.identity.get("ReauthorizeEvery")
if reauthorize_every is not None:
openapi[APIGATEWAY_AUTHORIZER_KEY]["authorizerResultTtlInSeconds"] = reauthorize_every
# Set identitySource if present
openapi[APIGATEWAY_AUTHORIZER_KEY]["identitySource"] = self._get_identity_source(self.identity)
# Set authorizerPayloadFormatVersion. It's a required parameter
openapi[APIGATEWAY_AUTHORIZER_KEY][
"authorizerPayloadFormatVersion"
] = self.authorizer_payload_format_version
# Set enableSimpleResponses if present
if self.enable_simple_responses:
openapi[APIGATEWAY_AUTHORIZER_KEY]["enableSimpleResponses"] = self.enable_simple_responses
else:
raise ValueError(f"Unexpected authorizer_type: {authorizer_type}")
return openapi
def _get_function_invoke_role(self) -> Optional[PassThrough]:
if not self.function_invoke_role or self.function_invoke_role == "NONE":
return None
return self.function_invoke_role
def _get_identity_source(self, auth_identity: Dict[str, Any]) -> List[str]:
"""
Generate the list of identitySource using authorizer's Identity config by flatting them.
For the format of identitySource, see:
https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-swagger-extensions-authorizer.html
It will add API GW prefix to each item:
- prefix "$request.header." to all values in "Headers"
- prefix "$request.querystring." to all values in "QueryStrings"
- prefix "$stageVariables." to all values in "StageVariables"
- prefix "$context." to all values in "Context"
"""
identity_source: List[str] = []
identity_property_path = f"Authorizers.{self.name}.Identity"
for prefix, property_name in [
("$request.header.", "Headers"),
("$request.querystring.", "QueryStrings"),
("$stageVariables.", "StageVariables"),
("$context.", "Context"),
]:
property_values = auth_identity.get(property_name)
if property_values:
sam_expect(
property_values, self.api_logical_id, f"{identity_property_path}.{property_name}"
).to_be_a_list_of(ExpectedType.STRING)
identity_source += [prefix + value for value in property_values]
return identity_source
@staticmethod
def _get_jwt_configuration(
props: Optional[Dict[str, Union[str, List[str]]]], api_logical_id: str
) -> Optional[JwtConfiguration]:
"""Make sure that JWT configuration dict keys are lower case.
ApiGatewayV2Authorizer doesn't create `AWS::ApiGatewayV2::Authorizer` but generates
Open Api which will be appended to the API's Open Api definition body.
For Open Api JWT configuration keys should be in lower case.
But for `AWS::ApiGatewayV2::Authorizer` the same keys are capitalized,
the way it's usually done in CloudFormation resources.
Users get often confused when passing capitalized key to `AWS::Serverless::HttpApi` doesn't work.
There exist a comment about that in the documentation
https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-httpapi-oauth2authorizer.html#sam-httpapi-oauth2authorizer-jwtconfiguration
but the comment doesn't prevent users from making the error.
Parameters
----------
props: jwt configuration dict with the keys either lower case or capitalized
api_logical_id: logical id of the Serverless Api resource with the jwt configuration
Returns
-------
jwt configuration dict with low case keys
"""
if not props:
return None
sam_expect(props, api_logical_id, "JwtConfiguration").to_be_a_map()
return {k.lower(): v for k, v in props.items()}