azdev/operations/style.py (192 lines of code) (raw):

# ----------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. # ----------------------------------------------------------------------------- from glob import glob import multiprocessing import os import sys from knack.log import get_logger from knack.util import CLIError, CommandResultItem from azdev.utilities import ( display, heading, py_cmd, get_path_table, EXTENSION_PREFIX, get_azdev_config, get_azdev_config_dir, require_azure_cli, filter_by_git_diff) logger = get_logger(__name__) # pylint: disable=too-many-statements def check_style(modules=None, pylint=False, pep8=False, git_source=None, git_target=None, git_repo=None): heading('Style Check') # allow user to run only on CLI or extensions cli_only = modules == ['CLI'] ext_only = modules == ['EXT'] if cli_only or ext_only: modules = None selected_modules = get_path_table(include_only=modules) # remove these two non-modules selected_modules['core'].pop('azure-cli-nspkg', None) selected_modules['core'].pop('azure-cli-command_modules-nspkg', None) pep8_result = None pylint_result = None if pylint: try: require_azure_cli() except CLIError: raise CLIError('usage error: --pylint requires Azure CLI to be installed.') if cli_only: ext_names = None selected_modules['ext'] = {} if ext_only: mod_names = None selected_modules['mod'] = {} selected_modules['core'] = {} # filter down to only modules that have changed based on git diff selected_modules = filter_by_git_diff(selected_modules, git_source, git_target, git_repo) if not any(selected_modules.values()): raise CLIError('No modules selected.') mod_names = list(selected_modules['mod'].keys()) + list(selected_modules['core'].keys()) ext_names = list(selected_modules['ext'].keys()) if mod_names: display('Modules: {}\n'.format(', '.join(mod_names))) if ext_names: display('Extensions: {}\n'.format(', '.join(ext_names))) # if neither flag provided, same as if both were provided if not any([pylint, pep8]): pep8 = True pylint = True exit_code_sum = 0 if pylint: pylint_result = run_pylint(selected_modules) exit_code_sum += pylint_result.exit_code if pylint_result.error: display('Pylint: PASSED') logger.error(pylint_result.error.output.decode('utf-8')) logger.error('Pylint: FAILED\n') else: display('Pylint: PASSED\n') if pep8: pep8_result = _run_pep8(selected_modules) exit_code_sum += pep8_result.exit_code if pep8_result.error: logger.error(pep8_result.error.output.decode('utf-8')) logger.error('Flake8: FAILED\n') else: display('Flake8: PASSED\n') sys.exit(exit_code_sum) def _combine_command_result(cli_result, ext_result): final_result = CommandResultItem(None) def apply_result(item): if item: final_result.exit_code += item.exit_code if item.error: if final_result.error: try: final_result.error.message += item.error.message except AttributeError: final_result.error.message += str(item.error) else: final_result.error = item.error setattr(final_result.error, 'message', '') if item.result: if final_result.result: final_result.result += item.result else: final_result.result = item.result apply_result(cli_result) apply_result(ext_result) return final_result def run_pylint(modules, checkers=None, env=None, disable_all=False, enable=None): def get_core_module_paths(modules): core_paths = [] for p in modules["core"].values(): _, tail = os.path.split(p) for x in str(tail).split("-"): p = os.path.join(p, x) core_paths.append(p) return core_paths cli_paths = get_core_module_paths(modules) + list(modules["mod"].values()) ext_paths = [] for path in list(modules["ext"].values()): glob_pattern = os.path.normcase(os.path.join("{}*".format(EXTENSION_PREFIX))) ext_paths.append(glob(os.path.join(path, glob_pattern))[0]) def run(paths, rcfile, desc, checkers=None, env=None, disable_all=False, enable=None): if not paths: return None logger.debug("Using rcfile file: %s", rcfile) logger.debug("Running on %s: %s", desc, "\n".join(paths)) command = "pylint {} --rcfile={} --jobs {}".format( " ".join(paths), rcfile, multiprocessing.cpu_count() ) if checkers is not None: command += ' --load-plugins {}'.format(",".join(checkers)) if disable_all: command += ' --disable=all' if enable is not None: command += ' --enable {}'.format(",".join(enable)) return py_cmd(command, message="Running pylint on {}...".format(desc), env=env) cli_pylintrc, ext_pylintrc = _config_file_path("pylint") cli_result = run(cli_paths, cli_pylintrc, "modules", checkers=checkers, env=env, disable_all=disable_all, enable=enable) ext_result = run(ext_paths, ext_pylintrc, "extensions", checkers=checkers, env=env, disable_all=disable_all, enable=enable) return _combine_command_result(cli_result, ext_result) def _run_pep8(modules): cli_paths = list(modules["core"].values()) + list(modules["mod"].values()) ext_paths = list(modules["ext"].values()) def run(paths, rcfile, desc): if not paths: return None logger.debug("Using config file: %s", rcfile) logger.debug("Running on %s:\n%s", desc, "\n".join(paths)) command = "flake8 --statistics --append-config={} {}".format( rcfile, " ".join(paths) ) return py_cmd(command, message="Running flake8 on {}...".format(desc)) cli_config, ext_config = _config_file_path("flake8") cli_result = run(cli_paths, cli_config, "modules") ext_result = run(ext_paths, ext_config, "extensions") return _combine_command_result(cli_result, ext_result) def _config_file_path(style_type="pylint"): from configparser import NoSectionError try: cli_repo_path = get_azdev_config().get("cli", "repo_path") except NoSectionError: cli_repo_path = None try: ext_repo_path = get_azdev_config().get("ext", "repo_paths").split(',')[0] except NoSectionError: ext_repo_path = None if style_type not in ["pylint", "flake8"]: raise ValueError("style_tyle value allows only: pylint, flake8.") config_file_mapping = { "pylint": "pylintrc", "flake8": ".flake8", } default_config_file_mapping = { "cli": { "pylint": "cli_pylintrc", "flake8": "cli.flake8" }, "ext": { "pylint": "ext_pylintrc", "flake8": "ext.flake8" } } if cli_repo_path: cli_config_path = os.path.join(cli_repo_path, config_file_mapping[style_type]) if not os.path.exists(cli_config_path): cli_config_path = os.path.join( get_azdev_config_dir(), "config_files", default_config_file_mapping["cli"][style_type], ) else: cli_config_path = os.path.join( get_azdev_config_dir(), "config_files", default_config_file_mapping["cli"][style_type], ) if ext_repo_path: ext_config_path = os.path.join(ext_repo_path, config_file_mapping[style_type]) if not os.path.exists(ext_config_path): ext_config_path = os.path.join( get_azdev_config_dir(), "config_files", default_config_file_mapping["ext"][style_type], ) else: ext_config_path = os.path.join( get_azdev_config_dir(), "config_files", default_config_file_mapping["ext"][style_type], ) return cli_config_path, ext_config_path