in samtranslator/model/api/api_generator.py [0:0]
def _construct_api_domain(self, rest_api):
"""
Constructs and returns the ApiGateway Domain and BasepathMapping
"""
if self.domain is None:
return None, None, None
if self.domain.get("DomainName") is None or self.domain.get("CertificateArn") is None:
raise InvalidResourceException(
self.logical_id, "Custom Domains only works if both DomainName and CertificateArn" " are provided."
)
self.domain["ApiDomainName"] = "{}{}".format(
"ApiGatewayDomainName", logical_id_generator.LogicalIdGenerator("", self.domain.get("DomainName")).gen()
)
domain = ApiGatewayDomainName(self.domain.get("ApiDomainName"), attributes=self.passthrough_resource_attributes)
domain.DomainName = self.domain.get("DomainName")
endpoint = self.domain.get("EndpointConfiguration")
if endpoint is None:
endpoint = "REGIONAL"
self.domain["EndpointConfiguration"] = "REGIONAL"
elif endpoint not in ["EDGE", "REGIONAL", "PRIVATE"]:
raise InvalidResourceException(
self.logical_id,
"EndpointConfiguration for Custom Domains must be"
" one of {}.".format(["EDGE", "REGIONAL", "PRIVATE"]),
)
if endpoint == "REGIONAL":
domain.RegionalCertificateArn = self.domain.get("CertificateArn")
else:
domain.CertificateArn = self.domain.get("CertificateArn")
domain.EndpointConfiguration = {"Types": [endpoint]}
mutual_tls_auth = self.domain.get("MutualTlsAuthentication", None)
if mutual_tls_auth:
if isinstance(mutual_tls_auth, dict):
if not set(mutual_tls_auth.keys()).issubset({"TruststoreUri", "TruststoreVersion"}):
invalid_keys = list()
for key in mutual_tls_auth.keys():
if not key in {"TruststoreUri", "TruststoreVersion"}:
invalid_keys.append(key)
invalid_keys.sort()
raise InvalidResourceException(
",".join(invalid_keys),
"Available MutualTlsAuthentication fields are {}.".format(
["TruststoreUri", "TruststoreVersion"]
),
)
domain.MutualTlsAuthentication = {}
if mutual_tls_auth.get("TruststoreUri", None):
domain.MutualTlsAuthentication["TruststoreUri"] = mutual_tls_auth["TruststoreUri"]
if mutual_tls_auth.get("TruststoreVersion", None):
domain.MutualTlsAuthentication["TruststoreVersion"] = mutual_tls_auth["TruststoreVersion"]
else:
raise InvalidResourceException(
mutual_tls_auth,
"MutualTlsAuthentication must be a map with at least one of the following fields {}.".format(
["TruststoreUri", "TruststoreVersion"]
),
)
if self.domain.get("SecurityPolicy", None):
domain.SecurityPolicy = self.domain["SecurityPolicy"]
if self.domain.get("OwnershipVerificationCertificateArn", None):
domain.OwnershipVerificationCertificateArn = self.domain["OwnershipVerificationCertificateArn"]
# Create BasepathMappings
if self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), str):
basepaths = [self.domain.get("BasePath")]
elif self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), list):
basepaths = self.domain.get("BasePath")
else:
basepaths = None
basepath_resource_list = []
if basepaths is None:
basepath_mapping = ApiGatewayBasePathMapping(
self.logical_id + "BasePathMapping", attributes=self.passthrough_resource_attributes
)
basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))
basepath_mapping.RestApiId = ref(rest_api.logical_id)
basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")
basepath_resource_list.extend([basepath_mapping])
else:
for path in basepaths:
path = "".join(e for e in path if e.isalnum())
logical_id = "{}{}{}".format(self.logical_id, path, "BasePathMapping")
basepath_mapping = ApiGatewayBasePathMapping(
logical_id, attributes=self.passthrough_resource_attributes
)
basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))
basepath_mapping.RestApiId = ref(rest_api.logical_id)
basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")
basepath_mapping.BasePath = path
basepath_resource_list.extend([basepath_mapping])
# Create the Route53 RecordSetGroup resource
record_set_group = None
if self.domain.get("Route53") is not None:
route53 = self.domain.get("Route53")
if route53.get("HostedZoneId") is None and route53.get("HostedZoneName") is None:
raise InvalidResourceException(
self.logical_id,
"HostedZoneId or HostedZoneName is required to enable Route53 support on Custom Domains.",
)
logical_id = logical_id_generator.LogicalIdGenerator(
"", route53.get("HostedZoneId") or route53.get("HostedZoneName")
).gen()
record_set_group = Route53RecordSetGroup(
"RecordSetGroup" + logical_id, attributes=self.passthrough_resource_attributes
)
if "HostedZoneId" in route53:
record_set_group.HostedZoneId = route53.get("HostedZoneId")
if "HostedZoneName" in route53:
record_set_group.HostedZoneName = route53.get("HostedZoneName")
record_set_group.RecordSets = self._construct_record_sets_for_domain(self.domain)
return domain, basepath_resource_list, record_set_group