in samtranslator/model/stepfunctions/events.py [0:0]
def _add_swagger_integration(self, api, api_id, resource, role, intrinsics_resolver): # type: ignore[no-untyped-def]
"""Adds the path and method for this Api event source to the Swagger body for the provided RestApi.
:param model.apigateway.ApiGatewayRestApi rest_api: the RestApi to which the path and method should be added.
"""
swagger_body = api.get("DefinitionBody")
if swagger_body is None:
return
integration_uri = fnSub("arn:${AWS::Partition}:apigateway:${AWS::Region}:states:action/StartExecution")
editor = SwaggerEditor(swagger_body)
if editor.has_integration(self.Path, self.Method):
# Cannot add the integration, if it is already present
raise InvalidEventException(
self.relative_id,
f'API method "{self.Method}" defined multiple times for path "{self.Path}".',
)
condition = None
if CONDITION in resource.resource_attributes:
condition = resource.resource_attributes[CONDITION]
request_template = (
self._generate_request_template_unescaped(resource)
if self.UnescapeMappingTemplate
else self._generate_request_template(resource)
)
editor.add_state_machine_integration( # type: ignore[no-untyped-call]
self.Path,
self.Method,
integration_uri,
role.get_runtime_attr("arn"),
request_template,
condition=condition,
)
# self.Stage is not None as it is set in _get_permissions()
# before calling this method.
# TODO: refactor to remove this cast
stage = cast(str, self.Stage)
if self.Auth:
PushApi.add_auth_to_swagger(
self.Auth, api, api_id, self.relative_id, self.Method, self.Path, stage, editor, intrinsics_resolver
)
api["DefinitionBody"] = editor.swagger