in tooling/query-metrics-alerts/parse_analysis.py [0:0]
def _ensureIndexFile(directory, title, extraContent=""):
    """Create ``directory`` and its ``_index.md`` when the index file is missing.

    The index file gets a front-matter header with ``title`` and the
    ``geekdocCollapseSection`` / ``geekdocHidden`` keys, followed by
    ``extraContent``. An existing ``_index.md`` is never overwritten.
    """
    indexPath = os.path.join(directory, '_index.md')
    if os.path.exists(indexPath):
        return
    os.makedirs(directory, exist_ok=True)
    with open(indexPath, 'w', encoding='utf-8') as f:
        f.write(f"---\ntitle: {title}\ngeekdocCollapseSection: true\ngeekdocHidden: false\n---\n")
        f.write(extraContent)


def _loadAlerts(filename):
    """Return the alert records stored in the YAML file ``filename``.

    A missing or empty file yields an empty list. An unreadable or malformed
    file also yields an empty list (best effort), but a warning is printed
    because the caller will overwrite the file afterwards.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            # NOTE(review): FullLoader is kept from the original code; switch
            # to yaml.safe_load if these files can ever come from an untrusted
            # source.
            data = yaml.load(f, Loader=yaml.FullLoader)
    except FileNotFoundError:
        return []
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as err:
        print(f"Could not read {filename}, starting with an empty alert list: {err}")
        return []
    # yaml.load returns None for an empty document.
    return data or []


def _isAutoGeneratedMetric(entry):
    """Return True when ``entry`` is a Metric record tagged ``auto-generated``."""
    return entry.get('type') == 'Metric' and 'auto-generated' in (entry.get('tags') or [])


def _buildAlert(name, description, metric, namespace, alert, references=None):
    """Build the YAML record for an auto-generated metric alert.

    ``alert`` is the analysis result for the metric. ``references`` is copied
    onto the record when given, so that links attached to the record being
    replaced are not lost.
    """
    properties = {
        "metricName": metric,
        "metricNamespace": namespace,
        "severity": alert['severity'],
        "windowSize": alert['windowSize'],
        "evaluationFrequency": alert['frequency'],
        "timeAggregation": alert['timeAggregation'].capitalize(),
        "operator": formatOperator(alert['operator']),
        "criterionType": formatCriterion(alert['criterionType']),
    }
    newAlert = {
        "name": name,
        "description": description,
        "type": "Metric",
        "verified": False,
        "visible": True,
        "tags": ["auto-generated", f"agc-{alert['numRules']}"],
        "properties": properties,
    }
    if references is not None:
        newAlert['references'] = references

    # Dimensions arrive as a JSON string; '[]' means "no dimensions".
    if alert.get('dimensions', '[]') != '[]':
        properties['dimensions'] = json.loads(alert['dimensions'])

    if properties['criterionType'] == 'DynamicThresholdCriterion':
        properties['failingPeriods'] = json.loads(alert['failingPeriods'])
        properties['alertSensitivity'] = alert['alertSensitivity'].capitalize()
    else:
        properties['threshold'] = alert['threshold']
    return newAlert


def _mergeAlert(data, name, metric, description, namespace, alert):
    """Merge the generated alert for ``metric`` into ``data`` in place.

    Steps:
      1. Drop every auto-generated Metric record that is marked not visible.
      2. Refresh the description of the Metric record called ``name``.
      3. Replace an unverified auto-generated record for ``metric`` with a
         freshly built one, or append a new record when none exists. A record
         that is not explicitly ``verified: false`` is left untouched.

    Returns True when a record was added, False otherwise.
    """
    # Rebuild the list instead of popping while indexing, so that *all*
    # hidden auto-generated records are removed, not just the first one.
    data[:] = [
        entry for entry in data
        if not (_isAutoGeneratedMetric(entry) and entry.get('visible') is False)
    ]

    for entry in data:
        if entry.get('type') == 'Metric' and entry.get('name') == name:
            entry['description'] = description
            break

    # NOTE(review): the match below is on metricName only, so resource
    # sub-types that share one alerts.yaml and a metric name may replace each
    # other's records - confirm whether metricNamespace should be compared too.
    replaced = None
    for index, entry in enumerate(data):
        if not _isAutoGeneratedMetric(entry):
            continue
        if (entry.get('properties') or {}).get('metricName') != metric:
            continue
        if entry.get('verified') is not False:
            # A verified record was curated by hand; never overwrite it.
            return False
        replaced = data.pop(index)
        break

    references = replaced.get('references') if replaced else None
    data.append(_buildAlert(name, description, metric, namespace, alert, references))
    return True


def main():
    """Generate alert definitions from metric definitions and analysis results.

    Metric definitions are loaded from ``args.load_metrics`` or, when that is
    not set, parsed from ``args.parse_metrics`` and saved to
    ``args.save_metrics``. They are attached to the known resource types, the
    analysis results in ``args.query_results`` are applied, and every metric
    whose alert has at least ``args.threshold`` rules is merged into the
    ``alerts.yaml`` file of its resource type under ``../../services``.
    """
    args = parseArguments()
    resourceTypes = getResourceTypes(args.provider_list)

    if args.load_metrics:
        with open(args.load_metrics, "r", encoding="utf-8") as file:
            metricDefinitions = json.load(file)
    else:
        metricDefinitions = parseMetricFiles(args.parse_metrics)
        outputToJsonFile(metricDefinitions, args.save_metrics)

    # Attach the metric definitions to their resource types.
    for key, definition in metricDefinitions.items():
        if key not in resourceTypes:
            print(f"Did not find resource type: {key}")
            continue
        resourceTypes[key]['metrics'] = definition

    parseAnalysisFile(args.query_results, resourceTypes)

    # Output root, relative to the current working directory.
    servicesDir = '../../services'

    for resource in resourceTypes.values():
        if 'metrics' not in resource:
            continue

        # Keep only metrics that have an alert, most frequently used first.
        alerted = {
            metricName: definition
            for metricName, definition in resource['metrics'].items()
            if 'alert' in definition
        }
        resource['metrics'] = dict(sorted(
            alerted.items(),
            key=lambda item: item[1]['alert']['numRules'],
            reverse=True,
        ))

        category = resource['category'].replace('Microsoft.', '')
        resourceType = resource['type']
        segments = resourceType.split('/')
        subdir = segments[0]
        categoryDir = os.path.join(servicesDir, category)
        typeDir = os.path.join(categoryDir, subdir)
        filename = os.path.join(typeDir, "alerts.yaml")
        namespace = f"{resource['category']}/{resourceType}"

        for metric, definition in resource['metrics'].items():
            alert = definition['alert']
            if alert['numRules'] < args.threshold:
                continue
            description = definition["Description"]

            # Directories and index pages are only created for resource types
            # that actually receive an alert.
            _ensureIndexFile(categoryDir, category)
            _ensureIndexFile(typeDir, subdir, '\n{{< alertList name="alertList" >}}')

            # Sub-types share the parent's alerts.yaml; prefix the alert name
            # with the remaining type segments to tell them apart.
            name = metric
            if len(segments) > 1:
                name = f"{'/'.join(segments[1:])} - {metric}"

            data = _loadAlerts(filename)
            added = _mergeAlert(data, name, metric, description, namespace, alert)

            # Always write, so description refreshes and removals are kept
            # even when no new alert was added. The file is closed by now.
            outputToYamlFile(data, filename)

            if added:
                print(f"Adding alert defintion: resource: {category}/{resourceType} metric: {metric} numRules: {alert['numRules']}")