in samcli/commands/local/lib/swagger/parser.py [0:0]
def get_authorizers(self, event_type: str = Route.API) -> Dict[str, Authorizer]:
"""
Parse Swagger document and returns a list of Authorizer objects
Parameters
----------
event_type: str
String indicating what type of API Gateway this is
Returns
-------
dict[str, Authorizer]
A map of authorizer names and Authorizer objects found in the body definition
"""
authorizers: Dict[str, Authorizer] = {}
authorizer_dict = {}
document_version = self._get_document_version()
if document_version.startswith(SwaggerParser._2_X_VERSION):
LOG.debug("Parsing Swagger document using 2.0 specification")
authorizer_dict = self.swagger.get(SwaggerParser._SWAGGER_SECURITY_DEFINITIONS, {})
elif document_version.startswith(SwaggerParser._3_X_VERSION):
LOG.debug("Parsing Swagger document using 3.0 specification")
authorizer_dict = self.swagger.get(SwaggerParser._SWAGGER_COMPONENTS, {}).get(
SwaggerParser._SWAGGER_SECURITY_SCHEMES, {}
)
else:
raise InvalidOasVersion(
f"An invalid OpenApi version was detected: '{document_version}', must be one of 2.x or 3.x",
)
for auth_name, properties in authorizer_dict.items():
authorizer_object = properties.get(self._AUTHORIZER_KEY)
if not authorizer_object:
LOG.warning("Skip parsing unsupported authorizer '%s'", auth_name)
continue
authorizer_type = authorizer_object.get(SwaggerParser._AUTHORIZER_TYPE, "").lower()
payload_version = authorizer_object.get(SwaggerParser._AUTHORIZER_PAYLOAD_VERSION)
if event_type == Route.HTTP and payload_version not in LambdaAuthorizer.PAYLOAD_VERSIONS:
raise InvalidSecurityDefinition(f"Authorizer '{auth_name}' contains an invalid payload version")
if event_type == Route.API:
payload_version = LambdaAuthorizer.PAYLOAD_V1
lambda_name = LambdaUri.get_function_name(authorizer_object.get(SwaggerParser._AUTHORIZER_LAMBDA_URI))
if not lambda_name:
LOG.warning("Unable to parse authorizerUri '%s' for authorizer '%s', skipping", lambda_name, auth_name)
continue
# only add authorizer if it is Lambda token or request based (not jwt)
if authorizer_type not in LambdaAuthorizer.VALID_TYPES:
LOG.warning("Lambda authorizer '%s' type '%s' is unsupported, skipping", auth_name, authorizer_type)
continue
identity_sources = self._get_lambda_identity_sources(
auth_name, authorizer_type, event_type, properties, authorizer_object
)
validation_expression = authorizer_object.get(SwaggerParser._AUTHORIZER_LAMBDA_VALIDATION)
if event_type == Route.HTTP and validation_expression:
validation_expression = None
LOG.warning(
"Validation expressions is only available on REST APIs, ignoring for Lambda authorizer '%s'",
auth_name,
)
enable_simple_response = authorizer_object.get(SwaggerParser._AUTHORIZER_SIMPLE_RESPONSES, False)
if (
event_type != Route.HTTP
or payload_version != LambdaAuthorizer.PAYLOAD_V2
or not isinstance(enable_simple_response, bool)
):
enable_simple_response = False
if authorizer_object.get(SwaggerParser._AUTHORIZER_SIMPLE_RESPONSES) is not None:
LOG.warning(
"Simple responses are only available on HTTP APIs with payload version "
"2.0, ignoring for Lambda authorizer '%s'",
auth_name,
)
# token based authorizers must have an identity source defined
# this is determined by taking the header key in the properties
# to form the identity source in a previous method call
if not identity_sources and authorizer_type == LambdaAuthorizer.TOKEN:
LOG.warning(
"Skip parsing Lambda authorizer '%s', must contain valid "
"identity sources for Rest Api based token authorizers",
auth_name,
)
continue
lambda_authorizer = LambdaAuthorizer(
authorizer_name=auth_name,
type=authorizer_type,
payload_version=payload_version,
lambda_name=lambda_name,
identity_sources=identity_sources,
validation_string=validation_expression,
use_simple_response=enable_simple_response,
)
authorizers[auth_name] = lambda_authorizer
LOG.debug("Parsing Lambda authorizer '%s' type '%s'", auth_name, authorizer_type)
return authorizers