tooling/query-metrics-alerts/parse_analysis.py (220 lines of code) (raw):

import json import yaml import argparse import os import pandas as pd # Parse command line arguments def parseArguments(): parser = argparse.ArgumentParser(description='This script analyzes the results of a MetricAlertsRules query.') parser.add_argument('-q', '--query-results', type=str, required=False, metavar='file', help='Path to analyze results', default='analysis_results.csv') parser.add_argument('-p', '--provider-list', type=str, required=False, metavar='file', help='Path to provider list', default='provider_list.json') parser.add_argument('-m', '--parse-metrics', type=str, required=False, metavar='dir', help='Path to Azure repo containing metric definitions', default="../../../azure-reference-other/azure-monitor-ref/supported-metrics/includes") parser.add_argument('-l', '--load-metrics', type=str, required=False, metavar='file', help='Path to metric definitions', default="metric_definitions.json") parser.add_argument('-s', '--save-metrics', type=str, required=False, metavar='file', help='Path to metric definitions', default="metric_definitions.json") parser.add_argument('-a', '--amba-dir', type=str, required=False, metavar='file', help='Path to metric definitions', default="../..") parser.add_argument('-t', '--threshold', type=str, required=False, metavar='file', help='Set minimum number for rules in order to be included in output', default=250) args = parser.parse_args() return args # Output the alerts to a JSON file def outputToJsonFile(data, filename): # Write the results to a file with open(filename, "w+") as f: json.dump(data, f, indent=2) # Output the query results to a JSON file def outputToYamlFile(data, filename): # Write the results to a file with open(filename, "w+") as f: yaml.dump(data, f, indent=2, default_flow_style=False, sort_keys=False) def getResourceTypes(file): with open(file, "r", encoding="utf-8") as file: providers = json.load(file) # Get a list of resource types from the provider list resourceTypes = {} for p in providers: category = p["namespace"] if (category.startswith('Microsoft')): for rt in p["resourceTypes"]: type = rt["resourceType"] resourceTypes[f"{category}/{type}".lower()] = { "category": category, "type": type } return resourceTypes def parseMetricFile(file): metrics = {} with open(file, 'r', encoding="utf-8") as f: lines = f.readlines() # Remove lines that don't begin with | lines = [line for line in lines if line.startswith('|')] # remove second line lines.pop(1) header = lines[0].split('|') header = [item.strip() for item in header if item.strip() != ''] for line in lines[1:]: line = line.split('|') line = line[1:-1] line = [item.strip() for item in line] metric = {} for i in range(len(header)): metric[header[i]] = line[i] metric['Description'] = metric.pop("Metric").split('<p>')[-1] key = metric['Name'].replace('`', '') metric.pop('Name') metrics[key] = metric return metrics def parseMetricFiles(dir): metricDefinitions = {} # Loop through each file in the directory for filename in os.listdir(dir): if filename.endswith(".md"): category = '.'.join(filename.split('-')[0:2]) type = '/'.join(filename.split('-')[2:-2]) file = os.path.join(dir, filename) metrics = parseMetricFile(file) # Check if category and type are in resourceTypes case insensitive key = f"{category}/{type}".lower() metricDefinitions[key] = metrics return metricDefinitions def parseAnalysisFile(file, resourceTypes): # Read the CSV file using Pandas alerts = pd.read_csv(file, header=0).to_dict('records') for alert in alerts: type = alert['resourceType'] metric = alert['metricName'] # Check if the metric is in the metrics list for each resource type if type in resourceTypes.keys(): if 'metrics' in resourceTypes[type].keys(): # find key in metrics that matches lowercase metric name key = [k for k in resourceTypes[type]['metrics'].keys() if k.lower() == metric.lower()] if len(key) > 0: # drop first keys in alert alert.pop('resourceType') alert.pop('metricName') resourceTypes[type]['metrics'][key[0]]['alert'] = alert else: print(f"Did not find metric: {metric} in {type}") def formatOperator(value): match value.lower(): case 'lessthan': return 'LessThan' case 'lessthanorequal': return 'LessThanOrEqual' case 'greaterthan': return 'GreaterThan' case 'greaterthanorequal': return 'GreaterThanOrEqual' case 'greaterorlessthan': return 'GreaterOrLessThan' case 'equal': return 'Equal' case _: return value def formatCriterion(value): match value.lower(): case 'staticthresholdcriterion': return 'StaticThresholdCriterion' case 'dynamicthresholdcriterion': return 'DynamicThresholdCriterion' case _: return value def main(): args = parseArguments() resourceTypes = getResourceTypes(args.provider_list) metricDefinitions = {} if args.load_metrics: with open(args.load_metrics, "r", encoding="utf-8") as file: metricDefinitions = json.load(file) else: metricDefinitions = parseMetricFiles(args.parse_metrics) outputToJsonFile(metricDefinitions, args.save_metrics) # add metrics to resourceTypes for key in metricDefinitions: if not key in resourceTypes.keys(): print(f"Did not find resource type: {key}") continue resourceTypes[key]['metrics'] = metricDefinitions[key] parseAnalysisFile(args.query_results, resourceTypes) # remove metrics that don't have alerts for rt in resourceTypes: if not 'metrics' in resourceTypes[rt].keys(): continue for metric in list(resourceTypes[rt]['metrics']): if not 'alert' in resourceTypes[rt]['metrics'][metric].keys(): resourceTypes[rt]['metrics'].pop(metric) dir = '../../services' for rt in resourceTypes: if not 'metrics' in resourceTypes[rt].keys(): continue # sort metrics based on alert numRules descending if alerts exist resourceTypes[rt]['metrics'] = dict(sorted(resourceTypes[rt]['metrics'].items(), key=lambda item: item[1]['alert']['numRules'], reverse=True)) for metric in resourceTypes[rt]["metrics"]: if not 'alert' in resourceTypes[rt]["metrics"][metric].keys(): continue if resourceTypes[rt]["metrics"][metric]["alert"]["numRules"] < args.threshold: continue category = resourceTypes[rt]['category'] type = resourceTypes[rt]['type'] alert = resourceTypes[rt]["metrics"][metric]["alert"] description = resourceTypes[rt]["metrics"][metric]["Description"] # create directory based on category if it doesn't exist category = resourceTypes[rt]['category'].replace('Microsoft.', '') if not os.path.exists(os.path.join(dir, category, '_index.md')): os.makedirs(os.path.join(dir, category), exist_ok=True) with open(os.path.join(dir, category, '_index.md'), 'w+') as f: f.write(f"---\ntitle: {category}\ngeekdocCollapseSection: true\ngeekdocHidden: false\n---\n") # create directory based on type if it doesn't exist subdir = type.split('/')[0] if not os.path.exists(os.path.join(dir, category, subdir, '_index.md')): os.makedirs(os.path.join(dir, category, subdir), exist_ok=True) with open(os.path.join(dir, category, subdir, '_index.md'), 'w+') as f: f.write(f"---\ntitle: {subdir}\ngeekdocCollapseSection: true\ngeekdocHidden: false\n---\n\n") f.write('{{< alertList name="alertList" >}}') # load existing yaml file if it exists filename = os.path.join(dir, category, subdir, "alerts.yaml") if not os.path.exists(filename): mode = 'w' else: mode = 'r+' with open(filename, mode) as f: try: data = yaml.load(f, Loader=yaml.FullLoader) except: data = [] # remove all alerts that have a tag of auto-generated and is not visible for i in range(len(data)): if data[i]['type'] != 'Metric': continue if "tags" not in data[i].keys(): continue if 'auto-generated' in data[i]['tags']: if data[i]["visible"] == False: data.pop(i) break addAlert = True name = metric # if type has more than one segment, slice off the first segment if len(type.split('/')) > 1: name = f"{'/'.join(type.split('/')[1:])} - {metric}" # Find record where proerpites.metricName == metric for i in range(len(data)): if data[i]['type'] == 'Metric': if data[i]['name'] == name: data[i]['description'] = description break popped_alert = None # find record where properties.metricName == metric and tag contains auto-generated for i in range(len(data)): if data[i]['type'] != 'Metric': continue if "tags" not in data[i].keys(): continue if data[i]['properties']['metricName'] == metric and 'auto-generated' in data[i]['tags']: if data[i]['verified'] == False: popped_alert = data.pop(i) break else: addAlert = False break if addAlert: # add alert to yaml file new_alert = { "name": name, "description": description, "type": "Metric", "verified": False, "visible": True, "tags": ["auto-generated", f"agc-{alert['numRules']}"], "properties": { "metricName": metric, "metricNamespace": f"{resourceTypes[rt]['category']}/{type}", "severity": alert['severity'], "windowSize": alert['windowSize'], "evaluationFrequency": alert['frequency'], "timeAggregation": alert['timeAggregation'].capitalize(), "operator": formatOperator(alert['operator']), "criterionType": formatCriterion(alert['criterionType']), } } if popped_alert: if 'references' in popped_alert.keys(): new_alert['references'] = popped_alert['references'] if 'dimensions' in alert.keys(): if alert['dimensions'] != '[]': new_alert['properties']['dimensions'] = json.loads(alert['dimensions']) if new_alert['properties']['criterionType'] == 'DynamicThresholdCriterion': new_alert['properties']['failingPeriods'] = json.loads(alert['failingPeriods']) new_alert['properties']['alertSensitivity'] = alert['alertSensitivity'].capitalize() else: new_alert['properties']['threshold'] = alert['threshold'] data.append(new_alert) # write yaml file outputToYamlFile(data, filename) print(f"Adding alert defintion: resource: {category}/{type} metric: {metric} numRules: {alert['numRules']}") if __name__ == "__main__": main()