scripts/create_master_checklist.py (419 lines of code) (raw):

######################################################################
#
# This script combines all of the existing checklists into one big
#   checklist, and saves it in JSON and XLSX (macrofree) formats.
#
# Example usage:
# python3 ./scripts/create_master_checklist.py \
#   --waf \
#   --input-folder="./checklists" \
#   --language="en" \
#   --excel-file="./spreadsheet/macrofree/review_checklist_master_empty.xlsx" \
#   --output-name="checklist.en.master" \
#   --json-output-folder="./checklists/" \
#   --xlsx-output-folder="./spreadsheet/macrofree/"
#
# Last updated: March 2022
#
######################################################################

import json
import argparse
import sys
import os
import requests
import glob
import datetime
import random
from openpyxl import load_workbook
from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.worksheet.table import Table
from openpyxl.utils import get_column_letter

# Get input arguments
parser = argparse.ArgumentParser(description='Combine all existing checklists into one master checklist, saved in JSON and macro-free XLSX formats')
parser.add_argument('--input-folder', dest='input_folder', action='store',
                    help='Input folder where the checklists to merge are stored')
parser.add_argument('--language', dest='language', action='store', default='en',
                    help='Language code of the checklist files to merge, files are matched with the pattern *.<language>.json (default: en)')
parser.add_argument('--excel-file', dest='excel_file', action='store',
                    help='You need to supply an Excel file that will be taken as template to create the XLSX file with the checklist')
parser.add_argument('--json-output-folder', dest='json_output_folder', action='store',
                    help='Folder where to store the JSON output')
parser.add_argument('--xlsx-output-folder', dest='xlsx_output_folder', action='store',
                    help='Folder where to store the macro free Excel output')
parser.add_argument('--output-name', dest='output_name', action='store',
                    help='File name (without extension) for the output files (.json and .xlsx extensions will be added automatically)')
parser.add_argument('--add-services', dest='add_services', action='store_true',
                    default=False,
                    help='If services field should be added to the checklist items (default: False)')
parser.add_argument('--add-arm-services', dest='add_arm_services', action='store_true',
                    default=False,
                    help='If arm-service field should be added to the checklist items (default: False)')
parser.add_argument('--service-dictionary', dest='service_dictionary', action='store',
                    help='JSON file with dictionary to map services to ARM services')
parser.add_argument('--no-excel', dest='no_excel', action='store_true',
                    default=False,
                    help='If a macrofree Excel spreadsheet should not be generated')
parser.add_argument('--no-json', dest='no_json', action='store_true',
                    default=False,
                    help='If a JSON file should not be generated')
parser.add_argument('--no-data-validation', dest='no_data_validation', action='store_true',
                    default=False,
                    help='If data validation should be skipped when updating the Excel spreadsheet')
parser.add_argument('--no-links', dest='no_links', action='store_true',
                    default=False,
                    help='If hyperlinks should be skipped when updating the Excel spreadsheet')
parser.add_argument('--stats', dest='stats', action='store_true',
                    default=False,
                    help='If statistics about the newly generated checklist should be displayed')
parser.add_argument('--show-service', dest='show_service', action='store',
                    help='If you want to print on screen the checks corresponding to a given service (e.g. "VM", or "none")')
parser.add_argument('--print-random', dest='print_random', action='store',
                    default=0, type=int,
                    help='Print a random list of items on screen (default is 0)')
parser.add_argument('--waf', dest='waf', action='store_true',
                    default=False,
                    help='Only include WAF recommendations (default: False)')
parser.add_argument('--verbose', dest='verbose', action='store_true',
                    default=False,
                    help='Run in verbose mode (default: False)')
args = parser.parse_args()

