in gcpdiag/lint/__init__.py [0:0]
def run_rules(self, context: models.Context, result: LintResults,
rules: Iterable[LintRule]) -> None:
rules_to_run = self.filter_runnable_rules(rules)
# Run the "prepare_rule" functions first, in a single thread.
for rule in rules_to_run:
if rule.prepare_rule_f:
logging.debug('prepare_rule_f: %s', rule)
rule.prepare_rule_f(context)
# Start multiple threads for logs fetching and prefetch functions.
executor = get_executor()
# Start fetching any logs queries that were defined in prepare_rule
# functions.
logs.execute_queries(executor)
# Start fetching any serial output logs if serial output to cloud logging
# is not enabled on the project/ instance
if config.get('enable_gce_serial_buffer'):
# execute fetch job
gce_mod.execute_fetch_serial_port_outputs(executor)
# Run the "prefetch_rule" functions with multiple worker threads to speed up
# execution of the "run_rule" executions later.
for rule in rules_to_run:
if rule.prefetch_rule_f:
rule.prefetch_rule_future = executor.submit(wrap_prefetch_rule_f,
str(rule),
rule.prefetch_rule_f,
context)
# While the prefetch_rule functions are still being executed in multiple
# threads, start executing the rules, but block and wait in case the
# prefetch for a specific rule is still running.
last_threads_dump = time.time()
for rule in rules_to_run:
rule_report = result.create_rule_report(rule)
# make sure prefetch_rule_f completed
try:
if rule.prefetch_rule_future:
if rule.prefetch_rule_future.running():
logging.info('waiting for query results (%s)', rule)
while True:
try:
rule.prefetch_rule_future.result(10)
break
except concurrent.futures.TimeoutError:
pass
if config.get('verbose') >= 2:
now = time.time()
if now - last_threads_dump > 10:
logging.debug(
'THREADS: %s',
', '.join([t.name for t in threading.enumerate()]))
last_threads_dump = now
# run the rule
assert rule.run_rule_f is not None
rule.run_rule_f(context, rule_report)
except (utils.GcpApiError, googleapiclient.errors.HttpError) as err:
if isinstance(err, googleapiclient.errors.HttpError):
err = utils.GcpApiError(err)
logging.warning('%s: %s while processing rule: %s',
type(err).__name__, err, rule)
rule_report.add_skipped(None, f'API error: {err}', None)
except (RuntimeError, ValueError, KeyError, TypeError) as err:
logging.warning('%s: %s while processing rule: %s',
type(err).__name__, err, rule)
rule_report.add_skipped(None, f'Error: {err}', None)
rule_report.finish()