in samtranslator/model/eventsources/pull.py [0:0]
def _validate_source_access_configurations(self, supported_types: List[str], required_type: str) -> str:
"""
Validate the SourceAccessConfigurations parameter and return the URI to
be used for policy statement creation.
"""
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
f"No SourceAccessConfigurations for Amazon {self.resource_type} event provided.",
)
if not isinstance(self.SourceAccessConfigurations, list):
raise InvalidEventException(
self.relative_id,
"Provided SourceAccessConfigurations cannot be parsed into a list.",
)
required_type_uri: Optional[str] = None
for index, conf in enumerate(self.SourceAccessConfigurations):
sam_expect(conf, self.relative_id, f"SourceAccessConfigurations[{index}]", is_sam_event=True).to_be_a_map()
event_type: str = sam_expect(
conf.get("Type"), self.relative_id, f"SourceAccessConfigurations[{index}].Type", is_sam_event=True
).to_be_a_string()
if event_type not in supported_types:
raise InvalidEventException(
self.relative_id,
f"Invalid property Type specified in SourceAccessConfigurations. The supported values are: {supported_types}.",
)
if event_type == required_type:
if required_type_uri:
raise InvalidEventException(
self.relative_id,
f"Multiple {required_type} properties specified in SourceAccessConfigurations.",
)
required_type_uri = conf.get("URI")
if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} URI property specified in SourceAccessConfigurations.",
)
if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} property specified in SourceAccessConfigurations.",
)
return required_type_uri