in samtranslator/model/eventsources/push.py [0:0]
def _add_openapi_integration(self, api, api_id, function, manage_swagger=False): # type: ignore[no-untyped-def]
"""
Adds the path and method for this Api event source to the OpenApi body for the provided RestApi.
"""
open_api_body = api.get("DefinitionBody")
if open_api_body is None:
return
uri = _build_apigw_integration_uri(function, "${AWS::Partition}") # type: ignore[no-untyped-call]
editor = OpenApiEditor(open_api_body)
if manage_swagger and editor.has_integration(self._path, self._method):
# Cannot add the Lambda Integration, if it is already present
raise InvalidEventException(
self.relative_id,
f"API method '{self._method}' defined multiple times for path '{self._path}'.",
)
condition = None
if CONDITION in function.resource_attributes:
condition = function.resource_attributes[CONDITION]
editor.add_lambda_integration(self._path, self._method, uri, self.Auth, api.get("Auth"), condition=condition) # type: ignore[no-untyped-call]
if self.Auth:
self._add_auth_to_openapi_integration(api, api_id, editor, self.Auth)
if self.TimeoutInMillis:
editor.add_timeout_to_method(
api=api, path=self._path, method_name=self._method, timeout=self.TimeoutInMillis
)
path_parameters = re.findall("{(.*?)}", self._path)
if path_parameters:
editor.add_path_parameters_to_method( # type: ignore[no-untyped-call]
api=api, path=self._path, method_name=self._method, path_parameters=path_parameters
)
if self.PayloadFormatVersion:
editor.add_payload_format_version_to_method( # type: ignore[no-untyped-call]
api=api, path=self._path, method_name=self._method, payload_format_version=self.PayloadFormatVersion
)
api["DefinitionBody"] = editor.openapi