gcpdiag/runbook/output/terminal_output.py (238 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=cyclic-import
""" Output implementation that prints result in human-readable format. """

import logging
import os
import sys
import textwrap
import threading
from typing import Any, Optional, TextIO

import blessings

# pylint: disable=unused-import (lint is used in type hints)
from gcpdiag import config, models, runbook
from gcpdiag.runbook import constants
from gcpdiag.runbook.flags import INTERACTIVE_MODE
from gcpdiag.runbook.output.base_output import BaseOutput

OUTPUT_WIDTH = 68


def is_cloud_shell() -> Optional[str]:
  """Return the value of the CLOUD_SHELL environment variable.

  The result is None when the variable is unset; callers only use it for its
  truthiness.
  """
  return os.getenv('CLOUD_SHELL')


def emoji_wrap(char):
  """Return `char`, followed by one space when running in Cloud Shell."""
  if is_cloud_shell():
    # emoji not displayed as double width in Cloud Shell (bug?)
    return char + ' '
  return char


class TerminalOutput(BaseOutput):
  """ Output implementation that prints result in human-readable format. """
  file: TextIO
  show_ok: bool
  show_skipped: bool
  log_info_for_progress_only: bool
  lock: threading.Lock
  line_unfinished: bool
  term: blessings.Terminal

  def __init__(self,
               file: TextIO = sys.stdout,
               log_info_for_progress_only: bool = True,
               show_ok: bool = True,
               show_skipped: bool = True):
    """Store the output options and create the terminal helper.

    Args:
      file: stream used by print_line and for in-place line updates.
      log_info_for_progress_only: when true, INFO log records are shown as a
        transient progress line by the logging handler instead of being
        printed as regular lines.
      show_ok: when false, print_ok prints nothing.
      show_skipped: stored on the instance; not consulted in this class.
    """
    self.file = file
    self.show_ok = show_ok
    self.show_skipped = show_skipped
    self.log_info_for_progress_only = log_info_for_progress_only
    self.lock = threading.Lock()
    # True while a line written by terminal_update_line has not yet been
    # terminated by a line feed or erased.
    self.line_unfinished = False
    self.term = blessings.Terminal()

  def display_banner(self) -> None:
    """Print the gcpdiag version banner.

    The styled banner is printed with the default print() stream, the plain
    one goes to stderr.
    """
    if self.term.does_styling:
      print(self.term.bold(f"gcpdiag {emoji_wrap('🩺')} {config.VERSION}\n"))
    else:
      print(f'gcpdiag {config.VERSION}\n', file=sys.stderr)

  def display_header(self) -> None:
    """Print the runbook start notice to stderr."""
    print('Starting runbook inspection [Alpha Release]\n', file=sys.stderr)

  def display_runbook_description(self, tree):
    """Print the name of `tree` (in yellow) followed by its docstring."""
    self.terminal_print_line(f'{self.term.yellow(tree.name)}: {tree.__doc__}')

  def display_footer(self, result) -> None:
    """Print to stderr how many rules ended in each status."""
    totals = result.get_totals_by_status()
    states = ('skipped', 'ok', 'failed', 'uncertain')
    state_strs = [f'{totals.get(state, 0)} {state}' for state in states]
    print(f"Rules summary: {', '.join(state_strs)}", file=sys.stderr)

  def get_logging_handler(self) -> logging.Handler:
    """Return a logging handler that renders records through this output."""
    return _LoggingHandler(self)

  def print_line(self, text: str = '') -> None:
    """Write a line to the desired output provided as self.file."""
    print(text, file=self.file, flush=True)

  def _wrap_indent(self, text: str, prefix: str) -> str:
    """Wrap `text` to at most 80 columns (less if the terminal is narrower),
    prefixing every line with `prefix`."""
    width = min(self.term.width or 80, 80)
    wrapped = textwrap.fill(text, width - len(prefix))
    return textwrap.indent(wrapped, prefix)

  def _italic(self, text: str) -> str:
    """Return `text` in italics, or unchanged when running in Cloud Shell."""
    if is_cloud_shell():
      # TODO(b/201958597): Cloud Shell with tmux doesn't format italic
      # properly at the moment
      return text
    return self.term.italic(text)

  def terminal_update_line(self, text: str) -> None:
    """Update the current line on the terminal."""
    if not self.term.width:
      # If it's a stream, do not output anything, assuming that the
      # interesting output will be passed via terminal_print_line
      return
    print(self.term.move_x(0) + self.term.clear_eol() + text,
          end='',
          flush=True,
          file=self.file)
    self.line_unfinished = True

  def terminal_erase_line(self) -> None:
    """Remove the current content on the line."""
    if self.line_unfinished and self.term.width:
      print(self.term.move_x(0) + self.term.clear_eol(),
            flush=True,
            end='',
            file=self.file)
    self.line_unfinished = False

  def terminal_print_line(self, text: str = '') -> None:
    """Write a line to the terminal, replacing any current line content, and
    add a line feed."""
    if self.term.width:
      self.terminal_update_line(text)
      # The text above was written to self.file, so the line feed that
      # terminates it has to go to the same stream; sending it to sys.stdout
      # would split one line across two streams whenever they differ.
      print(file=self.file, flush=True)
    else:
      # NOTE(review): this branch writes to sys.stdout (looked up at call
      # time) rather than self.file. Kept as is because callers may rely on
      # redirecting sys.stdout - confirm before switching to self.file.
      print(text, file=sys.stdout)
      # flush the output, so that we can more easily grep, tee, etc.
      sys.stdout.flush()
    self.line_unfinished = False

  def _print_rule_header(self, rule: 'runbook.DiagnosticTree') -> None:
    """Print the rule name in yellow, preceded by a bullet."""
    if self.term.does_styling:
      bullet = emoji_wrap('🔎') + ' '
    else:
      bullet = '* '
    self.terminal_print_line(bullet + self.term.yellow(rule.name))

  def _print_long_desc(self, rule: 'runbook.DiagnosticTree') -> None:
    """Print the rule docstring (wrapped, italic) followed by its doc URL."""
    self.terminal_print_line()
    self.terminal_print_line(
        self._italic(self._wrap_indent(rule.__doc__ or '', ' ')))
    self.terminal_print_line()
    self.terminal_print_line(' ' + rule.doc_url)

  def _print_result(self,
                    resource: Optional[models.Resource],
                    status: str,
                    reason: Optional[str],
                    remediation: Optional[str] = None) -> None:
    """Print one result entry: status line, then reason and remediation.

    Args:
      resource: resource the result refers to; an empty path is printed when
        it is None or has no short_path.
      status: already styled status label shown between brackets.
      reason: text printed under a REASON heading; skipped when empty.
      remediation: text printed under a REMEDIATION heading; skipped when
        empty.
    """
    short_path = ''
    if resource is not None and resource.short_path is not None:
      short_path = resource.short_path
    self.terminal_print_line()
    self.terminal_print_line(' - ' + short_path.ljust(OUTPUT_WIDTH) + ' [' +
                             status + ']')
    sections = (('REASON', reason), ('REMEDIATION', remediation))
    for heading, body in sections:
      if not body:
        continue
      self.terminal_print_line(' [' + self.term.green(heading) + ']')
      # f-string conversion so that non-str values are printed instead of
      # making textwrap.indent fail.
      self.terminal_print_line(textwrap.indent(f'{body}', ' '))

  def print_skipped(self,
                    resource: Optional[models.Resource],
                    reason: str,
                    remediation: Optional[str] = None) -> None:
    """Print a SKIP result for `resource` with its reason and remediation."""
    self._print_result(resource, self.term.yellow('SKIP'), reason, remediation)

  def print_ok(self,
               resource: Optional[models.Resource],
               reason: str = '') -> None:
    """Print an OK result for `resource`; does nothing when show_ok is off."""
    if not self.show_ok:
      return
    self._print_result(resource, self.term.green('OK'), reason)

  def print_failed(self, resource: Optional[models.Resource], reason: str,
                   remediation: str) -> None:
    """Print a FAIL result for `resource` with its reason and remediation."""
    self._print_result(resource, self.term.red('FAIL'), reason, remediation)

  def print_uncertain(self,
                      resource: Optional[models.Resource],
                      reason: str,
                      remediation: Optional[str] = None) -> None:
    """Print an UNCERTAIN result for `resource` with reason and remediation."""
    self._print_result(resource, self.term.yellow('UNCERTAIN'), reason,
                       remediation)

  def info(self, message: str, step_type='INFO'):
    """Print `message` prefixed with the `step_type` label in green."""
    self.terminal_print_line('[' + self.term.green(step_type) + ']: ' +
                             f'{message}')

  def prompt(self,
             message: str,
             kind: str = '',
             options: Optional[dict] = None,
             choice_msg: str = 'Choose an option: ',
             non_interactive: Optional[bool] = None) -> Any:
    """Show `message` with a list of options and read the user's choice.

    Args:
      message: text shown to the user, prefixed with `kind` in green.
      kind: prompt type; checked with `in` against constants.HUMAN_TASK and
        constants.CONFIRMATION to decide which options are listed.
      options: mapping of option -> description shown instead of the default
        options for `kind`.
      choice_msg: text shown right before the input cursor.
      non_interactive: when truthy nothing is printed or read; when falsy the
        value of config.get(INTERACTIVE_MODE) is used instead.

    Returns:
      None when running non-interactively or when input hits end-of-file,
      False (the default answer) when the user just presses enter, the
      matching constants.STOP / CONTINUE / UNCERTAIN / RETEST / YES / NO for
      a recognized keyword, otherwise the stripped answer text.
    """
    # NOTE(review): despite the flag's name, a truthy INTERACTIVE_MODE config
    # value disables prompting here - confirm against the flag definition.
    non_interactive = non_interactive or config.get(INTERACTIVE_MODE)
    if non_interactive:
      return None

    self.terminal_print_line('[' + self.term.green(kind) + ']: ' +
                             f'{message}')
    self.default_answer = False
    self.answer = None

    is_human_task = kind in constants.HUMAN_TASK
    is_confirmation = kind in constants.CONFIRMATION
    menus = []
    if options:
      # Caller-supplied options replace the defaults of a known prompt kind.
      if is_confirmation or is_human_task:
        menus.append(options)
    else:
      if is_human_task:
        menus.append(constants.HUMAN_TASK_OPTIONS)
      if is_confirmation:
        menus.append(constants.CONFIRMATION_OPTIONS)

    option_lines = ['\n']
    for menu in menus:
      for option, description in menu.items():
        option_lines.append('[' + self.term.green(f'{option}') + ']' +
                            f' - {description}\n')
    self.terminal_print_line(textwrap.indent(''.join(option_lines), ' '))

    try:
      self.answer = input(textwrap.indent(choice_msg, ' '))
    except EOFError:
      # No input available: self.answer is still None.
      return self.answer

    # pylint:disable=g-explicit-bool-comparison, We explicitly want to
    # distinguish between empty string and None.
    if self.answer == '':
      # User just hit enter, return default.
      return self.default_answer
    if self.answer is None:
      return self.answer

    stripped = self.answer.strip()
    keywords = {
        's': constants.STOP,
        'stop': constants.STOP,
        'c': constants.CONTINUE,
        'continue': constants.CONTINUE,
        'u': constants.UNCERTAIN,
        'uncertain': constants.UNCERTAIN,
        'r': constants.RETEST,
        'retest': constants.RETEST,
        'y': constants.YES,
        'yes': constants.YES,
        'n': constants.NO,
        'no': constants.NO,
    }
    # Anything that is not a known keyword is handed back as typed (stripped).
    return keywords.get(stripped.lower(), stripped)


class _LoggingHandler(logging.Handler):
  """logging.Handler implementation used when producing a runbook report."""

  output: TerminalOutput

  def __init__(self, output: TerminalOutput) -> None:
    super().__init__()
    self.output = output

  def format(self, record: logging.LogRecord) -> str:
    """Return the record's message without any level or timestamp prefix."""
    return record.getMessage()

  def emit(self, record: logging.LogRecord) -> None:
    """Render `record` through the terminal output.

    INFO records become a transient progress line when the output is
    configured with log_info_for_progress_only; every other record is printed
    as a regular line prefixed with its level name.
    """
    output = self.output
    if record.levelno == logging.INFO and output.log_info_for_progress_only:
      msg = ' ... ' + self.format(record)
      # make sure we don't go beyond the terminal width
      width = output.term.width
      if width:
        msg = msg[:width]
      with output.lock:
        output.terminal_update_line(msg)
    else:
      msg = f'[{record.levelname}] ' + self.format(record) + ' '
      # workaround for bug:
      # https://github.com/googleapis/google-api-python-client/issues/1116
      if 'Invalid JSON content from response' in msg:
        return
      with output.lock:
        output.terminal_print_line(msg)