azure-cli-diff-tool/azure_cli_diff_tool/utils.py (222 lines of code) (raw):

# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------
import os
import json
import configparser
import requests
import re
import csv
import logging
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from ._const import CONFIG_FILE_PATH, CHANGE_RULE_MESSAGE_MAPPING, CHANGE_SUGGEST_MESSAGE_MAPPING, \
    EXPORTED_CSV_META_HEADER, DOWNLOAD_THREADS

logger = logging.getLogger(__name__)

MODULE_NAME_PATTERN = re.compile(r"az_([a-zA-Z0-9\-\_]+)_meta.json")
SUBGROUP_NAME_PATTERN = re.compile(r"\[\'sub_groups\'\]\[\'([a-zA-Z0-9\-\s]+)\'\]")
CMD_NAME_PATTERN = re.compile(r"\[\'commands\'\]\[\'([a-zA-Z0-9\-\s]+)\'\]")
CMD_PARAMETER_PROPERTY_PATTERN = re.compile(r"\[(.*?)\]")

# Regex tail matching "']['<property>']" right after a (sub)group/command name
# or a deprecate key inside a diff key path.
_PROPERTY_SUFFIX_PATTERN = r"\'\]\[\'([a-zA-Z0-9\-\_]+)\'\]"

# requests applies no timeout unless one is given, so an unresponsive endpoint
# would block a download thread forever. Value is in seconds.
_REQUEST_TIMEOUT = 60


def load_blob_config_file():
    """Return a ConfigParser populated from CONFIG_FILE_PATH.

    ConfigParser.read() skips files it cannot open, so a missing config file
    yields an empty parser rather than an exception.
    """
    config = configparser.ConfigParser()
    config.read(CONFIG_FILE_PATH)
    return config


def get_blob_config(config):
    """Return (blob_url, meta_path_prefix, index_file) from the "BLOB" section.

    config.get() is called without a fallback, so a missing section or option
    raises the corresponding configparser error.
    """
    blob_url = config.get("BLOB", "primary_endpoint")
    meta_path_prefix = config.get("BLOB", "metadata_path_prefix")
    index_file = config.get("BLOB", "metadata_module_index_file")
    return blob_url, meta_path_prefix, index_file


def get_change_rule_template(rule_id="1000"):
    """ Return the rule message template, or "Non applicable" for an unknown rule id"""
    return CHANGE_RULE_MESSAGE_MAPPING.get(rule_id, "Non applicable")


def get_change_suggest_template(rule_id="1000"):
    """ Return the change suggest message template, or "Non applicable" for an unknown rule id"""
    return CHANGE_SUGGEST_MESSAGE_MAPPING.get(rule_id, "Non applicable")


class ChangeType(int, Enum):
    # Integer-valued enum: members compare equal to their plain int values.
    DEFAULT = 0
    ADD = 1
    CHANGE = 2
    REMOVE = 3


class DiffLevel(int, Enum):
    # Integer-valued enum: members compare equal to their plain int values.
    DEFAULT = 0
    INFO = 1
    WARN = 2
    BREAK = 3


def get_command_tree(command_name):
    """
    Build the nested group/command structure for a full command name.

    input: monitor log-profiles create
    ret:
    {
        is_group: True,
        group_name: 'monitor',
        sub_info: {
            is_group: True,
            group_name: 'monitor log-profiles',
            sub_info: {
                is_group: False,
                cmd_name: 'monitor log-profiles create'
            }
        }
    }

    Words are split on whitespace and re-joined with single spaces. An empty
    or whitespace-only name returns an empty dict.
    """
    name_arr = command_name.split()
    ret = {}
    # Build from the innermost node (the full command) outwards: each shorter
    # prefix becomes a group wrapping the node built so far.
    for depth in range(len(name_arr), 0, -1):
        prefix = " ".join(name_arr[:depth])
        if depth == len(name_arr):
            ret = {
                "is_group": False,
                "cmd_name": prefix
            }
        else:
            ret = {
                "is_group": True,
                "group_name": prefix,
                "sub_info": ret
            }
    return ret


def module_meta_file_downloader(meta_file_url, meta_file_save_path, module_file, use_cache):
    """Download one module meta file and save it as indented JSON.

    :param meta_file_url: URL of the meta file.
    :param meta_file_save_path: local path the JSON is written to.
    :param module_file: meta file name, only used in the progress messages.
    :param use_cache: when truthy and the local file already exists, skip the download.

    Best effort: any failure is printed and swallowed, nothing is raised.
    """
    if use_cache and os.path.exists(meta_file_save_path):
        print("Using cached {0} for {1}".format(meta_file_save_path, module_file))
        return
    print("Downloading {0} for {1}".format(meta_file_url, module_file))
    try:
        res = requests.get(meta_file_url, timeout=_REQUEST_TIMEOUT)
        res.raise_for_status()
        # Serialise BEFORE opening the target: opening with "w" truncates, so a
        # failed parse would otherwise leave an empty file behind that a later
        # use_cache run would treat as a valid cached download.
        content = json.dumps(res.json(), indent=4)
        with open(meta_file_save_path, "w") as f:
            f.write(content)
    except Exception as e:
        print(str(e))