# Keywords (matched case-insensitively as substrings) that associate a piece of text with a service.
# Built once at module level instead of on every call to get_services_from_string.
_SERVICE_KEYWORDS = {
    "AppSvc": ["App Service", "webapp"],
    "ExpressRoute": ["ExpressRoute", "Gateway Subnet"],
    "VPN": ["VPN", "Point-to-Site", "Site-to-Site", "Gateway Subnet"],
    "FrontDoor": ["Front Door", "FrontDoor"],
    "AppGW": ["Application Gateway", "AppGW", "AGIC"],
    "SQL": ["SQL"],
    "AVD": ["AVD", "Virtual Desktop", "WVD", "MSIX"],
    "AKS": ["AKS", "Kubernetes"],
    "AVS": ["AVS", "Azure VMware Solution", "VMware"],
    "Firewall": ["Azure Firewall", "Firewall Manager"],
    "NVA": ["NVA", "Network Virtual Appliance"],
    "Bastion": ["Bastion"],
    "SAP": ["SAP"],
    "VM": ["VM ", "VM.", "VM'", "VMs", "Virtual Machine"],  # Characters in 'VMx' is to avoid matching 'VMware'
    "Storage": ["Storage", "Blob", "File", "Queue", "Table", "CORS"],
    "ACR": ["ACR", "Registry"],
    "AKV": ["AKV", "Key Vault", "Secrets", "Keys", "Certificates"],
    "ServiceBus": ["Service Bus", "ASB", "Queue", "Topic", "Relay"],
    "EventHubs": ["Event Hubs", "EventHubs", "Event Hub", "EH"],
    "CosmosDB": ["Cosmos DB", "CosmosDB"],
    "Sentinel": ["Sentinel"],
    "Entra": ["Entra", "AAD", "Azure AD", "Azure Active Directory", "PIM", "JIT", "Privileged Identity Management", "Just in Time", "Conditional Access", "MFA", "2FA", "Identity", "Identities", "B2B", "B2C"],
    "DDoS": ["DDoS", "Denial of Service"],
    "LoadBalancer": ["Load Balancer", "LB", "ILB", "SLB"],
    "DNS": ["DNS", "Domain Name System"],
    "TrafficManager": ["Traffic Manager", "TM"],
    "VNet": ["VNet", "Virtual Network", "NSG", "Network Security Group", "UDR", "User Defined Route", "IP Plan", "hub-and-spoke", "subnet"],
    "Defender": ["Defender", "Security Center"],
    "Subscriptions": ["Subscriptions", "Subscription", "Management Group"],
    "VWAN": ["VWAN", "Virtual WAN"],
    "ARS": ["ARS", "Route Server"],
    "Monitor": ["Monitor", "Log Analytics", "LogAnalytics", "Metrics", "Alerts"],
    "NetworkWatcher": ["Network Watcher", "NetworkWatcher", "Connection Monitor", "Flow logs"],
    "Arc": ["Arc ", "Arc-"],  # Otherwise it matches 'Architecture'
    "RBAC": ["RBAC", "role"],
    "Backup": ["Backup"],
    "ASR": ["ASR", "Site Recovery", "Disaster Recovery"],
    "AzurePolicy": ["Azure Policy", "Policy", "Policies"],
    "APIM": ["APIM", "API Management"],
    "AppProxy": ["App Proxy", "AppProxy"],
    "WAF": ["WAF", "Web Application Firewall"],
    "PrivateLink": ["Private Link", "PrivateLink", "Private Endpoint"],
    "Cost": ["Cost", "Budget"],
}

# Values of the 'waf' metadata attribute that mark a checklist as containing WAF recommendations
_VALID_WAF_VALUES = ('all', 'reliability', 'security', 'performance', 'cost', 'operations')


# Function to exclude certain checklists from being merged
def checklist_is_valid(checklist_name, language):
    """Return False if the checklist file must be excluded from the merge, True otherwise.

    checklist_name may include a folder path; only the file name is compared
    (ignoring casing) against the excluded names, currently 'waf.<language>.json'.
    """
    invalid_checklists = ["waf." + language + ".json"]
    # Compare the file name only, so that the folder part of the path does not matter
    file_name = os.path.basename(checklist_name).lower()
    return all(file_name != invalid_checklist.lower() for invalid_checklist in invalid_checklists)


# Get the ARM service name from the service name
def get_arm_service_name(service_name, service_dictionary=None):
    """Return the 'arm' value of the first dictionary entry whose 'names' contains service_name.

    service_dictionary is an iterable of objects with 'names' and 'arm' keys.
    Returns None if no entry matches or if no dictionary is supplied.
    """
    if not service_dictionary:
        return None
    for svc in service_dictionary:
        if service_name in svc['names']:
            return svc['arm']
    return None


