in samtranslator/model/eventsources/push.py [0:0]
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
"""Returns the Lambda Permission resource allowing SNS to invoke the function this event source triggers.
:param dict kwargs: no existing resources need to be modified
:returns: a list of vanilla CloudFormation Resources, to which this SNS event expands
:rtype: list
"""
function = kwargs.get("function")
role = kwargs.get("role")
if not function:
raise TypeError("Missing required keyword argument: function")
intrinsics_resolver: IntrinsicsResolver = kwargs["intrinsics_resolver"]
# SNS -> Lambda
if not self.SqsSubscription:
subscription = self._inject_subscription(
"lambda",
function.get_runtime_attr("arn"),
self.Topic,
self.Region,
self.FilterPolicy,
self.FilterPolicyScope,
self.RedrivePolicy,
function,
)
return [self._construct_permission(function, source_arn=self.Topic), subscription] # type: ignore[no-untyped-call]
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
resources = [] # type: ignore[var-annotated]
fifo_topic = self._check_fifo_topic(
get_logical_id_from_intrinsic(self.Topic), kwargs.get("original_template"), intrinsics_resolver
)
queue = self._inject_sqs_queue(function, fifo_topic) # type: ignore[no-untyped-call]
queue_arn = queue.get_runtime_attr("arn")
queue_url = queue.get_runtime_attr("queue_url")
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url, function) # type: ignore[no-untyped-call]
subscription = self._inject_subscription(
"sqs",
queue_arn,
self.Topic,
self.Region,
self.FilterPolicy,
self.FilterPolicyScope,
self.RedrivePolicy,
function,
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn) # type: ignore[no-untyped-call]
resources = resources + event_source
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources
# SNS -> SQS(Existing) -> Lambda
resources = []
sqs_subscription: Dict[str, Any] = sam_expect(
self.SqsSubscription, self.relative_id, "SqsSubscription", is_sam_event=True
).to_be_a_map()
queue_arn = sqs_subscription.get("QueueArn")
queue_url = sqs_subscription.get("QueueUrl")
if not queue_arn or not queue_url:
raise InvalidEventException(self.relative_id, "No QueueARN or QueueURL provided.")
queue_policy_logical_id = sqs_subscription.get("QueuePolicyLogicalId")
batch_size = sqs_subscription.get("BatchSize")
enabled = sqs_subscription.get("Enabled")
queue_policy = self._inject_sqs_queue_policy( # type: ignore[no-untyped-call]
self.Topic, queue_arn, queue_url, function, queue_policy_logical_id
)
subscription = self._inject_subscription(
"sqs",
queue_arn,
self.Topic,
self.Region,
self.FilterPolicy,
self.FilterPolicyScope,
self.RedrivePolicy,
function,
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn, batch_size, enabled) # type: ignore[no-untyped-call]
resources = resources + event_source
resources.append(queue_policy)
resources.append(subscription)
return resources