def get_target_version_modules(blob_url, path_prefix, index_file, version, use_cache=False):
    """Download every module meta file listed in a version's index file.

    The index is fetched from <blob_url>/<path_prefix><version>/<index_file>
    and read as one meta file name per line. Files are saved under
    <cwd>/<path_prefix><version>/ using DOWNLOAD_THREADS worker threads.

    :return: list of (url, local_save_path, module_file) tuples, one per index
             entry. Empty if the index could not be fetched. An entry being
             listed does not guarantee its download succeeded.
    """
    version_meta_path = path_prefix + version
    version_meta_url = blob_url + "/" + version_meta_path
    version_meta_index_file = version_meta_url + "/" + index_file
    version_meta_module_file_list = []
    try:
        res = requests.get(version_meta_index_file, timeout=_REQUEST_TIMEOUT)
        # Without this an HTTP error page would be parsed as a module list.
        res.raise_for_status()
        # splitlines + strip tolerates CRLF indexes and stray whitespace.
        module_file_list = [line.strip() for line in res.text.splitlines()]
        version_meta_folder = os.getcwd() + "/" + version_meta_path
        os.makedirs(version_meta_folder, exist_ok=True)
        version_meta_module_file_list = [
            (version_meta_url + "/" + module_file, version_meta_folder + "/" + module_file, module_file)
            for module_file in module_file_list if module_file
        ]
        with ThreadPoolExecutor(max_workers=DOWNLOAD_THREADS) as pool:
            download_module_jobs = [
                pool.submit(module_meta_file_downloader, _url, _save_path, module_file, use_cache)
                for _url, _save_path, module_file in version_meta_module_file_list
            ]
            wait(download_module_jobs, return_when=ALL_COMPLETED)
    except Exception as e:
        print(str(e))
    # Returned after the try block rather than from `finally`, so that
    # KeyboardInterrupt / SystemExit are no longer silently discarded.
    return version_meta_module_file_list


def get_target_version_module(blob_url, path_prefix, version, target_module, use_cache=False):
    """Download the meta file of a single module for one version.

    The file name is "az_<target_module>_meta.json" and it is saved under
    <cwd>/<path_prefix><version>/.

    :return: a one-element list [(url, local_save_path, module_file)], in the
             same shape as get_target_version_modules. It is returned whether
             or not the download succeeded.
    """
    version_meta_path = path_prefix + version
    module_file = "az_" + target_module + "_meta.json"
    version_meta_module_file_url = blob_url + "/" + version_meta_path + "/" + module_file
    version_meta_folder = os.getcwd() + "/" + version_meta_path
    os.makedirs(version_meta_folder, exist_ok=True)
    version_meta_module_file_save_path = version_meta_folder + "/" + module_file
    try:
        module_meta_file_downloader(version_meta_module_file_url, version_meta_module_file_save_path,
                                    module_file, use_cache)
    except Exception as e:
        print(str(e))
    # Returned after the try block rather than from `finally`, so that
    # KeyboardInterrupt / SystemExit are no longer silently discarded.
    return [(version_meta_module_file_url, version_meta_module_file_save_path, module_file)]


def extract_module_name_from_meta_file(file_name):
    """Return the module name embedded in "az_<module>_meta.json", or None if absent."""
    name_res = MODULE_NAME_PATTERN.findall(file_name)
    if not name_res:
        return None
    return name_res[0]


def extract_subgroup_name(key):
    """Return (True, name) for the LAST ['sub_groups']['<name>'] in key, else (False, None)."""
    subgroup_name_res = SUBGROUP_NAME_PATTERN.findall(key)
    if not subgroup_name_res:
        return False, None
    # Last match: with several sub_groups segments the final one is reported.
    return True, subgroup_name_res[-1]


def _extract_property_after(key, prefix):
    """Return (True, prop) for the first "<prefix>']['<prop>']" in key, else (False, None).

    prefix is matched literally: it is regex-escaped before being embedded in
    the pattern, so metacharacters in a name cannot alter the match.
    """
    matches = re.findall(re.escape(prefix) + _PROPERTY_SUFFIX_PATTERN, key)
    if not matches:
        return False, None
    return True, matches[0]


def extract_subgroup_property(key, subgroup_name):
    """Return (True, prop) for the property segment following subgroup_name in key, else (False, None)."""
    return _extract_property_after(key, subgroup_name)


def extract_subgroup_deprecate_property(key, deprecate_key):
    """Return (True, prop) for the property segment following deprecate_key in key, else (False, None)."""
    return _extract_property_after(key, deprecate_key)


def extract_cmd_name(key):
    """Return (True, name) for the FIRST ['commands']['<name>'] in key, else (False, None)."""
    cmd_name_res = CMD_NAME_PATTERN.findall(key)
    if not cmd_name_res:
        return False, None
    return True, cmd_name_res[0]


def extract_cmd_property(key, cmd_name):
    """Return (True, prop) for the property segment following cmd_name in key, else (False, None)."""
    return _extract_property_after(key, cmd_name)