# Inspect a string and return a list of the services to which that string is related to
def get_services_from_string(input_string):
    """Return the sorted list of services whose keywords appear (case-insensitively) in input_string.

    Returns an empty list for empty or non-string input.
    """
    if not input_string or not isinstance(input_string, str):
        return []
    haystack = input_string.lower()
    # Sorted so that the generated files do not change from run to run
    return sorted(
        service
        for service, keywords in _SERVICE_KEYWORDS.items()
        if any(keyword.lower() in haystack for keyword in keywords)
    )


# If metadata does not contain 'waf' key, return false
# If metadata does contain 'waf' key and its value is valid, return true
# Otherwise, return false
def contains_waf(checklist_metadata):
    """Return True if the checklist metadata has a 'waf' attribute with a valid value."""
    waf_value = checklist_metadata.get("waf")
    return isinstance(waf_value, str) and waf_value.lower() in _VALID_WAF_VALUES


# Consolidate all checklists into one big checklist object
def get_consolidated_checklist(input_folder, language, service_dictionary=None):
    """Merge every '*.<language>.json' checklist in input_folder into one checklist object.

    Deprecated checklists and the ones rejected by checklist_is_valid are skipped.
    With --waf only checklists with a valid 'waf' metadata attribute are processed, and
    only items having both 'waf' and 'service' keys are kept.
    Files that cannot be processed are reported on screen and skipped.

    Returns a dictionary with the keys 'items' and 'metadata', plus 'severities' and
    'status' copied from the last checklist that was merged.
    """
    # Initialize checklist object
    checklist_master_data = {
        'items': [],
        'metadata': {
            'name': 'Master checklist',
            'timestamp': datetime.date.today().strftime("%B %d, %Y")
        }
    }
    if args.waf:
        checklist_master_data['metadata']['name'] = 'WAF checklist'
    # Find all files in the input folder matching the pattern "*.language.json"
    if args.verbose:
        print("DEBUG: looking for JSON files in folder", input_folder, "with pattern *.", language + ".json...")
    # Sorted so that the order of the items and the severities/status taken do not depend on the file system
    checklist_files = sorted(glob.glob(os.path.join(input_folder, "*." + language + ".json")))
    if args.verbose:
        print("DEBUG: found", len(checklist_files), "JSON files")
    for checklist_file in checklist_files:
        if not checklist_is_valid(checklist_file, language):
            continue
        try:
            # Get JSON ('utf-8-sig' reads UTF-8 files both with and without a byte order mark)
            with open(checklist_file, encoding='utf-8-sig') as f:
                checklist_data = json.load(f)
            if args.verbose:
                print("DEBUG: JSON file", checklist_file, "loaded successfully with {0} items".format(len(checklist_data["items"])))
            metadata = checklist_data.get("metadata", {})
            # Verify that the checklist is not deprecated
            if "deprecated" in metadata.get("state", "").lower():
                if args.verbose:
                    print("DEBUG: skipping deprecated checklist", checklist_file)
                continue
            # Additional check if we are only interested in WAF recommendations:
            # If the WAF argument was provided, only checklists with WAF attribute containing a valid value will be processed
            if args.waf and not contains_waf(metadata):
                continue
            # Go over each checklist item
            for item in checklist_data["items"]:
                # Add field with the name of the checklist
                item["checklist"] = metadata["name"]
                # Cleanup some fields
                for field in ("id", "cost", "simple", "ha", "scale", "security"):
                    item.pop(field, None)
                if args.waf:
                    item.pop("category", None)
                    item.pop("subcategory", None)
                # Additional check if we are only interested in WAF recommendations: only items with WAF pillar and service will be added
                if not args.waf or ("waf" in item and "service" in item):
                    # Add items to the master checklist
                    checklist_master_data['items'].append(item)
            # Replace the master checklist severities and status sections (for a given language they should be all the same)
            checklist_master_data['severities'] = checklist_data['severities']
            checklist_master_data['status'] = checklist_data['status']
        except Exception as e:
            print("ERROR: Error when processing JSON file", checklist_file, "-", str(e))
    # Optionally, browse the checklist items and add the services field
    if args.add_services and not args.waf:
        for item in checklist_master_data["items"]:
            # Get services from the checklist name and from the text fields of the item
            services = set()
            for field in ("checklist", "text", "category", "subcategory", "description"):
                if field in item:
                    services.update(get_services_from_string(item[field]))
            item["services"] = sorted(services)
    # Optionally, browse the checklist items and add the ARM service field
    if args.add_arm_services and args.waf and service_dictionary:
        for item in checklist_master_data["items"]:
            arm_service = get_arm_service_name(item["service"], service_dictionary=service_dictionary)
            if arm_service:
                item["arm-service"] = arm_service
    if args.verbose:
        print("DEBUG: master checklist contains", len(checklist_master_data["items"]), "items")
    return checklist_master_data


