def run_rules()

in gcpdiag/lint/__init__.py [0:0]


  def run_rules(self, context: models.Context, result: LintResults,
                rules: Iterable[LintRule]) -> None:
    """Execute the runnable lint rules and record a report for each one.

    Execution is staged to overlap I/O with rule evaluation:
      1. Every rule's `prepare_rule_f` runs serially in this thread.
      2. A shared thread-pool executor starts fetching the log queries
         registered during step 1 (and serial-port output, when enabled).
      3. Each rule's `prefetch_rule_f` is submitted to the same executor
         so prefetching proceeds in the background.
      4. Each rule's `run_rule_f` runs serially, blocking first until that
         rule's prefetch future (if any) has completed.

    Args:
      context: Scope (e.g. project/resources) that rules inspect.
      result: Results accumulator; one rule report is created, populated,
        and finished per runnable rule.
      rules: Candidate rules; narrowed via `filter_runnable_rules`.

    Errors raised by a rule (API errors, or common programming errors such
    as RuntimeError/ValueError/KeyError/TypeError) mark that rule as
    skipped instead of aborting the whole run.
    """

    rules_to_run = self.filter_runnable_rules(rules)

    # Run the "prepare_rule" functions first, in a single thread.
    for rule in rules_to_run:
      if rule.prepare_rule_f:
        logging.debug('prepare_rule_f: %s', rule)
        rule.prepare_rule_f(context)

    # Start multiple threads for logs fetching and prefetch functions.
    executor = get_executor()
    # Start fetching any logs queries that were defined in prepare_rule
    # functions.
    logs.execute_queries(executor)
    # Start fetching any serial output logs if serial output to cloud logging
    # is not enabled on the project/ instance
    if config.get('enable_gce_serial_buffer'):
      # execute fetch job
      gce_mod.execute_fetch_serial_port_outputs(executor)

    # Run the "prefetch_rule" functions with multiple worker threads to speed up
    # execution of the "run_rule" executions later.
    for rule in rules_to_run:
      if rule.prefetch_rule_f:
        rule.prefetch_rule_future = executor.submit(wrap_prefetch_rule_f,
                                                    str(rule),
                                                    rule.prefetch_rule_f,
                                                    context)

    # While the prefetch_rule functions are still being executed in multiple
    # threads, start executing the rules, but block and wait in case the
    # prefetch for a specific rule is still running.
    last_threads_dump = time.time()
    for rule in rules_to_run:
      rule_report = result.create_rule_report(rule)

      # make sure prefetch_rule_f completed
      try:
        if rule.prefetch_rule_future:
          if rule.prefetch_rule_future.running():
            logging.info('waiting for query results (%s)', rule)
          # Wait in 10-second slices rather than a single blocking result()
          # call, so that in verbose mode we can periodically report which
          # worker threads are still alive while the prefetch is pending.
          while True:
            try:
              rule.prefetch_rule_future.result(10)
              break
            except concurrent.futures.TimeoutError:
              pass
            if config.get('verbose') >= 2:
              now = time.time()
              # Dump the active thread names at most every 10 seconds.
              if now - last_threads_dump > 10:
                logging.debug(
                    'THREADS: %s',
                    ', '.join([t.name for t in threading.enumerate()]))
                last_threads_dump = now
        # run the rule
        # run_rule_f is expected to be set for every rule that passed
        # filter_runnable_rules — TODO(review) confirm that invariant there.
        assert rule.run_rule_f is not None
        rule.run_rule_f(context, rule_report)
      except (utils.GcpApiError, googleapiclient.errors.HttpError) as err:
        # Normalize raw googleapiclient errors into GcpApiError so the
        # skip message is rendered consistently.
        if isinstance(err, googleapiclient.errors.HttpError):
          err = utils.GcpApiError(err)
        logging.warning('%s: %s while processing rule: %s',
                        type(err).__name__, err, rule)
        rule_report.add_skipped(None, f'API error: {err}', None)
      except (RuntimeError, ValueError, KeyError, TypeError) as err:
        # A bug in a single rule should not abort the whole lint run:
        # record it as skipped and continue with the next rule.
        logging.warning('%s: %s while processing rule: %s',
                        type(err).__name__, err, rule)
        rule_report.add_skipped(None, f'Error: {err}', None)
      rule_report.finish()