in gcpdiag/runbook/lb/ssl_certificates.py [0:0]
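def execute(self):
  """Check that the SSL certificate is attached to an in-use target proxy.

  Flow:
    1. Fetch the certificate and the project's target HTTPS/SSL proxies.
    2. Fail if no target proxy references the certificate.
    3. Fetch forwarding rules and fail if none of those proxies is
       referenced by a forwarding rule.
    4. Report OK, then queue follow-up checks as child steps: DNS records
       for every domain whose status is not ACTIVE, forwarding-rule ports,
       certificate-map conflicts and provisioning time.

  The step is skipped (not failed) when the proxies or the forwarding
  rules cannot be fetched, because the attachment state is then unknown.
  Results are reported through `op.add_skipped` / `op.add_failed` /
  `op.add_ok`; nothing is returned.
  """
  certificate = lb.get_ssl_certificate(self.project_id, self.certificate_name)

  try:
    target_https_proxies = lb.get_target_https_proxies(self.project_id)
    target_ssl_proxies = lb.get_target_ssl_proxies(self.project_id)
  except googleapiclient.errors.HttpError as e:
    # Without the proxy list the attachment state is unknown: skip, not fail.
    op.add_skipped(
        certificate,
        reason=f'Target proxies could not be fetched: {e}',
    )
    return

  target_proxies_with_certificate = [
      tp for tp in target_https_proxies + target_ssl_proxies
      if certificate.self_link in tp.ssl_certificates
  ]
  if not target_proxies_with_certificate:
    op.add_failed(
        certificate,
        reason=op.prep_msg(op.FAILURE_REASON, name=self.certificate_name),
        remediation=op.prep_msg(op.FAILURE_REMEDIATION),
    )
    return

  try:
    forwarding_rules = lb.get_forwarding_rules(self.project_id)
  except ValueError as e:
    # NOTE(review): only ValueError is handled here, whereas the proxy fetch
    # above handles HttpError — confirm this matches how
    # lb.get_forwarding_rules reports API failures.
    op.add_skipped(
        certificate,
        reason=f'Forwarding rules could not be fetched: {e}',
    )
    return

  # Index forwarding rules by the target proxy they point at, so each proxy
  # can be checked for use with a single lookup.
  forwarding_rules_by_target_proxy = {}
  for fr in forwarding_rules:
    forwarding_rules_by_target_proxy.setdefault(fr.target, []).append(fr)

  # A proxy that no forwarding rule points at never terminates traffic, so
  # the certificate is effectively unused on it.
  used_target_proxies_with_certificate = [
      tp for tp in target_proxies_with_certificate
      if forwarding_rules_by_target_proxy.get(tp.full_path)
  ]
  if not used_target_proxies_with_certificate:
    unused = ', '.join(tp.full_path for tp in target_proxies_with_certificate)
    op.add_failed(
        certificate,
        reason=('The SSL certificate is attached to target proxies:'
                f' {unused} that are not in use by any forwarding rules.'),
        remediation='Please attach the target proxies to forwarding rules',
    )
    return

  # Every proxy kept above has at least one rule, so this lookup cannot miss.
  forwarding_rules_with_certificate = []
  for tp in used_target_proxies_with_certificate:
    forwarding_rules_with_certificate.extend(
        forwarding_rules_by_target_proxy[tp.full_path])

  op.add_ok(
      certificate,
      reason=op.prep_msg(
          op.SUCCESS_REASON,
          name=self.certificate_name,
          target_proxies=', '.join(
              tp.full_path for tp in used_target_proxies_with_certificate),
      ),
  )

  # Follow-up checks run as child steps. The DNS check is only queued for
  # domains whose status is not ACTIVE; the others always run.
  for domain, status in certificate.domain_status.items():
    if status != 'ACTIVE':
      verify_dns_records = VerifyDnsRecords()
      verify_dns_records.project_id = self.project_id
      verify_dns_records.domain = domain
      verify_dns_records.certificate_name = self.certificate_name
      verify_dns_records.forwarding_rules_with_certificate = (
          forwarding_rules_with_certificate)
      self.add_child(verify_dns_records)

  verify_forwarding_rules_port = VerifyForwardingRulesPort()
  verify_forwarding_rules_port.project_id = self.project_id
  verify_forwarding_rules_port.certificate_name = self.certificate_name
  verify_forwarding_rules_port.forwarding_rules_with_certificate = (
      forwarding_rules_with_certificate)
  self.add_child(verify_forwarding_rules_port)

  verify_no_certificate_map_conflict = VerifyNoCertificateMapConflict()
  verify_no_certificate_map_conflict.project_id = self.project_id
  verify_no_certificate_map_conflict.certificate_name = self.certificate_name
  verify_no_certificate_map_conflict.target_proxies_with_certificate = (
      target_proxies_with_certificate)
  self.add_child(verify_no_certificate_map_conflict)

  check_provisioning_time = CheckProvisioningTime()
  check_provisioning_time.project_id = self.project_id
  check_provisioning_time.certificate_name = self.certificate_name
  check_provisioning_time.target_proxies_with_certificate = (
      target_proxies_with_certificate)
  check_provisioning_time.forwarding_rules_with_certificate = (
      forwarding_rules_with_certificate)
  self.add_child(check_provisioning_time)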