# Print statistics about the checklist
def print_stats(checklist):
    """Print counters about the checklist and, with --show-service, the items of that service.

    Items without 'category' or 'services' keys (for example in WAF mode) are tolerated.
    """
    items = checklist["items"]
    print("INFO: Number of checks:", len(items))
    print("INFO: Number of categories:", len({item["category"] for item in items if "category" in item}))
    print("INFO: Number of items with no GUID:", len([item for item in items if "guid" not in item]))
    if args.add_services:
        print("INFO: Number of services:", len({service for item in items for service in item.get("services", [])}))
        print("INFO: Number of items with no services:", len([item for item in items if not item.get("services")]))
    if args.show_service:
        if args.verbose:
            print ("DEBUG: Getting items for service", args.show_service, "...")
        if args.show_service == "none":
            selected_items = [item for item in items if not item.get("services")]
        else:
            wanted = args.show_service.lower()
            selected_items = [item for item in items if wanted in [x.lower() for x in item.get("services", [])]]
        for item in selected_items:
            print_item(item)


# Convert a field value to a printable string, using an empty string for missing values
def _as_text(value):
    return "" if value is None else str(value)


# Print on screen a given checklist item in a single line with fixed field widths
def print_item(item):
    """Print checklist, id, category, subcategory, text and services of an item in one line.

    Missing fields are printed as empty strings.
    """
    text = _as_text(item.get("text"))
    cat = _as_text(item.get("category"))
    subcat = _as_text(item.get("subcategory"))
    item_id = _as_text(item.get("id"))
    checklist = _as_text(item.get("checklist"))
    svcs = str(item["services"]) if "services" in item else ""
    print("{0: <25.25} {1: <10.10} {2: <25.25} {3: <25.25} {4: <80.80} {5: <25.25}".format(checklist, item_id, cat, subcat, text, svcs))


# Dump JSON object to file
def dump_json_file(json_object, filename):
    """Write json_object to filename as UTF-8 JSON with sorted keys and an indentation of 4."""
    if args.verbose:
        print("DEBUG: dumping JSON object to file", filename)
    json_string = json.dumps(json_object, sort_keys=True, ensure_ascii=False, indent=4, separators=(',', ': '))
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(json_string)


# Format string so that it is compatible with Excel
def format4excel(input_string):
    """Return input_string without a leading equals sign; other values are returned unchanged."""
    # Remove equals sign at the beginning of the string
    if isinstance(input_string, str) and input_string.startswith("="):
        return input_string[1:]
    return input_string


