azdev/operations/cmdcov/cmdcov.py (332 lines of code) (raw):

# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------

import os
import shutil
import sys
import time

import requests
import yaml
from jinja2 import FileSystemLoader, Environment
from knack.log import get_logger
from tqdm import tqdm

from azdev.operations.regex import get_all_tested_commands_from_regex
from azdev.utilities.path import get_azdev_repo_path, get_cli_repo_path, find_files

logger = get_logger(__name__)

# Load the coverage configuration from the local azure-cli checkout; when that
# fails for any reason, fall back to the copy published on GitHub.
try:
    with open(os.path.join(get_cli_repo_path(), 'scripts', 'ci', 'cmdcov.yml'), 'r') as file:
        config = yaml.safe_load(file)
# pylint: disable=broad-exception-caught
except Exception:
    url = "https://raw.githubusercontent.com/Azure/azure-cli/dev/scripts/ci/cmdcov.yml"
    # A timeout keeps module import from hanging forever, and raise_for_status
    # stops an HTTP error page from being parsed as if it were the config.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    config = yaml.safe_load(response.text)

ENCODING = config['ENCODING']
GLOBAL_PARAMETERS = config['GLOBAL_PARAMETERS']
GENERIC_UPDATE_PARAMETERS = config['GENERIC_UPDATE_PARAMETERS']
WAIT_CONDITION_PARAMETERS = config['WAIT_CONDITION_PARAMETERS']
OTHER_PARAMETERS = config['OTHER_PARAMETERS']
RED = config['RED']
ORANGE = config['ORANGE']
GREEN = config['GREEN']
BLUE = config['BLUE']
GOLD = config['GOLD']
RED_PCT = config['RED_PCT']
ORANGE_PCT = config['ORANGE_PCT']
GREEN_PCT = config['GREEN_PCT']
BLUE_PCT = config['BLUE_PCT']
CLI_OWN_MODULES = config['CLI_OWN_MODULES']
EXCLUDE_COMMANDS = config['EXCLUDE_COMMANDS']
GLOBAL_EXCLUDE_COMMANDS = config['GLOBAL_EXCLUDE_COMMANDS']


