in samtranslator/model/eventsources/pull.py [0:0]
def to_cloudformation(self, **kwargs):  # type: ignore[no-untyped-def] # noqa: PLR0912, PLR0915
    """Build the Lambda EventSourceMapping resource that this pull event expands to.

    The mapping's properties are copied from this event source. When a ``role`` keyword argument
    is present, it is handed to ``_link_policy`` together with the policy derived from
    ``DestinationConfig.OnFailure`` (``None`` when no such policy was derived).

    :param dict kwargs: must contain ``function``; may contain ``role`` (the function's execution role)
    :returns: a list of vanilla CloudFormation Resources, holding the generated EventSourceMapping
    :rtype: list
    :raises TypeError: if the ``function`` keyword argument is missing or falsy
    :raises InvalidEventException: if ``ConsumerGroupId`` is set while ``resource_type`` is neither
        ``MSK`` nor ``SelfManagedKafka``, or if ``DestinationConfig.OnFailure.Type`` is given but is
        not one of ``SQS``, ``SNS`` or ``S3``
    """
    target_function = kwargs.get("function")
    if not target_function:
        raise TypeError("Missing required keyword argument: function")

    mapping = LambdaEventSourceMapping(
        self.logical_id, attributes=target_function.get_passthrough_resource_attributes()
    )
    generated_resources = [mapping]

    # Alias resources do not expose a "name" runtime attribute; fall back to the ARN for those.
    try:
        function_ref = target_function.get_runtime_attr("name")
    except KeyError:
        function_ref = target_function.get_runtime_attr("arn")

    # Properties that are copied from the event source onto the mapping unchanged.
    mapping.FunctionName = function_ref
    mapping.EventSourceArn = self.get_event_source_arn()
    mapping.StartingPosition = self.StartingPosition
    mapping.StartingPositionTimestamp = self.StartingPositionTimestamp
    mapping.BatchSize = self.BatchSize
    mapping.Enabled = self.Enabled
    mapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds
    mapping.MaximumRetryAttempts = self.MaximumRetryAttempts
    mapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
    mapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
    mapping.ParallelizationFactor = self.ParallelizationFactor
    mapping.Topics = self.Topics
    mapping.Queues = self.Queues
    mapping.SourceAccessConfigurations = self.SourceAccessConfigurations
    mapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
    mapping.FunctionResponseTypes = self.FunctionResponseTypes
    mapping.FilterCriteria = self.FilterCriteria
    mapping.KmsKeyArn = self.KmsKeyArn
    mapping.ScalingConfig = self.ScalingConfig
    mapping.ProvisionedPollerConfig = self.ProvisionedPollerConfig
    mapping.MetricsConfig = self.MetricsConfig

    self._validate_filter_criteria()

    bootstrap_servers = self.KafkaBootstrapServers
    if bootstrap_servers:
        mapping.SelfManagedEventSource = {"Endpoints": {"KafkaBootstrapServers": bootstrap_servers}}

    if self.ConsumerGroupId:
        # ConsumerGroupId is only accepted for the two Kafka flavours; reject anything else.
        if self.resource_type not in ("MSK", "SelfManagedKafka"):
            raise InvalidEventException(
                self.logical_id,
                f"Property ConsumerGroupId not defined for resource of type {self.resource_type}.",
            )
        consumer_group_config = {"ConsumerGroupId": self.ConsumerGroupId}
        if self.resource_type == "MSK":
            mapping.AmazonManagedKafkaEventSourceConfig = consumer_group_config
        else:
            mapping.SelfManagedKafkaEventSourceConfig = consumer_group_config

    on_failure_policy: Optional[Dict[str, Any]] = None
    if self.DestinationConfig:
        on_failure_config: Dict[str, Any] = sam_expect(
            self.DestinationConfig.get("OnFailure"),
            self.logical_id,
            "DestinationConfig.OnFailure",
            is_sam_event=True,
        ).to_be_a_map()

        # `Type` only tells SAM which policy to attach; a policy is built only when it is given.
        destination_type = on_failure_config.get("Type")
        if destination_type:
            # `Type` is internal to SAM, so strip it before the config is copied onto the mapping.
            del on_failure_config["Type"]

            # Only the literal values 'SQS', 'SNS' and 'S3' are allowed. No intrinsics are allowed.
            if destination_type not in ("SQS", "SNS", "S3"):
                raise InvalidEventException(
                    self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'"
                )

            destination_arn = on_failure_config.get("Destination")
            role_policies = IAMRolePolicies()
            if destination_type == "SQS":
                on_failure_policy = role_policies.sqs_send_message_role_policy(destination_arn, self.logical_id)
            elif destination_type == "SNS":
                on_failure_policy = role_policies.sns_publish_role_policy(destination_arn, self.logical_id)
            elif destination_type == "S3":
                on_failure_policy = role_policies.s3_send_event_payload_role_policy(
                    destination_arn, self.logical_id
                )

        mapping.DestinationConfig = self.DestinationConfig

    self.add_extra_eventsourcemapping_fields(mapping)

    if "role" in kwargs:
        self._link_policy(kwargs["role"], on_failure_policy)  # type: ignore[no-untyped-call]

    return generated_resources