in samtranslator/model/sam_resources.py [0:0]
def _validate_and_inject_resource(self, dest_config, event, logical_id, conditions):
    """
    Validate a single Event Invoke destination config and auto-create its target when needed.

    When the user supplies no destination ARN for an SQS or SNS destination, SAM creates the
    queue/topic itself with default settings. Because the Destination property may carry
    intrinsics, a conditional destination has to be reflected both on the auto-created
    resource (through its Condition attribute) and in the generated policy.

    :param dest_config: mapping read for its "Type" and "Destination" keys
    :param event: suffix appended to ``logical_id`` to name the generated resource
    :param logical_id: logical id used as the prefix for generated names
    :param conditions: passed through to the condition helpers
        (presumably the template's Conditions section -- confirm against the caller)
    :returns: ``(resource, destination, policy)`` where ``resource`` is the auto-created
        SQSQueue/SNSTopic or None, ``destination`` is a dict with a single "Destination"
        key, and ``policy`` is whatever ``_add_event_invoke_managed_policy`` returned
    :raises InvalidResourceException: if Type is missing or unsupported, or if a
        destination must be auto-created for a Type other than SQS/SNS
    """
    supported_types = ["SQS", "SNS", "EventBridge", "Lambda"]
    auto_created_types = ["SQS", "SNS"]

    dest_type = dest_config.get("Type")
    user_destination = dest_config.get("Destination")
    resource_logical_id = logical_id + event

    # A missing Type (None) is never a member of the list, so one membership test covers both cases.
    if dest_type not in supported_types:
        raise InvalidResourceException(
            self.logical_id, "'Type: {}' must be one of {}".format(dest_type, supported_types)
        )

    property_condition, dest_arn = self._get_or_make_condition(user_destination, logical_id, conditions)

    # Simple case: an explicit destination with no condition attached. Nothing is created;
    # the policy is built for the destination exactly as the user wrote it.
    if user_destination is not None and property_condition is None:
        policy = self._add_event_invoke_managed_policy(dest_config, resource_logical_id, None, user_destination)
        return None, {"Destination": user_destination}, policy

    # From here on the destination is either absent or conditional, so a resource must be
    # generated. The combined condition is computed before the type check on purpose: the
    # helper receives ``conditions`` and the original ordering is kept in case it updates it.
    passthrough_condition = self.get_passthrough_resource_attributes().get("Condition")
    combined_condition = self._make_and_conditions(passthrough_condition, property_condition, conditions)

    if dest_type not in auto_created_types:
        raise InvalidResourceException(
            self.logical_id, "Destination is required if Type is not {}".format(auto_created_types)
        )

    # Only "SQS" and "SNS" can reach this point.
    if dest_type == "SQS":
        resource = SQSQueue(resource_logical_id + "Queue", attributes=self.get_passthrough_resource_attributes())
    else:
        resource = SNSTopic(resource_logical_id + "Topic", attributes=self.get_passthrough_resource_attributes())

    if combined_condition:
        resource.set_resource_attribute("Condition", combined_condition)

    generated_arn = resource.get_runtime_attr("arn")
    if property_condition:
        # Choose between the generated resource's ARN and the user-supplied ARN via the condition.
        destination_value = make_conditional(property_condition, generated_arn, dest_arn)
    else:
        destination_value = generated_arn

    policy = self._add_event_invoke_managed_policy(
        dest_config, resource_logical_id, property_condition, destination_value
    )
    return resource, {"Destination": destination_value}, policy