in samtranslator/model/eventsources/push.py [0:0]
def to_cloudformation(self, **kwargs):
"""Returns the Lambda Permission resource allowing SNS to invoke the function this event source triggers.
:param dict kwargs: no existing resources need to be modified
:returns: a list of vanilla CloudFormation Resources, to which this SNS event expands
:rtype: list
"""
function = kwargs.get("function")
role = kwargs.get("role")
if not function:
raise TypeError("Missing required keyword argument: function")
# SNS -> Lambda
if not self.SqsSubscription:
subscription = self._inject_subscription(
"lambda",
function.get_runtime_attr("arn"),
self.Topic,
self.Region,
self.FilterPolicy,
function,
)
return [self._construct_permission(function, source_arn=self.Topic), subscription]
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
resources = []
queue = self._inject_sqs_queue(function)
queue_arn = queue.get_runtime_attr("arn")
queue_url = queue.get_runtime_attr("queue_url")
queue_policy = self._inject_sqs_queue_policy(self.Topic, queue_arn, queue_url, function)
subscription = self._inject_subscription(
"sqs", queue_arn, self.Topic, self.Region, self.FilterPolicy, function
)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn)
resources = resources + event_source
resources.append(queue)
resources.append(queue_policy)
resources.append(subscription)
return resources
# SNS -> SQS(Existing) -> Lambda
resources = []
queue_arn = self.SqsSubscription.get("QueueArn", None)
queue_url = self.SqsSubscription.get("QueueUrl", None)
if not queue_arn or not queue_url:
raise InvalidEventException(self.relative_id, "No QueueARN or QueueURL provided.")
queue_policy_logical_id = self.SqsSubscription.get("QueuePolicyLogicalId", None)
batch_size = self.SqsSubscription.get("BatchSize", None)
enabled = self.SqsSubscription.get("Enabled", None)
queue_policy = self._inject_sqs_queue_policy(
self.Topic, queue_arn, queue_url, function, queue_policy_logical_id
)
subscription = self._inject_subscription("sqs", queue_arn, self.Topic, self.Region, self.FilterPolicy, function)
event_source = self._inject_sqs_event_source_mapping(function, role, queue_arn, batch_size, enabled)
resources = resources + event_source
resources.append(queue_policy)
resources.append(subscription)
return resources