in samtranslator/model/eventsources/pull.py [0:0]
def get_secret_key(self, source_access_configurations: List[Any]) -> Tuple[Optional[str], Optional[str], bool]:
authentication_uri = None
has_vpc_subnet = False
has_vpc_security_group = False
authentication_uri_2 = None
if not isinstance(source_access_configurations, list):
raise InvalidEventException(
self.relative_id,
"SourceAccessConfigurations for self managed kafka event should be a list.",
)
for config in source_access_configurations:
sam_expect(config, self.relative_id, "SourceAccessConfigurations").to_be_a_map()
if config.get("Type") == "VPC_SUBNET":
self.validate_uri(config.get("URI"), "VPC_SUBNET")
has_vpc_subnet = True
elif config.get("Type") == "VPC_SECURITY_GROUP":
self.validate_uri(config.get("URI"), "VPC_SECURITY_GROUP")
has_vpc_security_group = True
elif config.get("Type") in self.AUTH_MECHANISM:
if authentication_uri:
raise InvalidEventException(
self.relative_id,
"Multiple auth mechanism properties specified in SourceAccessConfigurations for self managed kafka event.",
)
self.validate_uri(config.get("URI"), "auth mechanism")
authentication_uri = config.get("URI")
elif config.get("Type") == "SERVER_ROOT_CA_CERTIFICATE":
self.validate_uri(config.get("URI"), "SERVER_ROOT_CA_CERTIFICATE")
authentication_uri_2 = config.get("URI")
else:
raise InvalidEventException(
self.relative_id,
"Invalid SourceAccessConfigurations Type provided for self managed kafka event.",
)
if (not has_vpc_subnet and has_vpc_security_group) or (has_vpc_subnet and not has_vpc_security_group):
raise InvalidEventException(
self.relative_id,
"VPC_SUBNET and VPC_SECURITY_GROUP in SourceAccessConfigurations for SelfManagedKafka must be both provided.",
)
return authentication_uri, authentication_uri_2, (has_vpc_subnet and has_vpc_security_group)