def execute()

in gcpdiag/runbook/lb/ssl_certificates.py [0:0]


  def execute(self):
    """Checks if the SSL certificate is attached to a target proxy.

    The step proceeds as follows:

    1. Fetches the certificate and all target HTTPS/SSL proxies of the project.
       If the proxies cannot be fetched, the step is skipped.
    2. Fails if no target proxy references the certificate.
    3. Fetches the forwarding rules of the project (skipping the step if they
       cannot be fetched) and fails if none of the proxies that reference the
       certificate is the target of a forwarding rule.
    4. Otherwise reports success and schedules the follow-up child steps:
       one VerifyDnsRecords per domain whose status is not 'ACTIVE', then
       VerifyForwardingRulesPort, VerifyNoCertificateMapConflict and
       CheckProvisioningTime.
    """
    certificate = lb.get_ssl_certificate(self.project_id, self.certificate_name)

    try:
      target_https_proxies = lb.get_target_https_proxies(self.project_id)
      target_ssl_proxies = lb.get_target_ssl_proxies(self.project_id)
    except googleapiclient.errors.HttpError as e:
      op.add_skipped(
          certificate,
          reason=f'Target proxies could not be fetched: {e}',
      )
      return

    target_proxies_with_certificate = [
        target_proxy
        for target_proxy in target_https_proxies + target_ssl_proxies
        if certificate.self_link in target_proxy.ssl_certificates
    ]

    if not target_proxies_with_certificate:
      op.add_failed(
          certificate,
          reason=op.prep_msg(op.FAILURE_REASON, name=self.certificate_name),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION),
      )
      return

    try:
      forwarding_rules = lb.get_forwarding_rules(self.project_id)
    except (ValueError, googleapiclient.errors.HttpError) as e:
      # ValueError is kept for backward compatibility; HttpError is handled the
      # same way as for the target proxy queries above, so that an API error
      # skips the step instead of propagating.
      op.add_skipped(
          certificate,
          reason=f'Forwarding rules could not be fetched: {e}',
      )
      return

    # Index the forwarding rules by the target they point to.
    forwarding_rules_by_target_proxy = {}
    for fr in forwarding_rules:
      forwarding_rules_by_target_proxy.setdefault(fr.target, []).append(fr)

    # Filter out target proxies that are not in use by any forwarding rules
    used_target_proxies_with_certificate = [
        tp for tp in target_proxies_with_certificate
        if forwarding_rules_by_target_proxy.get(tp.full_path)
    ]

    if not used_target_proxies_with_certificate:
      unused_proxy_paths = ', '.join(
          tp.full_path for tp in target_proxies_with_certificate)
      op.add_failed(
          certificate,
          reason=('The SSL certificate is attached to target proxies:'
                  f' {unused_proxy_paths} that'
                  ' are not in use by any forwarding rules.'),
          remediation='Please attach the target proxies to forwarding rules',
      )
      return

    # Gather forwarding rules that use target proxies with the given
    # certificate. Every proxy kept by the filter above has a non-empty entry
    # in the index, so direct indexing is safe here.
    forwarding_rules_with_certificate = []
    for tp in used_target_proxies_with_certificate:
      forwarding_rules_with_certificate.extend(
          forwarding_rules_by_target_proxy[tp.full_path])

    op.add_ok(
        certificate,
        reason=op.prep_msg(
            op.SUCCESS_REASON,
            name=self.certificate_name,
            target_proxies=', '.join(
                tp.full_path for tp in used_target_proxies_with_certificate),
        ),
    )

    # Schedule a DNS verification for every domain that is not yet ACTIVE.
    for domain, status in certificate.domain_status.items():
      if status != 'ACTIVE':
        verify_dns_records = VerifyDnsRecords()
        verify_dns_records.project_id = self.project_id
        verify_dns_records.domain = domain
        verify_dns_records.certificate_name = self.certificate_name
        verify_dns_records.forwarding_rules_with_certificate = (
            forwarding_rules_with_certificate)
        self.add_child(verify_dns_records)

    verify_forwarding_rules_port = VerifyForwardingRulesPort()
    verify_forwarding_rules_port.project_id = self.project_id
    verify_forwarding_rules_port.certificate_name = self.certificate_name
    verify_forwarding_rules_port.forwarding_rules_with_certificate = (
        forwarding_rules_with_certificate)
    self.add_child(verify_forwarding_rules_port)

    # NOTE: the two steps below receive every proxy that references the
    # certificate, including proxies not used by any forwarding rule.
    verify_no_certificate_map_conflict = VerifyNoCertificateMapConflict()
    verify_no_certificate_map_conflict.project_id = self.project_id
    verify_no_certificate_map_conflict.certificate_name = self.certificate_name
    verify_no_certificate_map_conflict.target_proxies_with_certificate = (
        target_proxies_with_certificate)
    self.add_child(verify_no_certificate_map_conflict)

    check_provisioning_time = CheckProvisioningTime()
    check_provisioning_time.project_id = self.project_id
    check_provisioning_time.certificate_name = self.certificate_name
    check_provisioning_time.target_proxies_with_certificate = (
        target_proxies_with_certificate)
    check_provisioning_time.forwarding_rules_with_certificate = (
        forwarding_rules_with_certificate)
    self.add_child(check_provisioning_time)