in samtranslator/model/eventsources/push.py [0:0]
def to_cloudformation(self, **kwargs):  # type: ignore[no-untyped-def]
    """Expand this CloudWatch Events/EventBridge event source into CloudFormation resources.

    The returned list holds, in order: any resources produced while resolving
    the dead-letter queue configuration, the events rule itself, and the
    permission constructed for the target function.

    :param dict kwargs: must carry ``function``, the function resource this rule targets
    :returns: a list of vanilla CloudFormation Resources, to which this CloudWatch Events/EventBridge event expands
    :rtype: list
    :raises TypeError: if the ``function`` keyword argument is missing or falsy
    :raises InvalidEventException: if ``State`` is set while ``Enabled`` is not None
    """
    function = kwargs.get("function")
    if not function:
        raise TypeError("Missing required keyword argument: function")

    passthrough_attrs = function.get_passthrough_resource_attributes()

    # Build the rule first: its runtime ARN is needed by both the DLQ helper
    # and the permission constructed at the end.
    rule = EventsRule(self.logical_id, attributes=passthrough_attrs)
    rule.EventBusName = self.EventBusName
    rule.EventPattern = self.Pattern
    rule.Name = self.RuleName
    rule_arn = rule.get_runtime_attr("arn")

    # Resources emitted ahead of the rule; only the DLQ helper contributes any.
    leading_resources = []
    dlq_arn = None
    if self.DeadLetterConfig is not None:
        EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig)  # type: ignore[no-untyped-call]
        dlq_arn, dlq_resources = EventBridgeRuleUtils.get_dlq_queue_arn_and_resources(  # type: ignore[no-untyped-call]
            self, rule_arn, passthrough_attrs
        )
        leading_resources += dlq_resources

    # State and Enabled are two ways of expressing the same setting, so they
    # are mutually exclusive; once the conflict is ruled out at most one of
    # the branches below can apply.
    enabled_given = self.Enabled is not None
    if enabled_given and self.State:
        raise InvalidEventException(self.relative_id, "State and Enabled Properties cannot both be specified.")
    if enabled_given:
        if self.Enabled:
            rule.State = "ENABLED"
        else:
            rule.State = "DISABLED"
    elif self.State:
        rule.State = self.State

    rule.Targets = [self._construct_target(function, dlq_arn)]  # type: ignore[no-untyped-call]
    permission = self._construct_permission(function, source_arn=rule_arn)  # type: ignore[no-untyped-call]

    return [*leading_resources, rule, permission]