def to_cloudformation()

in samtranslator/model/eventsources/pull.py [0:0]


    def to_cloudformation(self, **kwargs):  # type: ignore[no-untyped-def] # noqa: PLR0912, PLR0915
        """Expand this pull event into the Lambda EventSourceMapping it corresponds to.

        The mapping is filled in from this event's properties. If an execution role is supplied, the
        appropriate policy is added to it through ``_link_policy``, together with the policy generated
        for a typed ``DestinationConfig.OnFailure`` destination (if any).

        :param dict kwargs: must contain ``function`` (the function resource the mapping targets); may contain
            ``role`` (the execution role generated for the function)
        :returns: a list of vanilla CloudFormation Resources, to which this pull event expands
        :rtype: list
        :raises TypeError: if the ``function`` keyword argument is missing or falsy
        :raises InvalidEventException: if ``ConsumerGroupId`` is set on a resource type other than ``MSK`` or
            ``SelfManagedKafka``, or if ``DestinationConfig.OnFailure.Type`` is not ``SQS``, ``SNS`` or ``S3``
        """
        target_function = kwargs.get("function")
        if not target_function:
            raise TypeError("Missing required keyword argument: function")

        mapping = LambdaEventSourceMapping(
            self.logical_id, attributes=target_function.get_passthrough_resource_attributes()
        )
        generated_resources = [mapping]

        try:
            # Name will not be available for Alias resources
            function_ref = target_function.get_runtime_attr("name")
        except KeyError:
            function_ref = target_function.get_runtime_attr("arn")

        mapping.FunctionName = function_ref
        mapping.EventSourceArn = self.get_event_source_arn()

        # Properties copied one-to-one from the event onto the mapping, in this order.
        for property_name in (
            "StartingPosition",
            "StartingPositionTimestamp",
            "BatchSize",
            "Enabled",
            "MaximumBatchingWindowInSeconds",
            "MaximumRetryAttempts",
            "BisectBatchOnFunctionError",
            "MaximumRecordAgeInSeconds",
            "ParallelizationFactor",
            "Topics",
            "Queues",
            "SourceAccessConfigurations",
            "TumblingWindowInSeconds",
            "FunctionResponseTypes",
            "FilterCriteria",
            "KmsKeyArn",
            "ScalingConfig",
            "ProvisionedPollerConfig",
            "MetricsConfig",
        ):
            setattr(mapping, property_name, getattr(self, property_name))
        self._validate_filter_criteria()

        bootstrap_servers = self.KafkaBootstrapServers
        if bootstrap_servers:
            mapping.SelfManagedEventSource = {"Endpoints": {"KafkaBootstrapServers": bootstrap_servers}}

        consumer_group_id = self.ConsumerGroupId
        if consumer_group_id:
            kafka_source_config = {"ConsumerGroupId": consumer_group_id}
            if self.resource_type == "MSK":
                mapping.AmazonManagedKafkaEventSourceConfig = kafka_source_config
            elif self.resource_type == "SelfManagedKafka":
                mapping.SelfManagedKafkaEventSourceConfig = kafka_source_config
            else:
                raise InvalidEventException(
                    self.logical_id,
                    f"Property ConsumerGroupId not defined for resource of type {self.resource_type}.",
                )

        destination_policy: Optional[Dict[str, Any]] = None
        destination_config = self.DestinationConfig
        if destination_config:
            failure_target: Dict[str, Any] = sam_expect(
                destination_config.get("OnFailure"),
                self.logical_id,
                "DestinationConfig.OnFailure",
                is_sam_event=True,
            ).to_be_a_map()

            # `Type` exists only so SAM can pick the right policy; a policy for SQS, SNS or S3 is
            # generated only when it is given.
            target_kind = failure_target.get("Type")
            if target_kind:
                # Drop the field from the destination: it is used internally by SAM only
                del failure_target["Type"]
                # Only the literal values 'SQS', 'SNS' and 'S3' are accepted. No intrinsics are allowed
                if target_kind not in ("SQS", "SNS", "S3"):
                    raise InvalidEventException(
                        self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'"
                    )

                target_arn = failure_target.get("Destination")
                policy_factory = IAMRolePolicies()
                if target_kind == "SQS":
                    destination_policy = policy_factory.sqs_send_message_role_policy(target_arn, self.logical_id)
                elif target_kind == "SNS":
                    destination_policy = policy_factory.sns_publish_role_policy(target_arn, self.logical_id)
                elif target_kind == "S3":
                    destination_policy = policy_factory.s3_send_event_payload_role_policy(target_arn, self.logical_id)

            mapping.DestinationConfig = destination_config

        self.add_extra_eventsourcemapping_fields(mapping)

        if "role" in kwargs:
            self._link_policy(kwargs["role"], destination_policy)  # type: ignore[no-untyped-call]

        return generated_resources