azdev/operations/breaking_change/__init__.py (265 lines of code) (raw):
# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------
import re
import time
from collections import defaultdict
from importlib import import_module
import packaging.version
from knack.log import get_logger
from azdev.operations.statistics import _create_invoker_and_load_cmds # pylint: disable=protected-access
from azdev.utilities import require_azure_cli, display, heading, output, calc_selected_mod_names
from azdev.utilities.path import calc_selected_modules
# pylint: disable=no-else-return
logger = get_logger(__name__)
class BreakingChangeItem:
def __init__(self, module, command, detail, target_version):
self.module = module
self.command = command
self.detail = detail
self.target_version = target_version
self.group_ref = None
def calc_ref(self, loader):
if not loader:
return
if self.command in loader.command_group_table:
self.group_ref = self.command.split()
else:
self.group_ref = self.command.split()[:-1]
@property
def prepended_detail(self):
prepended_lines = [f'- {line}' if line else '' for line in self.detail.split('\n')]
return '\n'.join(prepended_lines)
def _load_commands():
start = time.time()
display('Initializing with loading command table...')
from azure.cli.core import get_default_cli # pylint: disable=import-error
az_cli = get_default_cli()
# load commands, args, and help
# The arguments must be loaded before the `EVENT_INVOKER_POST_CMD_TBL_CREATE` event.
# This is because we generate the `deprecate_info` and `upcoming_breaking_change` tags from pre-announcement data
# during the event.
# If the arguments are not loaded beforehand, this information will not be included.
_create_invoker_and_load_cmds(az_cli, load_arguments=True)
stop = time.time()
logger.info('Commands loaded in %i sec', stop - start)
display('Commands loaded in {} sec'.format(stop - start))
command_loader = az_cli.invocation.commands_loader
if not command_loader.command_table:
logger.warning('No commands selected to check.')
return command_loader
def _handle_custom_breaking_changes(module, command):
"""
Collect Custom Pre-Announcement defined in `_breaking_change.py`
:param module: module name
:param command: command name
:return: A generated returns Custom Pre-Announcements defined in `_breaking_change.py`
"""
from azure.cli.core.breaking_change import upcoming_breaking_changes
yield from _handle_custom_breaking_change(module, command, upcoming_breaking_changes.get(command))
yield from _handle_custom_breaking_change(module, command, upcoming_breaking_changes.get(f'az {command}'))
for key in upcoming_breaking_changes:
if key.startswith(command + '.') or key.startswith(f'az {command}.'):
yield from _handle_custom_breaking_change(module, command, upcoming_breaking_changes[key])
def _handle_custom_breaking_change(module, command, breaking_change):
"""
Handle a BreakingChange item defined in `_breaking_change.py`. We need this method because the item stored could
be a list or object
"""
from azure.cli.core.breaking_change import BreakingChange
if isinstance(breaking_change, str):
yield BreakingChangeItem(module, command, breaking_change, None)
elif isinstance(breaking_change, BreakingChange):
yield BreakingChangeItem(module, command, breaking_change.message, breaking_change.target_version.version())
elif isinstance(breaking_change, list):
for bc in breaking_change:
yield from _handle_custom_breaking_change(module, command, bc)
def _handle_status_tag(module, command, status_tag):
from knack.deprecation import Deprecated
from azure.cli.core.breaking_change import MergedStatusTag, UpcomingBreakingChangeTag, TargetVersion
if isinstance(status_tag, MergedStatusTag):
for tag in status_tag.tags:
yield from _handle_status_tag(module, command, tag)
else:
detail = status_tag._get_message(status_tag) # pylint: disable=protected-access
version = None
if isinstance(status_tag, Deprecated):
version = status_tag.expiration
elif isinstance(status_tag, UpcomingBreakingChangeTag):
if isinstance(status_tag.target_version, TargetVersion):
version = status_tag.target_version.version()
elif isinstance(status_tag.target_version, str):
version = status_tag.target_version
if version is None:
version_match = re.search(r'\d+\.\d+\.\d+', detail)
if version_match:
version = version_match.group(0)
yield BreakingChangeItem(module, command, detail, version)
def _handle_command_deprecation(module, command, deprecate_info):
yield from _handle_status_tag(module, command, deprecate_info)
def _calc_target_of_arg_deprecation(arg_name, arg_settings):
from knack.deprecation import Deprecated
option_str_list = []
depr = arg_settings.get('deprecate_info')
for option in arg_settings.get('option_list', []):
if isinstance(option, str):
option_str_list.append(option)
elif isinstance(option, Deprecated):
option_str_list.append(option.target)
if option_str_list:
return '/'.join(option_str_list)
elif hasattr(depr, 'target'):
return depr.target
else:
return arg_name
def _handle_arg_deprecation(module, command, target, deprecation_info):
deprecation_info.target = target
yield from _handle_status_tag(module, command, deprecation_info)
def _handle_options_deprecation(module, command, options):
from knack.deprecation import Deprecated
deprecate_option_map = defaultdict(lambda: [])
for option in options:
if isinstance(option, Deprecated):
key = f'{option.redirect}|{option.expiration}|{option.hide}'
deprecate_option_map[key].append(option)
for _, depr_list in deprecate_option_map.items():
target = '/'.join([depr.target for depr in depr_list])
depr = depr_list[0]
depr.target = target
yield from _handle_status_tag(module, command, depr)
def _handle_command_breaking_changes(module, command, command_info, source):
if source == "deprecate_info":
if hasattr(command_info, "deprecate_info") and command_info.deprecate_info:
yield from _handle_command_deprecation(module, command, command_info.deprecate_info)
for argument_name, argument in command_info.arguments.items():
arg_settings = argument.type.settings
depr = arg_settings.get('deprecate_info')
if depr:
bc_target = _calc_target_of_arg_deprecation(argument_name, arg_settings)
yield from _handle_arg_deprecation(module, command, bc_target, depr)
yield from _handle_options_deprecation(module, command, arg_settings.get('options_list', []))
if source == "pre_announce":
yield from _handle_custom_breaking_changes(module, command)
def _handle_command_group_deprecation(module, command, deprecate_info):
yield from _handle_status_tag(module, command, deprecate_info)
def _handle_command_group_breaking_changes(module, command_group_name, command_group_info, source):
if source == "deprecate_info":
if hasattr(command_group_info, 'group_kwargs') and command_group_info.group_kwargs.get('deprecate_info'):
yield from _handle_command_group_deprecation(module, command_group_name,
command_group_info.group_kwargs.get('deprecate_info'))
if source == "pre_announce":
yield from _handle_custom_breaking_changes(module, command_group_name)
def _get_mod_ext_name(loader):
# There could be different name with module name in extension.
# For example, module name of `application-insights` is azext_applicationinsights
try:
module_source = next(iter(loader.command_table.values())).command_source
if isinstance(module_source, str):
return module_source
else:
return module_source.extension_name
except StopIteration:
logger.warning('There is no command in Loader(%s)', loader)
mod_path = loader.__class__.__module__
mod_name = mod_path.rsplit('.', maxsplit=1)[-1]
mod_name = mod_name.replace('azext_', '', 1)
return mod_name
def _iter_and_prepare_module_loader(command_loader, selected_mod_names):
for loader in command_loader.loaders:
module_path = loader.__class__.__module__
module_name = module_path.rsplit('.', maxsplit=1)[-1]
if module_name and module_name not in selected_mod_names:
continue
_breaking_change_module = f'{module_path}._breaking_change'
try:
import_module(_breaking_change_module)
except ImportError:
pass
loader.skip_applicability = True
yield module_name, loader
def _handle_module(module, loader, source):
start = time.time()
for command, command_info in loader.command_table.items():
if command:
yield from _handle_command_breaking_changes(module, command, command_info, source)
for command_group_name, command_group in loader.command_group_table.items():
if command_group_name:
yield from _handle_command_group_breaking_changes(module, command_group_name, command_group, source)
stop = time.time()
logger.info('Module %s finished in %i sec', module, stop - start)
display('Module {} finished loaded in {} sec'.format(module, stop - start))
def _handle_core(source):
start = time.time()
if source == "pre_announce":
core_module = 'azure.cli.core'
_breaking_change_module = f'{core_module}._breaking_change'
try:
import_module(_breaking_change_module)
except ImportError:
pass
yield from _handle_custom_breaking_changes('core', '')
yield from _handle_custom_breaking_changes('core', '_core')
stop = time.time()
logger.info('Core finished in %i sec', stop - start)
display('Core finished loaded in {} sec'.format(stop - start))
def _handle_upcoming_breaking_changes(command_loader, selected_mod_names, source):
if 'core' in selected_mod_names or 'azure-cli-core' in selected_mod_names:
yield from _handle_core(source)
for module, loader in _iter_and_prepare_module_loader(command_loader, selected_mod_names):
for bc_item in _handle_module(module, loader, source):
bc_item.calc_ref(loader)
yield bc_item
def _filter_breaking_changes(iterator, max_version=None):
if not max_version:
yield from iterator
return
try:
parsed_max_version = packaging.version.parse(max_version)
except packaging.version.InvalidVersion:
logger.warning('Invalid target version: %s; '
'Will present all upcoming breaking changes as alternative.', max_version)
yield from iterator
return
for item in iterator:
if item.target_version:
try:
target_version = packaging.version.parse(item.target_version)
if target_version <= parsed_max_version:
yield item
except packaging.version.InvalidVersion:
logger.warning('Invalid version from `%s`: %s', item.command, item.target_version)
# pylint: disable=unnecessary-lambda-assignment
def _group_breaking_change_items(iterator, group_by_version=False):
if group_by_version:
upcoming_breaking_changes = defaultdict( # module to command
lambda: defaultdict( # command to version
lambda: {'group_ref': None, 'items': defaultdict( # version to list of breaking changes
lambda: [])}))
else:
upcoming_breaking_changes = defaultdict( # module to command
lambda: defaultdict( # command to list of breaking changes
lambda: {'group_ref': None, 'items': []}))
for item in iterator:
version = item.target_version if item.target_version else 'Unspecific'
upcoming_breaking_changes[item.module][item.command]['group_ref'] = item.group_ref
if group_by_version:
upcoming_breaking_changes[item.module][item.command]['items'][version].append(item.prepended_detail)
else:
upcoming_breaking_changes[item.module][item.command]['items'].append(item.prepended_detail)
return upcoming_breaking_changes
def collect_upcoming_breaking_changes(modules=None, target_version='NextWindow', source=None, group_by_version=None,
output_format='structure', no_head=False, no_tail=False,
include_whl_extensions=False):
if target_version == 'NextWindow':
from azure.cli.core.breaking_change import NEXT_BREAKING_CHANGE_RELEASE
target_version = NEXT_BREAKING_CHANGE_RELEASE
elif target_version.lower() == 'none':
target_version = None
require_azure_cli()
selected_modules = calc_selected_modules(modules, include_whl_extensions=include_whl_extensions)
cli_mod_names = list(selected_modules['core'].keys()) + list(selected_modules['mod'].keys())
ext_mod_names = list(selected_modules['ext'].keys())
if cli_mod_names or ext_mod_names:
display('Modules selected: {}\n'.format(', '.join(cli_mod_names + ext_mod_names)))
command_loader = _load_commands()
heading('Collecting Breaking Change Pre-announcement')
breaking_changes = []
if cli_mod_names:
cli_breaking_changes = _handle_upcoming_breaking_changes(command_loader, cli_mod_names, source)
cli_breaking_changes = _filter_breaking_changes(cli_breaking_changes, target_version)
breaking_changes.extend(cli_breaking_changes)
if ext_mod_names:
ext_breaking_changes = _handle_upcoming_breaking_changes(command_loader, ext_mod_names, 'pre_announce')
breaking_changes.extend(ext_breaking_changes)
breaking_changes = _group_breaking_change_items(breaking_changes, group_by_version)
if output_format == 'structure':
return breaking_changes
elif output_format == 'markdown':
from jinja2 import Environment, PackageLoader
env = Environment(loader=PackageLoader('azdev', 'operations/breaking_change'),
trim_blocks=True)
template = env.get_template('markdown_template.jinja2')
output(template.render({
'module_bc': breaking_changes,
'no_head': no_head,
'no_tail': no_tail,
}), end='' if no_tail else '\n')
return None