azdev/operations/statistics/__init__.py (275 lines of code) (raw):

# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------

import ast
import inspect
import json
import os
import re
import textwrap
import time
from importlib import import_module
from pathlib import Path

from knack.log import get_logger

from azdev.utilities import (
    heading, display, get_path_table, require_azure_cli, filter_by_git_diff)
from .util import filter_modules


logger = get_logger(__name__)


def _count_codegen_versions(commands):
    """Count how many command records are tagged as CodeGen v1 and v2.

    :param commands: iterable of command dicts; records without a
        ``codegen_version`` key are counted in neither bucket.
    :return: tuple ``(v1_count, v2_count)``.
    """
    v1_count = 0
    v2_count = 0
    for command in commands:
        version = command.get('codegen_version')
        if version == "v1":
            v1_count += 1
        elif version == "v2":
            v2_count += 1
    return v1_count, v2_count


def list_command_table(modules=None, git_source=None, git_target=None, git_repo=None, include_whl_extensions=False,
                       statistics_only=False):
    """Load the CLI command table and describe every selected command.

    :param modules: module names to include. The special values ``['CLI']`` and
        ``['EXT']`` restrict the run to non-extension modules or to extensions.
    :param git_source: forwarded to ``filter_by_git_diff``.
    :param git_target: forwarded to ``filter_by_git_diff``.
    :param git_repo: forwarded to ``filter_by_git_diff``.
    :param include_whl_extensions: forwarded to ``get_path_table`` and ``filter_modules``.
    :param statistics_only: when true, return only the counters instead of the command list.
    :return: a dict with ``total`` / ``codegenV2`` / ``codegenV1`` counters when
        ``statistics_only`` is set, otherwise a list of command dicts sorted by name.
    """
    require_azure_cli()

    # imported lazily: azure-cli is only guaranteed to be present after require_azure_cli()
    from azure.cli.core import get_default_cli  # pylint: disable=import-error

    heading('List Command Table')

    # allow user to run only on CLI or extensions
    cli_only = modules == ['CLI']
    ext_only = modules == ['EXT']
    if cli_only or ext_only:
        modules = None

    selected_modules = get_path_table(include_only=modules, include_whl_extensions=include_whl_extensions)

    if cli_only:
        selected_modules['ext'] = {}
    if ext_only:
        selected_modules['mod'] = {}
        selected_modules['core'] = {}

    # filter down to only modules that have changed based on git diff
    selected_modules = filter_by_git_diff(selected_modules, git_source, git_target, git_repo)

    if not any(selected_modules.values()):
        logger.warning('No commands selected to check.')

    selected_mod_names = list(selected_modules['mod'])
    selected_mod_names += list(selected_modules['core'])
    selected_mod_names += list(selected_modules['ext'])

    if selected_mod_names:
        display('Modules: {}\n'.format(', '.join(selected_mod_names)))

    start = time.time()
    display('Initializing with command table and help files...')
    az_cli = get_default_cli()

    # load commands, args, and help
    _create_invoker_and_load_cmds(az_cli)

    stop = time.time()
    logger.info('Commands and help loaded in %i sec', stop - start)
    command_loader = az_cli.invocation.commands_loader

    # trim command table and help to just selected_modules
    command_loader = filter_modules(
        command_loader, modules=selected_mod_names, include_whl_extensions=include_whl_extensions)

    if not command_loader.command_table:
        logger.warning('No commands selected to check.')

    commands = []
    for command_name, command in command_loader.command_table.items():
        command_info = {
            "name": command_name,
            "source": _get_command_source(command_name, command)
        }
        module_loader = command_loader.cmd_to_loader_map[command_name]
        codegen_info = _command_codegen_info(command_name, command, module_loader)
        if codegen_info:
            command_info['codegen_version'] = codegen_info['version']
            command_info['codegen_type'] = codegen_info['type']
        commands.append(command_info)

    codegen_v1_command_count, codegen_v2_command_count = _count_codegen_versions(commands)

    if statistics_only:
        return {
            "total": len(commands),
            "codegenV2": codegen_v2_command_count,
            "codegenV1": codegen_v1_command_count,
        }

    display(f"Total Commands: {len(commands)}\t "
            f"CodeGen V2 Commands: {codegen_v2_command_count}\t "
            f"CodeGen V1 Commands: {codegen_v1_command_count}")

    commands = sorted(commands, key=lambda a: a['name'])
    return commands