# Create macro-free Excel file with the checklist
def update_excel_file(input_excel_file, output_excel_file, checklist_data):
    """Fill the Excel template with the checklist and save it under a new name.

    input_excel_file is the template workbook; it must contain the worksheets
    'Checklist' and 'Values'. output_excel_file is the path where the workbook is saved.
    checklist_data is a checklist object such as the one returned by get_consolidated_checklist.

    The script is terminated with exit code 1 if the workbook cannot be opened or saved,
    or if one of the worksheets cannot be selected.
    """
    # Constants
    worksheet_checklist_name = 'Checklist'
    row1 = 8        # First row after which the Excel spreadsheet will be updated
    col_checklist_name = "A"
    row_checklist_name = "4"
    # WAF checklists do not have category and subcategory.
    # Column H (WAF) or J (otherwise) is the comment column, which this script does not write.
    if args.waf:
        col_checklist = "A"
        col_waf_pillar = "B"
        col_services = "C"
        col_check = "D"
        col_desc = "E"
        col_sev = "F"
        col_status = "G"
        col_link = "I"
        col_training = "J"
        col_arg = "K"
        col_guid = "L"
    else:
        col_checklist = "A"
        col_area = "B"
        col_subarea = "C"
        col_waf_pillar = "D"
        col_services = "E"
        col_check = "F"
        col_desc = "G"
        col_sev = "H"
        col_status = "I"
        col_link = "K"
        col_training = "L"
        col_arg = "M"
        col_guid = "N"
    info_link_text = 'More info'
    training_link_text = 'Training'
    worksheet_values_name = 'Values'
    values_row1 = 2
    col_values_severity = "A"
    col_values_status = "B"
    col_values_description = "H"

    # Load workbook
    try:
        wb = load_workbook(filename = input_excel_file)
        if args.verbose:
            print("DEBUG: workbook", input_excel_file, "opened successfully")
    except Exception as e:
        print("ERROR: Error when opening Excel file", input_excel_file, "-", str(e))
        sys.exit(1)

    # Get worksheet
    try:
        ws = wb[worksheet_checklist_name]
        if args.verbose:
            print("DEBUG: worksheet", worksheet_checklist_name, "selected successfully")
    except Exception as e:
        print("ERROR: Error when selecting worksheet", worksheet_checklist_name, "-", str(e))
        sys.exit(1)

    # Set checklist name
    try:
        ws[col_checklist_name + row_checklist_name] = checklist_data["metadata"]["name"]
        if args.verbose:
            print("DEBUG: starting filling the Excel spreadsheet with the values of checklist '{0}'".format(checklist_data["metadata"]["name"]))
    except Exception as e:
        print("ERROR: Error when setting the checklist name in worksheet", worksheet_checklist_name, "-", str(e))
        sys.exit(1)

    # Get default status from the JSON, default to "Not verified"
    try:
        status_list = checklist_data.get("status")
        default_status = status_list[0].get("name")
        if args.verbose:
            print ("DEBUG: default status retrieved from checklist: '{0}'".format(default_status))
    except (TypeError, IndexError, KeyError, AttributeError):
        # No status list, an empty one, or one whose first element is not an object
        default_status = "Not verified"
        if args.verbose:
            print ("DEBUG: Using default status 'Not verified'")

    # For each checklist item, add a row to spreadsheet
    row_counter = row1
    for item in checklist_data.get("items") or []:
        # Read variables from JSON
        checklist_name = format4excel(item.get("checklist"))
        guid = format4excel(item.get("guid"))
        if not args.waf:
            category = format4excel(item.get("category"))
            subcategory = format4excel(item.get("subcategory"))
        waf_pillar = format4excel(item.get("waf"))
        text = format4excel(item.get("text"))
        description = format4excel(item.get("description"))
        severity = format4excel(item.get("severity"))
        link = format4excel(item.get("link"))
        training = format4excel(item.get("training"))
        status = default_status
        graph_query = format4excel(item.get("graph"))
        if args.waf:
            services = format4excel(item.get("service"))
        else:
            # Transform services array in a comma-separated string
            services = ", ".join(item.get("services", []))
        # Update Excel
        row = str(row_counter)
        ws[col_checklist + row].value = checklist_name
        if not args.waf:
            ws[col_area + row].value = category
            ws[col_subarea + row].value = subcategory
        ws[col_waf_pillar + row].value = waf_pillar
        ws[col_services + row].value = services
        ws[col_check + row].value = text
        ws[col_desc + row].value = description
        ws[col_sev + row].value = severity
        ws[col_status + row].value = status
        if not args.no_links:
            if link:
                ws[col_link + row].value = info_link_text
                ws[col_link + row].hyperlink = link
                ws[col_link + row].style = "Hyperlink"
            if training:
                # The cell shows the link text, the URL goes into the hyperlink (same as for the info link)
                ws[col_training + row].value = training_link_text
                ws[col_training + row].hyperlink = training
                ws[col_training + row].style = "Hyperlink"
        ws[col_arg + row].value = graph_query
        ws[col_guid + row].value = guid
        # Next row
        row_counter += 1

    # NOTE: creating an Excel table over the checklist range (openpyxl Table and ws.add_table)
    # was tried at this point and disabled, because it corrupts the file.

    # Get number of checks
    number_of_checks = row_counter - row1

    # Display summary
    if args.verbose:
        print("DEBUG:", str(number_of_checks), "checks added to Excel spreadsheet")

    # Get worksheet
    try:
        wsv = wb[worksheet_values_name]
        if args.verbose:
            print("DEBUG: worksheet", worksheet_values_name, "selected successfully")
    except Exception as e:
        print("ERROR: Error when selecting worksheet", worksheet_values_name, "-", str(e))
        sys.exit(1)

    # Update status ('status' is missing from the checklist object if no checklist was merged)
    row_counter = values_row1
    for item in checklist_data.get("status") or []:
        status = item.get("name")
        description = item.get("description")
        wsv[col_values_status + str(row_counter)].value = status
        wsv[col_values_description + str(row_counter)].value = description
        row_counter += 1

    # Display summary
    if args.verbose:
        print("DEBUG:", str(row_counter - values_row1), "statuses added to Excel spreadsheet")

    # Update severities ('severities' is missing from the checklist object if no checklist was merged)
    row_counter = values_row1
    for item in checklist_data.get("severities") or []:
        severity = item.get("name")
        wsv[col_values_severity + str(row_counter)].value = severity
        row_counter += 1

    # Display summary
    if args.verbose:
        print("DEBUG:", str(row_counter - values_row1), "severities added to Excel spreadsheet")

    # Data validation
    # UserWarning: Data Validation extension is not supported and will be removed!!!!
    if not args.no_data_validation:
        dv = DataValidation(type="list", formula1='=Values!$B$2:$B$6', allow_blank=True)
        rangevar = col_status + str(row1) + ':' + col_status + str(row1 + number_of_checks)
        if args.verbose:
            print("DEBUG: adding data validation to range", rangevar)
        dv.add(rangevar)
        ws.add_data_validation(dv)

    # Close book
    if args.verbose:
        print("DEBUG: saving workbook", output_excel_file)
    try:
        wb.save(output_excel_file)
    except Exception as e:
        print("ERROR: Error when saving Excel file to", output_excel_file, "-", str(e))
        sys.exit(1)


