in samtranslator/model/eventsources/pull.py [0:0]
def to_cloudformation(self, **kwargs):
    """Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
    policy to the function's execution role, if such a role is provided.

    :param dict kwargs: a dict containing the function and, optionally, the execution role generated for it
    :returns: a list of vanilla CloudFormation Resources, to which this pull event expands
    :rtype: list
    """
    function = kwargs.get("function")
    if not function:
        raise TypeError("Missing required keyword argument: function")

    resources = []

    lambda_eventsourcemapping = LambdaEventSourceMapping(
        self.logical_id, attributes=function.get_passthrough_resource_attributes()
    )
    resources.append(lambda_eventsourcemapping)

    try:
        # Name will not be available for Alias resources
        function_name_or_arn = function.get_runtime_attr("name")
    except NotImplementedError:
        function_name_or_arn = function.get_runtime_attr("arn")

    if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker:
        raise InvalidEventException(
            self.relative_id,
            "No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
        )

    if self.Stream and not self.StartingPosition:
        raise InvalidEventException(self.relative_id, "StartingPosition is required for Kinesis, DynamoDB and MSK.")
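
    # Map the parsed event source properties directly onto the generated AWS::Lambda::EventSourceMapping resource.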
    lambda_eventsourcemapping.FunctionName = function_name_or_arn
    lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue or self.Broker
    lambda_eventsourcemapping.StartingPosition = self.StartingPosition
    lambda_eventsourcemapping.BatchSize = self.BatchSize
    lambda_eventsourcemapping.Enabled = self.Enabled
    lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds
    lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
    lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
    lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
    lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor
    lambda_eventsourcemapping.Topics = self.Topics
    lambda_eventsourcemapping.Queues = self.Queues
    lambda_eventsourcemapping.SourceAccessConfigurations = self.SourceAccessConfigurations
    lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
    lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes
    lambda_eventsourcemapping.FilterCriteria = self.FilterCriteria
    self._validate_filter_criteria()

    if self.KafkaBootstrapServers:
        lambda_eventsourcemapping.SelfManagedEventSource = {
            "Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers}
        }
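
    # Illustrative shape of the DestinationConfig input handled below. The keys are the ones this
    # code reads ('OnFailure', 'Type', 'Destination'); the ARN is a placeholder, not a real resource:
    #
    #   DestinationConfig:
    #     OnFailure:
    #       Type: SQS          # or SNS; consumed by SAM only to choose which policy to attach
    #       Destination: arn:aws:sqs:us-east-1:123456789012:my-dlq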
    destination_config_policy = None
    if self.DestinationConfig:
        if self.DestinationConfig.get("OnFailure") is None:
            raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for 'DestinationConfig'")

        # The `Type` property is used by SAM to attach the right policies
        destination_type = self.DestinationConfig.get("OnFailure").get("Type")

        # SAM attaches the policies for SQS or SNS only if 'Type' is given
        if destination_type:
            # Delete this field, as it is used internally by SAM only to determine the policy
            del self.DestinationConfig["OnFailure"]["Type"]

            # Only the literal values 'SQS' and 'SNS' are allowed; intrinsics are not
            if destination_type not in ["SQS", "SNS"]:
                raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")

            if destination_type == "SQS":
                queue_arn = self.DestinationConfig.get("OnFailure").get("Destination")
                destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(
                    queue_arn, self.logical_id
                )
            elif destination_type == "SNS":
                sns_topic_arn = self.DestinationConfig.get("OnFailure").get("Destination")
                destination_config_policy = IAMRolePolicies().sns_publish_role_policy(
                    sns_topic_arn, self.logical_id
                )

        lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
if "role" in kwargs:
self._link_policy(kwargs["role"], destination_config_policy)
return resources
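
For orientation, a minimal sketch of the kind of resource this method expands an SQS pull event into; the logical IDs, ARN, and property values below are hypothetical placeholders, not output from the SAM test suite:

# Approximate shape of the generated AWS::Lambda::EventSourceMapping for an SQS event
# (all names and values are illustrative assumptions).
expected_mapping = {
    "MyFunctionMyQueueEvent": {
        "Type": "AWS::Lambda::EventSourceMapping",
        "Properties": {
            "FunctionName": {"Ref": "MyFunction"},  # falls back to the ARN when the "name" runtime attr is unavailable
            "EventSourceArn": "arn:aws:sqs:us-east-1:123456789012:my-queue",
            "BatchSize": 10,
            "Enabled": True,
        },
    }
}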