def diff_command_tables(table_path, diff_table_path, statistics_only=False):
    """Compare two command-table JSON dumps and report new and migrated commands.

    A command is "new" when its name is absent from the base table, and
    "migrated" when it exists in both tables with a different ``codegen_version``.

    :param table_path: path of the JSON file holding the base command list.
    :param diff_table_path: path of the JSON file holding the command list to compare.
    :param statistics_only: when true, return only the counters.
    :return: a dict with ``newCommands`` and ``migratedCommands`` entries; each entry is
        a counter dict when ``statistics_only`` is set, otherwise the list of commands.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(table_path, 'r', encoding='utf-8') as f:
        commands = json.load(f)

    with open(diff_table_path, 'r', encoding='utf-8') as f:
        new_commands = json.load(f)

    command_table = {command['name']: command for command in commands}

    added_commands = []
    migrated_commands = []
    for command in new_commands:
        previous = command_table.get(command['name'])
        if previous is None:
            added_commands.append(command)
        elif command.get('codegen_version') != previous.get('codegen_version'):
            # differing versions already imply that the two records differ
            migrated_commands.append(command)

    added_v1_commands_count, added_v2_commands_count = _count_codegen_versions(added_commands)
    migrated_v1_commands_count, migrated_v2_commands_count = _count_codegen_versions(migrated_commands)

    if statistics_only:
        # NOTE: "codegenv1" (lower-case "v") differs from the "codegenV1" key used by
        # list_command_table; it is kept as-is because consumers may depend on it.
        return {
            "newCommands": {
                "total": len(added_commands),
                "codegenV2": added_v2_commands_count,
                "codegenv1": added_v1_commands_count,
            },
            "migratedCommands": {
                "total": len(migrated_commands),
                "codegenV2": migrated_v2_commands_count,
                "codegenv1": migrated_v1_commands_count,
            }
        }

    display(f"Total New Commands: {len(added_commands)}\t "
            f"CodeGen V2 Commands: {added_v2_commands_count}\t "
            f"CodeGen V1 Commands: {added_v1_commands_count}")
    display(f"Total Migrated Commands: {len(migrated_commands)}\t "
            f"CodeGen V2 Commands: {migrated_v2_commands_count}\t "
            f"CodeGen V1 Commands: {migrated_v1_commands_count}")

    return {
        "newCommands": added_commands,
        "migratedCommands": migrated_commands,
    }


def _get_command_source(command_name, command):
    """Describe where a command comes from.

    :param command_name: command name, used only in the error message.
    :param command: command object exposing ``command_source``.
    :return: dict with ``module`` and ``isExtension`` keys.
    :raises ValueError: when the command has no command source.
    """
    from azure.cli.core.commands import ExtensionCommandSource  # pylint: disable=import-error

    command_source = command.command_source
    if isinstance(command_source, ExtensionCommandSource):
        return {
            "module": command_source.extension_name,
            "isExtension": True
        }
    if command_source is None:
        raise ValueError('Command: `%s`, has no command source.' % command_name)
    # command is from module
    return {
        "module": command_source,
        "isExtension": False
    }


def _create_invoker_and_load_cmds(cli_ctx, load_arguments=False):
    """Create an invoker on ``cli_ctx`` and load the full command table into it.

    The new invoker is stored in ``cli_ctx.invocation``.

    :param cli_ctx: CLI context providing the invocation, loader, parser and help classes.
    :param load_arguments: when true, also load command arguments and raise the
        pre/post argument-loading events.
    """
    from knack.events import (
        EVENT_INVOKER_PRE_CMD_TBL_CREATE, EVENT_INVOKER_POST_CMD_TBL_CREATE)
    from azure.cli.core.commands import register_cache_arguments
    from azure.cli.core.commands.arm import register_global_subscription_argument, register_ids_argument

    start_time = time.time()

    register_global_subscription_argument(cli_ctx)
    register_ids_argument(cli_ctx)
    register_cache_arguments(cli_ctx)

    invoker = cli_ctx.invocation_cls(cli_ctx=cli_ctx, commands_loader_cls=cli_ctx.commands_loader_cls,
                                     parser_cls=cli_ctx.parser_cls, help_cls=cli_ctx.help_cls)
    cli_ctx.invocation = invoker
    invoker.commands_loader.skip_applicability = True
    cli_ctx.raise_event(EVENT_INVOKER_PRE_CMD_TBL_CREATE, args=[])
    invoker.commands_loader.load_command_table(None)
    invoker.commands_loader.command_name = ''

    if load_arguments:
        from azure.cli.core.commands.events import EVENT_INVOKER_PRE_LOAD_ARGUMENTS, EVENT_INVOKER_POST_LOAD_ARGUMENTS
        cli_ctx.raise_event(EVENT_INVOKER_PRE_LOAD_ARGUMENTS, commands_loader=invoker.commands_loader)
        invoker.commands_loader.load_arguments()
        cli_ctx.raise_event(EVENT_INVOKER_POST_LOAD_ARGUMENTS, commands_loader=invoker.commands_loader)

    cli_ctx.raise_event(EVENT_INVOKER_POST_CMD_TBL_CREATE, commands_loader=invoker.commands_loader)
    invoker.parser.cli_ctx = cli_ctx
    invoker.parser.load_command_table(invoker.commands_loader)

    end_time = time.time()
    logger.info('Time to load entire command table: %.3f sec', end_time - start_time)


# a `from ... aaz ... import ...` style line inside an operation's source
import_aaz_express = re.compile(r'^\s*from (.*\.)?aaz(\..*)? .*$')
# a line passing a `command_args=` keyword argument
command_args_express = re.compile(r'^.*[\s\(]command_args=.*$')

# Attributes of a command operation that may hold an operation path, in the order they are inspected.
_OPERATION_PATH_ATTRIBUTES = ('op_path', 'getter_op_path', 'setter_op_path', 'custom_function_op_path')


def _source_uses_aaz(op_source):
    """Return True when any source line imports from ``aaz`` or passes ``command_args=``."""
    return any(
        import_aaz_express.match(line) or command_args_express.match(line)
        for line in op_source.splitlines()
    )


def _is_in_generated_folder(op):
    """Return True when the file defining ``op`` has a path component named ``generated``."""
    return "generated" in Path(inspect.getfile(op)).parts


def _command_codegen_info(command_name, command, module_loader):  # pylint: disable=unused-argument
    """Classify a command by the code generator that produced it.

    :param command_name: unused; kept for interface compatibility.
    :param command: the command object to classify.
    :param module_loader: unused; kept for interface compatibility.
    :return: ``{"version": "v2", "type": "Atomic"}`` for ``AAZCommand`` instances,
        ``{"version": "v2", "type": "Convenience"}`` when an operation's source references aaz,
        ``{"version": "v1", "type": "SDK"}`` when an operation lives under a ``generated`` folder,
        otherwise ``None``.
    """
    from azure.cli.core.commands import AzCliCommand
    try:
        from azure.cli.core.aaz import AAZCommand
        if isinstance(command, AAZCommand):
            return {
                "version": "v2",
                "type": "Atomic"
            }
    except ImportError:
        # the installed azure-cli does not provide the aaz package
        pass

    if not isinstance(command, AzCliCommand):
        return None
    if 'command_operation' not in command.command_kwargs:
        return None

    command_operation = command.command_kwargs['command_operation']
    is_v2_convenience = False
    is_generated = False

    for attribute in _OPERATION_PATH_ATTRIBUTES:
        if is_v2_convenience:
            # already classified as v2; remaining operations cannot change the outcome
            break
        operation_path = getattr(command_operation, attribute, None)
        if not operation_path:
            continue

        op = command_operation.get_op_handler(operation_path)
        if attribute == 'op_path':
            # only the main operation is expanded with the helpers it calls in its own module
            func_map = _get_module_functions(operation_path.split("#")[0])
            op_source = _expand_all_functions(op, func_map)
        else:
            op_source = inspect.getsource(op)

        if _source_uses_aaz(op_source):
            is_v2_convenience = True
        if _is_in_generated_folder(op):
            is_generated = True

    if is_v2_convenience:
        return {
            "version": "v2",
            "type": "Convenience"
        }
    if is_generated:
        return {
            "version": "v1",
            "type": "SDK"
        }
    return None


def _get_module_functions(path):
    """Return a ``{name: function}`` map of the functions visible in module ``path``.

    :param path: dotted module path to import.
    :return: dict of function members, or ``None`` when the module cannot be found.
    """
    try:
        module = import_module(path)
    except ModuleNotFoundError:
        return None  # bypass functions in sdk
    return dict(inspect.getmembers(module, predicate=inspect.isfunction))


def _expand_all_functions(func, func_map, _visited=None):
    """Return the source of ``func`` followed by the source of the functions it calls.

    Only calls made through a bare name that is present in ``func_map`` are followed.
    Each function is expanded at most once, so mutually recursive helpers terminate
    and shared helpers are not duplicated in the result.

    :param func: the function whose source is wanted.
    :param func_map: ``{name: function}`` map used to resolve called names, or ``None``
        to return the source of ``func`` alone.
    :param _visited: internal; ids of the functions already expanded in this traversal.
    :return: the concatenated source text; empty when the source of ``func`` is unavailable.
    """
    if _visited is None:
        _visited = set()
    _visited.add(id(func))

    try:
        source = textwrap.dedent(inspect.getsource(func))
    except (OSError, TypeError):
        # https://docs.python.org/3/library/inspect.html#inspect.getsource
        logger.warning("Cannot retrieve the source code of %s.", func)
        source = ""

    if func_map is None:
        return source

    try:
        tree = ast.parse(source)
    except SyntaxError:
        # e.g. the dedented snippet is not parseable on its own; keep what was retrieved
        logger.warning("Cannot parse the source code of %s.", func)
        return source

    own_name = getattr(func, '__name__', None)
    parts = [source]
    for node in ast.walk(tree):
        if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)):
            continue
        function_name = node.func.id
        function = func_map.get(function_name)
        # skip names unknown to the module (e.g. `locals()`), direct recursion and
        # anything already expanded (indirect recursion, repeated calls)
        if function is None or function_name == own_name or id(function) in _visited:
            continue
        parts.append(_expand_all_functions(function, func_map, _visited))
    return "".join(parts)