def run_rule()

in gcpdiag/lint/gke/err_2021_006_scaleup_failed.py [0:0]


def run_rule(context: models.Context, report: lint.LintReportRuleInterface):
  """Report clusters whose managed instance groups logged scale-up errors.

  Two collections of log queries (defined outside this function) are
  scanned:

  * `gce_logs_by_project`: ERROR entries for `v1.compute.instances.insert`
    calls made with the 'GCE Managed Instance Group for GKE' user agent.
  * `ca_logs_by_project`: cluster autoscaler results whose message id
    starts with `scale.up.error`.

  Errors are collected per managed instance group (MIG) name. A GCE error
  replaces any earlier error recorded for the same MIG, while an autoscaler
  error is only recorded when the MIG has no error yet. Each cluster is then
  reported as failed if any of its node pool MIGs has a recorded error, and
  as ok otherwise.

  Args:
    context: Lint context; its `project_id` is used to check whether the
      logging API is enabled and it is passed to `gke.get_clusters`.
    report: Receives a single skipped result (logging API disabled or no
      clusters found), or one failed/ok result per cluster.
  """
  # Skip the entire rule if logging is disabled.
  if not apis.is_enabled(context.project_id, 'logging'):
    report.add_skipped(None, 'logging api is disabled')
    return

  # Any work to do?
  clusters = gke.get_clusters(context)
  if not clusters:
    report.add_skipped(None, 'no clusters found')
    # Nothing to correlate or report on, so don't scan the logs at all.
    return

  # Correlation dicts, so that we can determine resources based on log labels.
  cluster_by_mig = {}
  cluster_migs = collections.defaultdict(set)
  try:
    for cluster in clusters.values():
      for nodepool in cluster.nodepools:
        for mig in nodepool.instance_groups:
          cluster_by_mig[mig.name] = cluster
          cluster_migs[cluster].add(mig.name)
  except KeyError:
    # Best effort: keep the correlations gathered before the failure.
    # NOTE(review): presumably raised while resolving node pool instance
    # groups -- confirm which lookup can fail here.
    pass

  # Compiled once because they are applied to every matching log entry.
  instance_re = re.compile(r'/instances/([^/]+)$')
  instance_group_re = re.compile(r'/instanceGroups/([^/]+)$')

  # Collect errors by mig name.
  mig_errors = {}

  # Process gce_instance logs and search for VM creation errors
  for query in gce_logs_by_project.values():
    for log_entry in query.entries:
      try:
        # Filter out non-relevant log entries.
        if log_entry['severity'] != 'ERROR':
          continue
        payload = log_entry['protoPayload']
        if (payload['methodName'] != 'v1.compute.instances.insert' or
            payload['requestMetadata']['callerSuppliedUserAgent'] !=
            'GCE Managed Instance Group for GKE'):
          continue
        # Determine mig name.
        m = instance_re.search(payload['resourceName'])
        if not m:
          continue
        instance_name = m.group(1)
        # Only the first matching MIG is used, so stop at the first hit.
        mig_name = next((name for name in cluster_by_mig
                         if is_mig_instance(name, instance_name)), None)
        if mig_name is None:
          continue
        message = payload['status']['message']
        if message == 'LIMIT_EXCEEDED':
          mig_errors[mig_name] = 'LIMIT_EXCEEDED, possibly IP exhaustion'
        else:
          mig_errors[mig_name] = message
      except KeyError:
        # Entries lacking any of the expected fields are not relevant.
        pass

  # Process cluster autoscaler logs
  for query in ca_logs_by_project.values():
    for log_entry in query.entries:
      try:
        for result in log_entry['jsonPayload']['resultInfo']['results']:
          message_id = result['errorMsg']['messageId']
          if not message_id.startswith('scale.up.error'):
            continue
          for param in result['errorMsg']['parameters']:
            m = instance_group_re.search(param)
            if m:
              # setdefault: keep an error already recorded for this MIG.
              mig_errors.setdefault(m.group(1), message_id)
      except KeyError:
        # Entries lacking any of the expected fields are not relevant.
        pass

  # Create the report.
  for _, cluster in sorted(clusters.items()):
    # Sorted so that the failure message is stable between runs.
    failed_migs = sorted(
        cluster_migs.get(cluster, set()).intersection(mig_errors))
    if failed_migs:
      report.add_failed(
          cluster,
          'Scale up failures detected on managed instance groups:\n. ' +
          '\n. '.join(f'{mig} ({mig_errors[mig]})' for mig in failed_migs))
    else:
      report.add_ok(cluster)