azure-cli-diff-tool/azure_cli_diff_tool/meta_change_detect.py (417 lines of code) (raw):
# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------
import logging
import requests
import os.path
from .utils import get_command_tree, ChangeType, extract_cmd_name, extract_subgroup_name, extract_subgroup_property, \
extract_subgroup_deprecate_property, extract_cmd_property, extract_cmd_deprecate_property, DiffLevel
from .meta_change import (CmdAdd, CmdRemove, CmdPropAdd, CmdPropRemove, CmdPropUpdate,
ParaAdd, ParaRemove, ParaPropAdd, ParaPropRemove, ParaPropUpdate,
SubgroupAdd, SubgroupRemove, SubgroupPropAdd, SubgroupPropRemove, SubgroupPropUpdate)
from ._const import (SUBGROUP_PROPERTY_ADD_BREAK_LIST, SUBGROUP_PROPERTY_ADD_WARN_LIST,
SUBGROUP_PROPERTY_REMOVE_BREAK_LIST, SUBGROUP_PROPERTY_REMOVE_WARN_LIST,
SUBGROUP_PROPERTY_UPDATE_BREAK_LIST, SUBGROUP_PROPERTY_UPDATE_WARN_LIST,
CMD_PROPERTY_ADD_BREAK_LIST, CMD_PROPERTY_ADD_WARN_LIST,
CMD_PROPERTY_REMOVE_BREAK_LIST, CMD_PROPERTY_REMOVE_WARN_LIST,
CMD_PROPERTY_UPDATE_BREAK_LIST, CMD_PROPERTY_UPDATE_WARN_LIST,
PARA_PROPERTY_REMOVE_BREAK_LIST, PARA_PROPERTY_REMOVE_WARN_LIST,
PARA_PROPERTY_ADD_BREAK_LIST, PARA_PROPERTY_ADD_WARN_LIST,
PARA_PROPERTY_UPDATE_BREAK_LIST, PARA_PROPERTY_UPDATE_WARN_LIST,
CMD_REMOVE_SUFFIX_WARN_LIST,
META_CHANDE_WHITELIST_FILE_URL,
META_CHANDE_WHITELIST_FILE_PATH)
logger = logging.getLogger(__name__)
class MetaChangeDetect:
EXPORTED_META_PROPERTY = ["rule_id", "rule_link_url", "is_break", "diff_level",
"rule_message", "suggest_message", "cmd_name", "subgroup_name"]
CHECKED_PARA_PROPERTY = ["name", "options", "required", "choices", "id_part", "nargs", "default", "desc",
"aaz_type", "type", "aaz_default", "aaz_choices",
"deprecate_info_target", "deprecate_info_redirect", "deprecate_info_hide",
"deprecate_info_expiration",
"options_deprecate_info"]
def __init__(self, deep_diff=None, base_meta=None, diff_meta=None):
self.deep_diff = {}
if deep_diff:
self.deep_diff = deep_diff
else:
logger.info("None diffs from cmd meta json")
if base_meta["module_name"] != diff_meta["module_name"]:
print(f'Comparing two modules with different name, base mod: {base_meta["module_name"]},'
f' diff mod: {diff_meta["module_name"]}')
self.module_name = diff_meta["module_name"]
self.base_meta = base_meta
self.diff_meta = diff_meta
self.diff_objs = []
self.cmd_set_with_parameter_change = set()
self.meta_change_whitelist = set()
self.__get_meta_change_whitelist__()
def __get_meta_change_whitelist__(self):
remote_res = requests.get(META_CHANDE_WHITELIST_FILE_URL)
if remote_res.status_code != 200:
logger.warning("remote meta change whitelist fetch error, use local dict")
if not os.path.exists(META_CHANDE_WHITELIST_FILE_PATH):
logger.info("meta_change_whitelist.txt not exist, skipped")
return
with open(META_CHANDE_WHITELIST_FILE_PATH, "r") as f_in:
for line in f_in:
white_key = line.rstrip()
self.meta_change_whitelist.add(white_key)
else:
logger.info("remote meta change whitelist fetch success")
content = remote_res.text
for line in content.split("\n"):
white_key = line.rstrip()
self.meta_change_whitelist.add(white_key)
@staticmethod
def __search_cmd_obj(cmd_name, search_meta):
command_tree = get_command_tree(cmd_name)
meta_search = search_meta
while True:
if "is_group" not in command_tree:
break
if command_tree["is_group"]:
group_name = command_tree["group_name"]
meta_search = meta_search["sub_groups"][group_name]
command_tree = command_tree["sub_info"]
else:
cmd_name = command_tree["cmd_name"]
meta_search = meta_search["commands"][cmd_name]
break
return meta_search
def __log_cmd_with_parameter_change(self, cmd_name):
self.cmd_set_with_parameter_change.add(cmd_name)
def __process_subgroup_items(self, dict_key, subgroup_name, diff_type):
has_subgroup_key, subgroup_property = extract_subgroup_property(dict_key, subgroup_name)
if not has_subgroup_key:
if diff_type == ChangeType.REMOVE:
diff_obj = SubgroupRemove(subgroup_name)
else:
diff_obj = SubgroupAdd(subgroup_name)
self.diff_objs.append(diff_obj)
return
# deal with deprecate_info expanded key's add/remove/update
if diff_type == ChangeType.ADD:
if subgroup_property in SUBGROUP_PROPERTY_ADD_WARN_LIST:
diff_obj = SubgroupPropAdd(subgroup_name, subgroup_property, False, DiffLevel.WARN)
elif subgroup_property in SUBGROUP_PROPERTY_ADD_BREAK_LIST:
diff_obj = SubgroupPropAdd(subgroup_name, subgroup_property, True, DiffLevel.BREAK)
else:
diff_obj = SubgroupPropAdd(subgroup_name, subgroup_property, False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
else:
if subgroup_property in SUBGROUP_PROPERTY_REMOVE_WARN_LIST:
diff_obj = SubgroupPropRemove(subgroup_name, subgroup_property, False, DiffLevel.WARN)
elif subgroup_property in SUBGROUP_PROPERTY_REMOVE_BREAK_LIST:
diff_obj = SubgroupPropRemove(subgroup_name, subgroup_property, True, DiffLevel.BREAK)
else:
diff_obj = SubgroupPropRemove(subgroup_name, subgroup_property, False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
def __process_cmd_items(self, dict_key, cmd_name, diff_type):
has_cmd_key, cmd_property = extract_cmd_property(dict_key, cmd_name)
if not has_cmd_key:
if diff_type == ChangeType.REMOVE:
if cmd_name.split()[-1] in CMD_REMOVE_SUFFIX_WARN_LIST:
diff_obj = CmdRemove(cmd_name, False, DiffLevel.WARN)
else:
diff_obj = CmdRemove(cmd_name, True, DiffLevel.BREAK)
else:
diff_obj = CmdAdd(cmd_name)
self.diff_objs.append(diff_obj)
return
if cmd_property == "parameters":
self.__log_cmd_with_parameter_change(cmd_name)
return
if diff_type == ChangeType.ADD:
if cmd_property in CMD_PROPERTY_ADD_WARN_LIST:
diff_obj = CmdPropAdd(cmd_name, cmd_property, False, DiffLevel.WARN)
elif cmd_property in CMD_PROPERTY_ADD_BREAK_LIST:
diff_obj = CmdPropAdd(cmd_name, cmd_property, True, DiffLevel.BREAK)
else:
diff_obj = CmdPropAdd(cmd_name, cmd_property, False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
else:
if cmd_property in CMD_PROPERTY_REMOVE_WARN_LIST:
diff_obj = CmdPropRemove(cmd_name, cmd_property, False, DiffLevel.WARN)
elif cmd_property in CMD_PROPERTY_REMOVE_BREAK_LIST:
diff_obj = CmdPropRemove(cmd_name, cmd_property, True, DiffLevel.BREAK)
else:
diff_obj = CmdPropRemove(cmd_name, cmd_property, False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
def __iter_dict_items(self, dict_items, diff_type):
if diff_type not in [ChangeType.REMOVE, ChangeType.ADD]:
raise Exception("Unsupported dict item type")
for dict_key in dict_items:
has_cmd, cmd_name = extract_cmd_name(dict_key)
if not has_cmd or not cmd_name:
has_subgroup, subgroup_name = extract_subgroup_name(dict_key)
if not has_subgroup or not subgroup_name:
continue
self.__process_subgroup_items(dict_key, subgroup_name, diff_type)
continue
self.__process_cmd_items(dict_key, cmd_name, diff_type)
def __iter_list_items(self, list_items, diff_type):
"""
['parameters'][3]
['parameters'][0]['options'][1]
['parameters'][0]['choices'][0]
['parameters'][5]['options_deprecate_info'][1]
"""
if diff_type not in [ChangeType.REMOVE, ChangeType.ADD]:
raise Exception("Unsupported dict item type")
for key, _ in list_items.items():
has_cmd, cmd_name = extract_cmd_name(key)
if not has_cmd or not cmd_name:
print("extract cmd failed for " + key)
continue
has_cmd_key, cmd_property = extract_cmd_property(key, cmd_name)
if not has_cmd_key:
continue
if cmd_property == "parameters":
self.__log_cmd_with_parameter_change(cmd_name)
def check_dict_item_remove(self):
if not self.deep_diff.get("dictionary_item_removed", None):
return
dict_item_removed = self.deep_diff["dictionary_item_removed"]
self.__iter_dict_items(dict_item_removed, ChangeType.REMOVE)
def check_dict_item_add(self):
if not self.deep_diff.get("dictionary_item_added", None):
return
dict_item_added = self.deep_diff["dictionary_item_added"]
self.__iter_dict_items(dict_item_added, ChangeType.ADD)
def check_list_item_remove(self):
if not self.deep_diff.get("iterable_item_removed", None):
return
list_item_remove = self.deep_diff["iterable_item_removed"]
self.__iter_list_items(list_item_remove, ChangeType.REMOVE)
def check_list_item_add(self):
if not self.deep_diff.get("iterable_item_added", None):
return
list_item_add = self.deep_diff["iterable_item_added"]
self.__iter_list_items(list_item_add, ChangeType.ADD)
def __process_subgroup_value_change(self, key, subgroup_name, old_value, new_value):
has_subgroup_prop, subgroup_property = extract_subgroup_property(key, subgroup_name)
if not has_subgroup_prop:
# subgroup key does not change independently, it must be followed by some cmd property
return
if subgroup_property in SUBGROUP_PROPERTY_UPDATE_WARN_LIST:
diff_obj = SubgroupPropUpdate(subgroup_name, subgroup_property, False, DiffLevel.WARN, old_value, new_value)
elif subgroup_property in SUBGROUP_PROPERTY_UPDATE_BREAK_LIST:
diff_obj = SubgroupPropUpdate(subgroup_name, subgroup_property, True, DiffLevel.BREAK, old_value, new_value)
else:
diff_obj = SubgroupPropUpdate(subgroup_name, subgroup_property, False, DiffLevel.INFO, old_value, new_value)
self.diff_objs.append(diff_obj)
def __process_cmd_value_change(self, key, cmd_name, old_value, new_value):
has_cmd_prop, cmd_property = extract_cmd_property(key, cmd_name)
if not has_cmd_prop:
# cmd key does not change independently, it must followed by some cmd property
return
if cmd_property == "parameters":
self.__log_cmd_with_parameter_change(cmd_name)
return
if cmd_property in CMD_PROPERTY_UPDATE_WARN_LIST:
diff_obj = CmdPropUpdate(cmd_name, cmd_property, False, DiffLevel.WARN, old_value, new_value)
elif cmd_property in CMD_PROPERTY_UPDATE_BREAK_LIST:
diff_obj = CmdPropUpdate(cmd_name, cmd_property, True, DiffLevel.BREAK, old_value, new_value)
else:
diff_obj = CmdPropUpdate(cmd_name, cmd_property, False, DiffLevel.INFO, old_value, new_value)
self.diff_objs.append(diff_obj)
def check_value_change(self):
"""
['sub_groups']['acr']['deprecate_info_redirect']
['commands']['acr helm show']['deprecate_info_redirect']
"""
if not self.deep_diff.get("values_changed", None):
return
value_changes = self.deep_diff["values_changed"]
for key, value_obj in value_changes.items():
old_value = value_obj["old_value"]
new_value = value_obj["new_value"]
has_cmd, cmd_name = extract_cmd_name(key)
if not has_cmd or not cmd_name:
has_subgroup, subgroup_name = extract_subgroup_name(key)
if not has_subgroup or not subgroup_name:
print("extract cmd or sub group failed for " + key)
continue
self.__process_subgroup_value_change(key, subgroup_name, old_value, new_value)
continue
self.__process_cmd_value_change(key, cmd_name, old_value, new_value)
@staticmethod
def __search_para_with_name_and_options(base_para_obj, search_parameters):
para_name = base_para_obj["name"]
para_option_set = set(base_para_obj["options"])
for para_obj in search_parameters:
name = para_obj["name"]
option_set = set(para_obj.get("options", []))
if para_name == name or para_option_set.issubset(option_set):
# parameter obj which has the same name or new option list contains old option list,
# is same parameter obj
# if name is changed and new option list lost element in old option list, then is different
return para_obj
return None
def __process_parameter_value_update(self, cmd_name, prop, base_para_obj, base_val, cmp_val):
if base_val == cmp_val:
return
if prop == "options_deprecate_info":
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, False, DiffLevel.INFO,
base_val, cmp_val)
self.diff_objs.append(diff_obj)
return
if isinstance(base_val, list) and isinstance(cmp_val, list):
if set(base_val).issubset(set(cmp_val)):
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, False, DiffLevel.INFO,
base_val, cmp_val)
else:
if prop in PARA_PROPERTY_UPDATE_WARN_LIST:
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, False, DiffLevel.WARN,
base_val, cmp_val)
else:
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, True, DiffLevel.BREAK,
base_val, cmp_val)
self.diff_objs.append(diff_obj)
return
if prop in PARA_PROPERTY_UPDATE_WARN_LIST:
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, False, DiffLevel.WARN,
base_val, cmp_val)
elif prop in PARA_PROPERTY_UPDATE_BREAK_LIST:
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, True, DiffLevel.BREAK,
base_val, cmp_val)
else:
diff_obj = ParaPropUpdate(cmd_name, base_para_obj["name"], prop, False, DiffLevel.INFO,
base_val, cmp_val)
self.diff_objs.append(diff_obj)
def check_cmd_parameter_diff(self, cmd_name, base_parameters, cmp_parameters):
"""check cmd parameter diff"""
for base_para_obj in base_parameters:
base_para_obj["checked"] = True
cmp_para_obj = self.__search_para_with_name_and_options(base_para_obj, cmp_parameters)
if cmp_para_obj is None:
# cmd lost parameter obj, is break
diff_obj = ParaRemove(cmd_name, base_para_obj["name"], True, DiffLevel.BREAK)
# add flag to avoid duplicate compare
self.diff_objs.append(diff_obj)
continue
cmp_para_obj["checked"] = True
for prop in self.CHECKED_PARA_PROPERTY:
if prop not in base_para_obj and prop not in cmp_para_obj:
continue
if prop in base_para_obj and prop not in cmp_para_obj:
# prop dropped in new para obj
prop_value = base_para_obj[prop]
if prop in PARA_PROPERTY_REMOVE_WARN_LIST:
diff_obj = ParaPropRemove(cmd_name, base_para_obj["name"], prop, prop_value,
False, DiffLevel.WARN)
elif prop in PARA_PROPERTY_REMOVE_BREAK_LIST:
diff_obj = ParaPropRemove(cmd_name, base_para_obj["name"], prop, prop_value,
True, DiffLevel.BREAK)
else:
diff_obj = ParaPropRemove(cmd_name, base_para_obj["name"], prop, prop_value,
False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
continue
if prop not in base_para_obj and prop in cmp_para_obj:
# prop added in new para obj
prop_value = cmp_para_obj[prop]
if prop in PARA_PROPERTY_ADD_WARN_LIST:
diff_obj = ParaPropAdd(cmd_name, base_para_obj["name"], prop, prop_value,
False, DiffLevel.WARN)
elif prop in PARA_PROPERTY_ADD_BREAK_LIST:
diff_obj = ParaPropAdd(cmd_name, base_para_obj["name"], prop, prop_value,
True, DiffLevel.BREAK)
else:
diff_obj = ParaPropAdd(cmd_name, base_para_obj["name"], prop, prop_value,
False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
continue
# prop exists in both new and old para obj, value needs to be checked
base_val = base_para_obj[prop]
cmp_val = cmp_para_obj[prop]
self.__process_parameter_value_update(cmd_name, prop, base_para_obj, base_val, cmp_val)
continue
# check added parameter obj, if obj is required, then is break
for cmp_para_obj in cmp_parameters:
if "checked" in cmp_para_obj and cmp_para_obj["checked"]:
continue
para_name = cmp_para_obj["name"]
required = cmp_para_obj.get("required", None)
if required:
diff_obj = ParaAdd(cmd_name, para_name, True, DiffLevel.BREAK)
else:
diff_obj = ParaAdd(cmd_name, para_name, False, DiffLevel.INFO)
self.diff_objs.append(diff_obj)
def check_cmds_parameter_diff(self):
"""
deal with command parameter diffs
"""
for cmd_name in self.cmd_set_with_parameter_change:
cmd_base = self.__search_cmd_obj(cmd_name, self.base_meta)
cmd_cmp = self.__search_cmd_obj(cmd_name, self.diff_meta)
base_parameters = cmd_base.get("parameters", [])
cmp_parameters = cmd_cmp.get("parameters", [])
self.check_cmd_parameter_diff(cmd_name, base_parameters, cmp_parameters)
def filter_diffs_by_whitelist(self):
"""
filter_diffs_by_whitelist
"""
new_diff_objs = []
for obj in self.diff_objs:
if obj.filter_key and obj.is_break and "\t".join(obj.filter_key) in self.meta_change_whitelist:
continue
new_diff_objs.append(obj)
self.diff_objs = new_diff_objs
def check_deep_diffs(self):
self.check_dict_item_remove()
self.check_dict_item_add()
self.check_list_item_remove()
self.check_list_item_add()
self.check_value_change()
self.check_cmds_parameter_diff()
self.filter_diffs_by_whitelist()
@staticmethod
def fill_subgroup_rules(obj, ret_mod, rule):
command_group_info = ret_mod
group_name_arr = obj.subgroup_name.split(" ")
start_level = 1
while start_level < len(group_name_arr):
group_name = " ".join(group_name_arr[:start_level])
if group_name not in command_group_info["sub_groups"]:
command_group_info["sub_groups"][group_name] = {
"name": group_name,
"commands": {},
"sub_groups": {}
}
start_level += 1
command_group_info = command_group_info["sub_groups"][group_name]
group_name = obj.subgroup_name
group_rules = []
if group_name not in command_group_info["sub_groups"]:
command_group_info["sub_groups"] = {group_name: group_rules}
group_rules = command_group_info["sub_groups"][group_name]
group_rules.append(rule)
command_group_info["sub_groups"][group_name] = group_rules
@staticmethod
def fill_cmd_rules(obj, ret_mod, rule):
command_tree = get_command_tree(obj.cmd_name)
command_group_info = ret_mod
while True:
if "is_group" not in command_tree:
break
if command_tree["is_group"]:
group_name = command_tree["group_name"]
if group_name not in command_group_info["sub_groups"]:
command_group_info["sub_groups"][group_name] = {
"name": group_name,
"commands": {},
"sub_groups": {}
}
command_tree = command_tree["sub_info"]
command_group_info = command_group_info["sub_groups"][group_name]
else:
cmd_name = command_tree["cmd_name"]
command_rules = []
if cmd_name in command_group_info["commands"]:
command_rules = command_group_info["commands"][cmd_name]
command_rules.append(rule)
command_group_info["commands"][cmd_name] = command_rules
break
def export_meta_changes(self, only_break, output_type="text"):
ret_objs = []
ret_mod = {
"module_name": self.module_name,
"name": "az",
"commands": {},
"sub_groups": {}
}
for obj in self.diff_objs:
if only_break and not obj.is_break:
continue
if obj.is_ignore:
continue
ret = {}
for prop in self.EXPORTED_META_PROPERTY:
if hasattr(obj, prop):
ret[prop] = getattr(obj, prop)
ret["rule_name"] = obj.__class__.__name__
if output_type == "dict":
ret_objs.append(ret)
elif output_type == "text":
ret_objs.append(str(obj))
if output_type != "tree":
continue
if not hasattr(obj, "cmd_name") and not hasattr(obj, "subgroup_name"):
logger.info("unsupported rule ignored")
elif not hasattr(obj, "cmd_name") and hasattr(obj, "subgroup_name"):
self.fill_subgroup_rules(obj, ret_mod, ret)
elif not hasattr(obj, "subgroup_name") and hasattr(obj, "cmd_name"):
self.fill_cmd_rules(obj, ret_mod, ret)
else:
logger.info("unsupported rule ignored")
return ret_objs if output_type in ["text", "dict"] else ret_mod