def _validate_and_inject_resource()

in samtranslator/model/sam_resources.py [0:0]


    def _validate_and_inject_resource(self, dest_config, event, logical_id, conditions):
        """
        For Event Invoke Config, if the user has not specified a destination ARN for SQS/SNS, SAM
        auto creates a SQS and SNS resource with defaults. Intrinsics are supported in the Destination
        ARN property, so to handle conditional ifs we have to inject if conditions in the auto created
        SQS/SNS resources as well as in the policy documents.
        """
        accepted_types_list = ["SQS", "SNS", "EventBridge", "Lambda"]
        auto_inject_list = ["SQS", "SNS"]
        resource = None
        policy = {}
        destination = {}
        destination["Destination"] = dest_config.get("Destination")

        resource_logical_id = logical_id + event
        if dest_config.get("Type") is None or dest_config.get("Type") not in accepted_types_list:
            raise InvalidResourceException(
                self.logical_id, "'Type: {}' must be one of {}".format(dest_config.get("Type"), accepted_types_list)
            )

        property_condition, dest_arn = self._get_or_make_condition(
            dest_config.get("Destination"), logical_id, conditions
        )
        if dest_config.get("Destination") is None or property_condition is not None:
            combined_condition = self._make_and_conditions(
                self.get_passthrough_resource_attributes().get("Condition"), property_condition, conditions
            )
            if dest_config.get("Type") in auto_inject_list:
                if dest_config.get("Type") == "SQS":
                    resource = SQSQueue(
                        resource_logical_id + "Queue", attributes=self.get_passthrough_resource_attributes()
                    )
                if dest_config.get("Type") == "SNS":
                    resource = SNSTopic(
                        resource_logical_id + "Topic", attributes=self.get_passthrough_resource_attributes()
                    )
                if combined_condition:
                    resource.set_resource_attribute("Condition", combined_condition)
                if property_condition:
                    destination["Destination"] = make_conditional(
                        property_condition, resource.get_runtime_attr("arn"), dest_arn
                    )
                else:
                    destination["Destination"] = resource.get_runtime_attr("arn")
                policy = self._add_event_invoke_managed_policy(
                    dest_config, resource_logical_id, property_condition, destination["Destination"]
                )
            else:
                raise InvalidResourceException(
                    self.logical_id, "Destination is required if Type is not {}".format(auto_inject_list)
                )
        if dest_config.get("Destination") is not None and property_condition is None:
            policy = self._add_event_invoke_managed_policy(
                dest_config, resource_logical_id, None, dest_config.get("Destination")
            )

        return resource, destination, policy