########
# Main #
########

# Load service dictionary, if specified:
service_dictionary = None
if args.service_dictionary:
    try:
        with open(args.service_dictionary, encoding='utf-8-sig') as f:
            service_dictionary = json.load(f)
        if args.verbose:
            print("DEBUG: service dictionary loaded successfully")
    except Exception as e:
        print("ERROR: Error when loading service dictionary from", args.service_dictionary, "-", str(e))
        sys.exit(1)

# Download checklist
if args.input_folder:
    # Verify that the arguments required by the requested outputs were supplied, before doing any work
    if not args.no_json and not (args.json_output_folder and args.output_name):
        print("ERROR: --json-output-folder and --output-name are required to generate the JSON file (or use --no-json)")
        sys.exit(1)
    if not args.no_excel and not (args.excel_file and args.xlsx_output_folder and args.output_name):
        print("ERROR: --excel-file, --xlsx-output-folder and --output-name are required to generate the Excel file (or use --no-excel)")
        sys.exit(1)
    # Get consolidated checklist
    checklist_master_data = get_consolidated_checklist(args.input_folder, args.language, service_dictionary=service_dictionary)
    # Dump master checklist to JSON file
    if not args.no_json:
        json_output_file = os.path.join(args.json_output_folder, args.output_name + ".json")
        dump_json_file(checklist_master_data, json_output_file)
    # Update spreadsheet
    if not args.no_excel:
        xlsx_output_file = os.path.join(args.xlsx_output_folder, args.output_name + ".xlsx")
        update_excel_file(args.excel_file, xlsx_output_file, checklist_master_data)
    # Print random items
    if args.print_random > 0:
        # random.sample fails if more items are requested than available
        sample_size = min(args.print_random, len(checklist_master_data["items"]))
        for item in random.sample(checklist_master_data["items"], sample_size):
            print_item(item)
    # Show statistics
    if args.stats:
        print_stats(checklist_master_data)
else:
    print("ERROR: No input folder specified")