def extract_cmd_deprecate_property(key, deprecate_key):
    """Return (True, prop) for the property segment following deprecate_key in key, else (False, None)."""
    return _extract_property_after(key, deprecate_key)


def extract_para_info(key):
    """Return the bracketed segments that follow ['parameters'] in key, or None.

    Each element is the raw text between one pair of brackets, so quoted
    segments keep their quotes (e.g. "'name'") and indexes come back as "0".
    Returns None when nothing follows the ['parameters'] segment.

    NOTE(review): if key has no ['parameters'] segment at all, the search
    starts from the beginning of key and every bracketed segment is returned.
    Presumably callers only pass parameter keys - confirm.
    """
    parameters_ind = key.find("['parameters']")
    # First "[" strictly after the opening bracket of ['parameters'].
    property_ind = key.find("[", parameters_ind + 1)
    property_res = CMD_PARAMETER_PROPERTY_PATTERN.findall(key[property_ind:])
    return property_res or None


def _ensure_parent_folder(file_path):
    """Create the folder containing file_path if the path has a folder part."""
    folder = os.path.dirname(file_path)
    if folder:
        os.makedirs(folder, exist_ok=True)


def _diff_attr_value(diff_obj, attr):
    """Return diff_obj[attr]; a missing/empty cmd_name falls back to subgroup_name, then "-"."""
    if attr == "cmd_name":
        return diff_obj.get(attr, None) or diff_obj.get("subgroup_name", "-")
    return diff_obj.get(attr, None)


def export_meta_changes_to_json(output, output_file):
    """Write output to output_file as indented JSON.

    :return: output itself when output_file is falsy, otherwise None.

    The file is created even when output is falsy; it is then left empty.
    """
    if not output_file:
        return output
    _ensure_parent_folder(output_file)
    with open(output_file, "w") as f_out:
        if output:
            f_out.write(json.dumps(output, indent=4))
    return None


def format_module_diff_csv(module_diffs):
    """Return rows for CSV export: EXPORTED_CSV_META_HEADER first, then one row per diff."""
    csv_res = [EXPORTED_CSV_META_HEADER]
    for diff_obj in module_diffs:
        csv_res.append([_diff_attr_value(diff_obj, attr) for attr in EXPORTED_CSV_META_HEADER])
    return csv_res


def export_meta_changes_to_csv(module_diffs, version_diff_file):
    """Write the diffs to version_diff_file as CSV with a header row.

    :return: the row list when version_diff_file is falsy, otherwise None.
    """
    csv_res = format_module_diff_csv(module_diffs)
    if not version_diff_file:
        return csv_res
    _ensure_parent_folder(version_diff_file)
    with open(version_diff_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerows(csv_res)
    return None


def format_module_diff_dict(module_diffs):
    """Return one dict per diff, restricted to the EXPORTED_CSV_META_HEADER keys."""
    return [
        {attr: _diff_attr_value(diff_obj, attr) for attr in EXPORTED_CSV_META_HEADER}
        for diff_obj in module_diffs
    ]


def export_meta_changes_to_dict(module_diffs, version_diff_file):
    """Return the diffs as a list of dicts, or write them to version_diff_file.

    :return: the list of dicts when version_diff_file is falsy, otherwise None
             after writing the rows as CSV with a header row.
    """
    dict_res = format_module_diff_dict(module_diffs)
    if not version_diff_file:
        return dict_res
    _ensure_parent_folder(version_diff_file)
    with open(version_diff_file, "w", newline='') as f:
        # DictWriter is required here: a plain csv.writer iterates each dict
        # and would emit only its keys, never the values.
        writer = csv.DictWriter(f, fieldnames=EXPORTED_CSV_META_HEADER)
        writer.writeheader()
        writer.writerows(dict_res)
    return None


def _flatten_deprecate_info(obj):
    """Replace a non-empty obj["deprecate_info"] dict by flat "deprecate_info_<key>" entries."""
    deprecate_info = obj.get("deprecate_info", None)
    if not deprecate_info:
        # Missing or empty: left untouched (an empty value is not removed).
        return
    for dekey, devalue in deprecate_info.items():
        obj["deprecate_info_" + dekey] = devalue
    del obj["deprecate_info"]


def expand_deprecate_obj(meta_obj):
    """Recursively flatten "deprecate_info" dicts found below meta_obj, in place.

    Every dict reachable through dict values or list items has its
    "deprecate_info" mapping replaced by "deprecate_info_<key>" entries.
    A "deprecate_info" directly on meta_obj itself is not expanded, only those
    on nested objects. Non-dict input is ignored.
    """
    if not isinstance(meta_obj, dict):
        return
    # Only children are mutated, never meta_obj's own key set, so iterating
    # its values while flattening is safe.
    for obj in meta_obj.values():
        if isinstance(obj, dict):
            _flatten_deprecate_info(obj)
            expand_deprecate_obj(obj)
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, dict):
                    _flatten_deprecate_info(item)
                expand_deprecate_obj(item)