# pylint: disable=too-many-instance-attributes
class CmdcovManager:
    """Collect commands, match them against tests and render an HTML coverage report."""

    def __init__(self, selected_mod_names=None, selected_mod_paths=None, loaded_help=None, level=None,
                 enable_cli_own=None, exclusions=None):
        """
        :param selected_mod_names: module names to report on
        :param selected_mod_paths: module paths, index-aligned with selected_mod_names
        :param loaded_help: mapping whose values are the help entries to inspect
        :param level: 'command' or 'argument'
        :param enable_cli_own: when truthy, also render the report limited to CLI_OWN_MODULES
        :param exclusions: mapping of command name to its exclusion rules
        """
        # Every collection argument defaults to None; normalise to an empty
        # container so the loops below do not fail on None.
        self.selected_mod_names = selected_mod_names or []
        self.selected_mod_paths = selected_mod_paths or []
        self.loaded_help = loaded_help or {}
        self.level = level
        self.enable_cli_own = enable_cli_own
        self.all_commands = {m: [] for m in self.selected_mod_names}
        self.all_tested_commands = {m: [] for m in self.selected_mod_names}
        self.all_untested_commands = {}
        # Each value is [tested count, untested count, percentage string].
        self.command_test_coverage = {'Total': [0, 0, 0]}
        self.date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # Same timestamp with ':' and the space replaced by '-'; used as a directory name.
        self.report_date = '-'.join(self.date.replace(':', '-').split())
        self.cmdcov_path = os.path.dirname(__file__)
        self.exclusions = exclusions or {}
        self.width = 60
        self.fillchar = '-'

    def run(self):
        """Run the whole pipeline: collect, match, compute, render, then open the report."""
        self._get_all_commands()
        self._get_all_tested_commands_from_regex()
        self._get_all_tested_commands_from_record()
        self._run_command_test_coverage()
        html_file = self._render_html()
        if self.enable_cli_own:
            # _render_html() has already popped 'Total', so only per-module entries remain here.
            cli_coverage = {k: v for k, v in self.command_test_coverage.items() if k in CLI_OWN_MODULES}
            total_tested = sum(item[0] for item in cli_coverage.values() if item)
            total_untested = sum(item[1] for item in cli_coverage.values() if item)
            cli_coverage['Total'] = [total_tested, total_untested,
                                     self._format_percentage(total_tested, total_tested + total_untested)]
            self._render_cli_html(cli_coverage)
        self._browse(html_file)

    def _print_banner(self, title):
        """Print a red, centered section title, then pause briefly."""
        print("\033[31m" + title.center(self.width, self.fillchar) + "\033[0m")
        # presumably lets the banner reach the terminal before tqdm starts drawing -- TODO confirm
        time.sleep(0.1)

    @staticmethod
    def _format_percentage(numerator, denominator):
        """Return numerator / denominator formatted as 'xx.xxx%', or 'N/A' when denominator is 0."""
        if not denominator:
            return 'N/A'
        return f'{numerator / denominator:.3%}'

    @staticmethod
    def _get_test_dir(path):
        """
        Return the tests directory for a module path.

        For a path containing 'azure-cli-extensions' this is the 'tests' folder
        under the first 'azext*' entry; otherwise it is '<path>/tests'.

        :return: the directory path, or None when no 'azext*' entry exists
        """
        if 'azure-cli-extensions' not in path:
            return os.path.join(path, 'tests')
        for dirname in os.listdir(path):
            if dirname.startswith('azext'):
                return os.path.join(path, dirname, 'tests')
        return None

    def _resolve_module(self, help_entry):
        """Return the selected module name that owns help_entry, or None when it is not selected."""
        if not hasattr(help_entry, 'command_source'):
            return None
        source = help_entry.command_source
        if source in self.selected_mod_names:
            return source
        if hasattr(source, 'extension_name'):
            ext_name = source.extension_name
            full_ext_name = 'azext_' + ext_name.replace('-', '_')
            if ext_name in self.selected_mod_names:
                return ext_name
            if full_ext_name in self.selected_mod_names:
                return full_ext_name
        return None

    def _get_all_commands(self):
        """
        Fill self.all_commands from loaded_help.

        GLOBAL_EXCLUDE_COMMANDS: List[str]
        EXCLUDE_COMMANDS: Dict[str: List[str]]
        excluded_commands: List[str]
        exclude_parameters: List[List[str]]
        excluded_parameters: List[Tuple[str, str]]

        At 'argument' level one entry is stored per parameter, as
        '<command> <list of option names>'; otherwise one entry per command.
        """
        exclude_parameters = [
            sorted(i) for i in
            GLOBAL_PARAMETERS + GENERIC_UPDATE_PARAMETERS + WAIT_CONDITION_PARAMETERS + OTHER_PARAMETERS
        ]
        # some module like vm have multiple command like vm vmss disk snapshot ...
        excluded_commands = []
        # Collected for the TODO below; not consulted yet.
        excluded_parameters = []
        for command_name, rules in self.exclusions.items():
            if 'parameters' in rules:
                for param_name, param_rules in rules['parameters'].items():
                    # .get(): a parameter entry may carry no 'rule_exclusions' key.
                    if 'missing_parameter_test_coverage' in param_rules.get('rule_exclusions', []):
                        excluded_parameters.append((command_name, param_name))
            elif 'missing_command_test_coverage' in rules.get('rule_exclusions', []):
                excluded_commands.append(command_name)

        self._print_banner("Get all commands")
        for help_entry in tqdm(self.loaded_help.values()):
            module = self._resolve_module(help_entry)
            if not module or help_entry.deprecate_info:
                continue
            command = help_entry.command
            if command.split()[-1] in GLOBAL_EXCLUDE_COMMANDS or \
                    command in EXCLUDE_COMMANDS.get(module, []) or \
                    command in excluded_commands:
                continue
            if self.level != 'argument':
                self.all_commands[module].append(f'{command}')
                continue
            for parameter in help_entry.parameters:
                # TODO support linter_exclusions.yml
                if sorted(parameter.name_source) in exclude_parameters:
                    continue
                opt_list = [opt for opt in parameter.name_source if opt.startswith('-')]
                if opt_list:
                    self.all_commands[module].append(f'{command} {opt_list}')

    def _get_all_tested_commands_from_regex(self):
        """
        get all tested commands from test_*.py
        """
        self._print_banner("Get tested commands from regex")
        for idx, path in enumerate(tqdm(self.selected_mod_paths)):
            test_dir = self._get_test_dir(path)
            if test_dir is None:
                # Without this guard the previous module's test directory would be reused.
                logger.warning('No azext* directory found in %s, skipping', path)
                continue
            for test_file in find_files(test_dir, '*.py'):
                with open(os.path.join(test_dir, test_file), 'r', encoding=ENCODING) as f:
                    lines = f.readlines()
                ref = get_all_tested_commands_from_regex(lines)
                self.all_tested_commands[self.selected_mod_names[idx]] += ref

    def _get_all_tested_commands_from_record(self):
        """
        get all tested commands from recording files
        """
        self._print_banner("Get tested commands from recording files")
        for idx, path in enumerate(tqdm(self.selected_mod_paths)):
            test_dir = self._get_test_dir(path)
            if test_dir is None:
                # Without this guard the previous module's test directory would be reused.
                logger.warning('No azext* directory found in %s, skipping', path)
                continue
            for record_file in find_files(test_dir, 'test*.yaml'):
                with open(os.path.join(test_dir, record_file)) as f:
                    # safe_load can not determine a constructor for the tag: !!python/unicode
                    # NOTE(review): yaml.Loader can construct arbitrary Python objects;
                    # only run this against recordings from a trusted checkout.
                    records = yaml.load(f, Loader=yaml.Loader) or {}
                # An empty recording has no 'interactions' key at all.
                for record in records.get('interactions') or []:
                    headers = record['request']['headers']
                    # ['acr agentpool create']
                    command = headers.get('CommandName', [''])[0]
                    # ['-n -r']
                    argument = headers.get('ParameterSetName', [''])[0]
                    if command or argument:
                        cmd = command + ' ' + argument
                        self.all_tested_commands[self.selected_mod_names[idx]].append(cmd)

    def _is_tested(self, module, command, tested_commands):
        """
        Tell whether command appears in any of tested_commands.

        A tested command matches when it contains the command text; for module
        'rdbms' it also matches when it contains the command text without its
        first word. At 'argument' level it must in addition contain at least
        one of the option names stored in the command entry.
        """
        import ast
        prefix, _, options = command.rpartition('[') if '[' in command else (command, '', '')
        # The option list was stored through an f-string, so parse it back into a list.
        opt_list = ast.literal_eval('[' + options) if self.level == 'argument' else []
        words = prefix.split(maxsplit=1)
        # Guard against a single-word command, which has nothing after the first word.
        rdbms_prefix = words[1] if module == 'rdbms' and len(words) > 1 else None
        for cmd in tested_commands:
            if prefix not in cmd and (rdbms_prefix is None or rdbms_prefix not in cmd):
                continue
            if self.level != 'argument' or any(opt in cmd for opt in opt_list):
                return True
        return False

    def _run_command_test_coverage(self):
        """
        all_commands: All commands that need to be test
        all_tested_commands: All commands already tested
        command_test_coverage: {{module1: pct}, {module2: pct}}
        module: vm
        pct: xx.xxx%

        :return: self.command_test_coverage, including the 'Total' entry
        """
        for module in self.all_commands:
            self.command_test_coverage[module] = []
            self.all_untested_commands[module] = []
        total = self.command_test_coverage['Total']
        for module, commands in self.all_commands.items():
            tested_commands = self.all_tested_commands[module]
            untested = self.all_untested_commands[module]
            count = 0
            for command in commands:
                if self._is_tested(module, command, tested_commands):
                    count += 1
                else:
                    untested.append(command)
            # A module without commands yields [0, 0, 'N/A'].
            self.command_test_coverage[module] = [count, len(untested),
                                                  self._format_percentage(count, len(commands))]
            total[0] += count
            total[1] += len(untested)
        # 'N/A' instead of ZeroDivisionError when nothing was collected at all.
        total[2] = self._format_percentage(total[0], total[0] + total[1])
        logger.warning(self.command_test_coverage)
        return self.command_test_coverage

    def _render_html(self):
        """
        Render index.html plus one page per module, and copy the static assets next to them.

        Appends a {'color', 'percentage'} dict to every coverage entry and
        removes 'Total' from self.command_test_coverage.

        :return: path of the generated index.html
        """
        html_path = self.get_html_path()
        description = 'Command' if self.level == 'command' else 'Command Argument'
        j2_loader = FileSystemLoader(self.cmdcov_path)
        env = Environment(loader=j2_loader)
        j2_tmpl = env.get_template('./index.j2')
        for item in self.command_test_coverage.values():
            color, percentage = self._get_color(item)
            item.append({'color': color, 'percentage': percentage})
        total = self.command_test_coverage.pop('Total')
        content = j2_tmpl.render(description=description,
                                 enable_cli_own=self.enable_cli_own,
                                 date=self.date,
                                 Total=total,
                                 command_test_coverage=self.command_test_coverage)
        index_html = os.path.join(html_path, 'index.html')
        with open(index_html, 'w', encoding=ENCODING) as f:
            f.write(content)

        # render child html
        self._print_banner("Render test coverage report")
        for module, coverage in tqdm(self.command_test_coverage.items()):
            if coverage:
                self._render_child_html(module, coverage, self.all_untested_commands[module])

        # copy source
        css_source = os.path.join(self.cmdcov_path, 'component.css')
        ico_source = os.path.join(self.cmdcov_path, 'favicon.ico')
        js_source = os.path.join(self.cmdcov_path, 'component.js')
        try:
            shutil.copy(css_source, html_path)
            shutil.copy(ico_source, html_path)
            shutil.copy(js_source, html_path)
        except IOError as e:
            logger.error("Unable to copy file %s", e)
        except Exception:  # pylint: disable=broad-except
            logger.error("Unexpected error: %s", sys.exc_info())
        return index_html

    def _render_cli_html(self, command_test_coverage):
        """
        render cli own html string

        Writes index2.html. Appends a {'color', 'percentage'} dict to every
        entry of command_test_coverage and removes its 'Total' key.
        """
        html_path = self.get_html_path()
        description = 'Command' if self.level == 'command' else 'Command Argument'
        j2_loader = FileSystemLoader(self.cmdcov_path)
        env = Environment(loader=j2_loader)
        j2_tmpl = env.get_template('./index2.j2')
        for item in command_test_coverage.values():
            color, percentage = self._get_color(item)
            item.append({'color': color, 'percentage': percentage})
        total = command_test_coverage.pop('Total')
        content = j2_tmpl.render(description=description,
                                 date=self.date,
                                 Total=total,
                                 command_test_coverage=command_test_coverage)
        index_html = os.path.join(html_path, 'index2.html')
        with open(index_html, 'w', encoding=ENCODING) as f:
            f.write(content)

    def _render_child_html(self, module, coverage, untested_commands):
        """
        render every module html
        """
        html_path = self.get_html_path()
        j2_loader = FileSystemLoader(self.cmdcov_path)
        env = Environment(loader=j2_loader)
        j2_tmpl = env.get_template('./module.j2')
        content = j2_tmpl.render(module=module,
                                 enable_cli_own=self.enable_cli_own,
                                 date=self.date,
                                 coverage=coverage,
                                 untested_commands=untested_commands)
        with open(f'{html_path}/{module}.html', 'w', encoding=ENCODING) as f:
            f.write(content)

    @staticmethod
    def _get_color(coverage):
        """
        :param coverage: [tested, untested, percentage string or 'N/A', ...]
        :return: color and percentage
        """
        if coverage[2] == 'N/A':
            return 'N/A', 'N/A'
        # Drop the trailing '%' and round to a whole number.
        percentage = int(round(float(coverage[2][:-1]), 0))
        if percentage < RED_PCT:
            color = RED
        elif percentage < ORANGE_PCT:
            color = ORANGE
        elif percentage < GREEN_PCT:
            color = GREEN
        elif percentage < BLUE_PCT:
            color = BLUE
        else:
            color = GOLD
        return color, percentage

    def get_html_path(self):
        """
        Return the report output directory, creating it when missing.

        :return: html_path
        """
        root_path = get_azdev_repo_path()
        html_path = os.path.join(root_path, 'cmd_coverage', self.level, f'{self.report_date}')
        # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
        os.makedirs(html_path, exist_ok=True)
        return html_path

    @staticmethod
    def get_container_name():
        """
        Generate container name in storage account. It is also an identifier of the pipeline run.
        :return: '<YYYYmmdd-HHMMSS>-<6 random digits>'
        """
        import datetime
        import random
        import string
        logger.warning('Enter get_container_name()')
        container_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
        random_id = ''.join(random.choice(string.digits) for _ in range(6))
        name = container_time + '-' + random_id
        logger.warning('Exit get_container_name()')
        return name

    @staticmethod
    def upload_files(container, html_path, account_key):
        """
        Upload html and css files to container

        :param container: container name to create and upload into
        :param html_path: directory walked recursively for files to upload
        :param account_key: storage account key used when creating the container
        """
        logger.warning('Enter upload_files()')

        # Create container
        # NOTE(review): both commands are assembled as shell strings, so a value
        # containing spaces or shell metacharacters will break or alter them.
        cmd = 'az storage container create -n {} --account-name clitestresultstac --account-key {}' \
              ' --public-access container'.format(container, account_key)
        os.system(cmd)

        # Upload files
        for root, dirs, files in os.walk(html_path):
            logger.debug(dirs)
            for name in files:
                if name.endswith(('html', 'css')):
                    fullpath = os.path.join(root, name)
                    cmd = 'az storage blob upload -f {} -c {} -n {} --account-name clitestresultstac'
                    cmd = cmd.format(fullpath, container, name)
                    logger.warning('Running: %s', cmd)
                    os.system(cmd)

        logger.warning('Exit upload_files()')

    @staticmethod
    def _browse(uri, browser_name=None):  # throws ImportError, webbrowser.Error
        """Browse uri with named browser. Default browser is customizable by $BROWSER

        :return: whether a browser was opened
        """
        def is_wsl():
            # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
            # Run `uname -a` to get 'release' without python
            #   - WSL 1: '4.4.0-19041-Microsoft'
            #   - WSL 2: '4.19.128-microsoft-standard'
            import platform
            uname = platform.uname()
            platform_name = getattr(uname, 'system', uname[0]).lower()
            release = getattr(uname, 'release', uname[2]).lower()
            return platform_name == 'linux' and 'microsoft' in release

        import webbrowser  # Lazy import. Some distro may not have this.
        if browser_name:
            browser_opened = webbrowser.get(browser_name).open(uri)
        else:
            # This one can survive BROWSER=nonexist, while get(None).open(...) can not
            browser_opened = webbrowser.open(uri)

        logger.warning(uri)
        # In WSL which doesn't have www-browser, try launching browser with PowerShell
        if not browser_opened and is_wsl():
            try:
                import subprocess
                # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
                # Ampersand (&) should be quoted
                exit_code = subprocess.call(
                    ['powershell.exe', '-NoProfile', '-Command', 'Start-Process "{}"'.format(uri)])
                browser_opened = exit_code == 0
            except FileNotFoundError:  # WSL might be too old
                pass
        return browser_opened