tooling/generate-templates/generate-templates.py (263 lines of code) (raw):
import os
import yaml
import json
import argparse
# Parse command line arguments
def parseArguments():
parser = argparse.ArgumentParser(description='This script will generate ARM and Bicep templates for each alert.')
parser.add_argument('-p', '--path', type=str, required=False, metavar='path', help='Path to services directory', default='../../services')
parser.add_argument('-o', '--output', type=str, required=False, metavar='output', help='Path to output directory', default='../../services')
parser.add_argument('-s', '--template_path', type=str, required=False, metavar='template_path', help='Path to templates directory', default='templates')
parser.add_argument('-t', '--telemetry_pid', type=str, required=False, metavar='telemetry_pid', help='Telemetry PID', default='pid-8bb7cf8a-bcf7-4264-abcb-703ace2fc84d')
args = parser.parse_args()
return args
def readYamlData(dir, export_hidden):
# Walk the directory tree and load all the alerts.yaml files
# into a list of dictionaries using the folder path as the structure
# for the dictionary.
data = {}
for root, dirs, files in os.walk(dir):
for file in files:
if file == 'alerts.yaml':
with open(os.path.join(root, file)) as f:
# Get current folder name and parent folder name
# to use as keys in the dictionary
resourceType = os.path.basename(root)
resouceCategory = os.path.basename(os.path.dirname(root))
if not resouceCategory in data:
data[resouceCategory] = {}
if not resourceType in data[resouceCategory]:
data[resouceCategory][resourceType] = []
alerts = yaml.safe_load(f)
if alerts:
for alert in alerts:
if (not export_hidden) and ('visible' in alert) and (alert['visible'] == False):
continue
data[resouceCategory][resourceType].append(alert)
return data
def readTemplates(template_path):
arm = {}
bicep = {}
# Read ARM templates from arm directory into a string
for root, dirs, files in os.walk(os.path.join(template_path, 'arm')):
for file in files:
if file.endswith('.json'):
# read the file into a string
with open(os.path.join(root, file)) as f:
arm[file.replace('.json','')] = f.read()
# Read Bicep templates from arm directory into a string
for root, dirs, files in os.walk(os.path.join(template_path, 'bicep')):
for file in files:
if file.endswith('.bicep'):
# read the file into a string
with open(os.path.join(root, file)) as f:
bicep[file.replace('.bicep','')] = f.read()
return arm, bicep
def main():
args = parseArguments()
data = readYamlData(args.path, False)
arm, bicep = readTemplates(args.template_path)
import datetime
import re
for category in data:
for resourceType in data[category]:
print(f"Generating templates for {len(data[category][resourceType])} alerts in {category}/{resourceType}...")
# create directories based on template types if it doesn't exist
os.makedirs(os.path.join(args.output, category, resourceType, "templates", "arm"), exist_ok=True)
os.makedirs(os.path.join(args.output, category, resourceType, "templates", "bicep"), exist_ok=True)
# Remove all files in arm and bicep directories
for root, dirs, files in os.walk(os.path.join(args.output, category, resourceType, "templates", "arm")):
for file in files:
os.remove(os.path.join(root, file))
for root, dirs, files in os.walk(os.path.join(args.output, category, resourceType, "templates", "bicep")):
for file in files:
os.remove(os.path.join(root, file))
alert_file_names = []
for alert in data[category][resourceType]:
arm_template_type = ""
alert_file_name = re.sub(r'[^a-zA-Z0-9-]', '', alert['name']) + '_' + alert['guid']
if alert["type"] == "Metric":
if 'criterionType' in alert["properties"]:
if alert["properties"]["criterionType"] == "StaticThresholdCriterion":
template_type = "metric-static"
else:
template_type = "metric-dynamic"
else:
print(f"CriterionType not found for alert {alert['guid']}")
elif alert["type"] == "Log":
template_type = "log"
elif alert["type"] == "ActivityLog":
if alert["properties"]["category"] == "Administrative":
template_type = "activity-administrative"
elif alert["properties"]["category"] == "ResourceHealth":
template_type = "activity-resourcehealth"
elif alert["properties"]["category"] == "ServiceHealth":
template_type = "activity-servicehealth"
else:
continue
if template_type == "":
print(f"Template not found for alert {alert['guid']}")
else:
# Generate ARM template
arm_template = arm[template_type]
bicep_template = bicep[template_type]
arm_template = arm_template.replace("##TELEMETRY_PID##", args.telemetry_pid)
bicep_template = bicep_template.replace("##TELEMETRY_PID##", args.telemetry_pid)
if 'description' in alert and alert["description"] is not None:
arm_template = arm_template.replace("##DESCRIPTION##", alert["description"])
bicep_template = bicep_template.replace("##DESCRIPTION##", alert["description"])
else:
arm_template = arm_template.replace("##DESCRIPTION##", "")
bicep_template = bicep_template.replace("##DESCRIPTION##", "")
if 'severity' in alert["properties"] and alert["properties"]["severity"] is not None:
arm_template = arm_template.replace("##SEVERITY##", str(int(alert["properties"]["severity"])))
bicep_template = bicep_template.replace("##SEVERITY##", str(int(alert["properties"]["severity"])))
else:
arm_template = arm_template.replace("##SEVERITY##", "")
bicep_template = bicep_template.replace("##SEVERITY##", "")
if 'autoMitigate' in alert["properties"] and alert["properties"]["autoMitigate"] is not None:
if alert["properties"]["autoMitigate"] == True:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "true")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "true")
else:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "False")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "false")
else:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "")
if 'query' in alert["properties"] and alert["properties"]["query"] is not None:
arm_template = arm_template.replace("##QUERY##", json.dumps(alert["properties"]["query"].replace("\n", " ")))
query = alert["properties"]["query"].replace("\n", " ").replace("'", "\\'")
bicep_template = bicep_template.replace("##QUERY##", query)
else:
arm_template = arm_template.replace("##QUERY##", "")
bicep_template = bicep_template.replace("##QUERY##", "")
if 'metricName' in alert["properties"] and alert["properties"]["metricName"] is not None:
arm_template = arm_template.replace("##METRIC_NAME##", alert["properties"]["metricName"])
bicep_template = bicep_template.replace("##METRIC_NAME##", alert["properties"]["metricName"])
else:
arm_template = arm_template.replace("##METRIC_NAME##", "")
bicep_template = bicep_template.replace("##METRIC_NAME##", "")
if 'metricMeasureColumn' in alert["properties"] and alert["properties"]["metricMeasureColumn"] is not None:
arm_template = arm_template.replace("##METRIC_MEASURE_COLUMN##", alert["properties"]["metricMeasureColumn"])
bicep_template = bicep_template.replace("##METRIC_MEASURE_COLUMN##", alert["properties"]["metricMeasureColumn"])
else:
arm_template = arm_template.replace("##METRIC_MEASURE_COLUMN##", "")
bicep_template = bicep_template.replace("##METRIC_MEASURE_COLUMN##", "")
if 'resouceIdColumn' in alert["properties"] and alert["properties"]["resouceIdColumn"] is not None:
arm_template = arm_template.replace("##RESOURCE_ID_COLUMN##", alert["properties"]["resouceIdColumn"])
bicep_template = bicep_template.replace("##RESOURCE_ID_COLUMN##", alert["properties"]["resouceIdColumn"])
else:
arm_template = arm_template.replace("##RESOURCE_ID_COLUMN##", "")
bicep_template = bicep_template.replace("##RESOURCE_ID_COLUMN##", "")
if 'operator' in alert["properties"] and alert["properties"]["operator"] is not None:
arm_template = arm_template.replace("##OPERATOR##", alert["properties"]["operator"])
bicep_template = bicep_template.replace("##OPERATOR##", alert["properties"]["operator"])
else:
arm_template = arm_template.replace("##OPERATOR##", "")
bicep_template = bicep_template.replace("##OPERATOR##", "")
if 'failingPeriods' in alert["properties"] and alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"] is not None:
arm_template = arm_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", str(int(alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"])))
bicep_template = bicep_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", str(int(alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"])))
else:
arm_template = arm_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", "")
bicep_template = bicep_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", "")
if 'failingPeriods' in alert["properties"] and alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"] is not None:
arm_template = arm_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", str(int(alert["properties"]["failingPeriods"]["minFailingPeriodsToAlert"])))
bicep_template = bicep_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", str(int(alert["properties"]["failingPeriods"]["minFailingPeriodsToAlert"])))
else:
arm_template = arm_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", "")
bicep_template = bicep_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", "")
if 'threshold' in alert["properties"] and alert["properties"]["threshold"] is not None:
# Convert the threshold to a float
threshold = float(alert["properties"]["threshold"])
threshold = str(round(threshold))
if threshold == "":
raise Exception(f"Threshold is empty for alert {alert['guid']}")
arm_template = arm_template.replace("##THRESHOLD##", threshold)
bicep_template = bicep_template.replace("##THRESHOLD##", threshold)
else:
arm_template = arm_template.replace("##THRESHOLD##", "")
bicep_template = bicep_template.replace("##THRESHOLD##", "")
if 'timeAggregation' in alert["properties"] and alert["properties"]["timeAggregation"] is not None:
arm_template = arm_template.replace("##TIME_AGGREGATION##", alert["properties"]["timeAggregation"])
bicep_template = bicep_template.replace("##TIME_AGGREGATION##", alert["properties"]["timeAggregation"])
else:
arm_template = arm_template.replace("##TIME_AGGREGATION##", "")
bicep_template = bicep_template.replace("##TIME_AGGREGATION##", "")
if 'windowSize' in alert["properties"] and alert["properties"]["windowSize"] is not None:
arm_template = arm_template.replace("##WINDOW_SIZE##", alert["properties"]["windowSize"])
bicep_template = bicep_template.replace("##WINDOW_SIZE##", alert["properties"]["windowSize"])
else:
arm_template = arm_template.replace("##WINDOW_SIZE##", "")
bicep_template = bicep_template.replace("##WINDOW_SIZE##", "")
if 'evaluationFrequency' in alert["properties"] and alert["properties"]["evaluationFrequency"] is not None:
arm_template = arm_template.replace("##EVALUATION_FREQUENCY##", alert["properties"]["evaluationFrequency"])
bicep_template = bicep_template.replace("##EVALUATION_FREQUENCY##", alert["properties"]["evaluationFrequency"])
else:
arm_template = arm_template.replace("##EVALUATION_FREQUENCY##", "")
bicep_template = bicep_template.replace("##EVALUATION_FREQUENCY##", "")
if 'alertSensitivity' in alert["properties"] and alert["properties"]["alertSensitivity"] is not None:
arm_template = arm_template.replace("##ALERT_SENSITIVITY##", alert["properties"]["alertSensitivity"])
bicep_template = bicep_template.replace("##ALERT_SENSITIVITY##", alert["properties"]["alertSensitivity"])
else:
arm_template = arm_template.replace("##ALERT_SENSITIVITY##", "")
bicep_template = bicep_template.replace("##ALERT_SENSITIVITY##", "")
if 'dimensions' in alert["properties"] and alert["properties"]["dimensions"] is not None:
arm_template = arm_template.replace("##DIMENSIONS##", json.dumps(alert["properties"]["dimensions"]))
dimensions = []
for dimension in alert["properties"]["dimensions"]:
values = []
for value in dimension["values"]:
values.append(f"'{value}'")
dimensions.append(f"""
{{
name: '{dimension["name"]}'
operator: '{dimension["operator"]}'
values: [{",".join(values)}]
}}""")
bicep_template = bicep_template.replace("##DIMENSIONS##", "[" + "".join(dimensions) + "]")
else:
arm_template = arm_template.replace("##DIMENSIONS##", "[]")
bicep_template = bicep_template.replace("##DIMENSIONS##", "[]")
if 'operationName' in alert["properties"] and alert["properties"]["operationName"] is not None:
arm_template = arm_template.replace("##OPERATION_NAME##", alert["properties"]["operationName"])
bicep_template = bicep_template.replace("##OPERATION_NAME##", alert["properties"]["operationName"])
else:
arm_template = arm_template.replace("##OPERATION_NAME##", "")
bicep_template = bicep_template.replace("##OPERATION_NAME##", "")
if 'status' in alert["properties"] and alert["properties"]["status"] is not None:
arm_template = arm_template.replace("##STATUS##", json.dumps(alert["properties"]["status"]))
statuses = []
for status in alert["properties"]["status"]:
statuses.append(f"'{status}'")
bicep_template = bicep_template.replace("##STATUS##", f"[{",".join(statuses)}]")
else:
arm_template = arm_template.replace("##STATUS##", "")
bicep_template = bicep_template.replace("##STATUS##", "")
if 'causes' in alert["properties"] and alert["properties"]["causes"] is not None:
causes = []
for cause in alert["properties"]["causes"]:
causes.append(f"""
{{
"field": "properties.cause",
"equals": "{cause}"
}}""")
arm_template = arm_template.replace("##CAUSES##", f"{",".join(causes)}")
causes = []
for cause in alert["properties"]["causes"]:
causes.append(f"""
{{
field: 'properties.cause'
equals: '{cause}'
}}""")
bicep_template = bicep_template.replace("##CAUSES##", f"{"".join(causes)}")
else:
arm_template = arm_template.replace("##CAUSES##", "")
bicep_template = bicep_template.replace("##CAUSES##", "")
if 'currentHealthStatus' in alert["properties"] and alert["properties"]["currentHealthStatus"] is not None:
currentHealthStatuses = []
for currentHealthStatus in alert["properties"]["currentHealthStatus"]:
currentHealthStatuses.append(f"""
{{
"field": "properties.currentHealthStatus",
"equals": "{currentHealthStatus}"
}}""")
arm_template = arm_template.replace("##CURRENT_HEALTH_STATUS##", f"{",".join(currentHealthStatuses)}")
currentHealthStatuses = []
for currentHealthStatus in alert["properties"]["currentHealthStatus"]:
currentHealthStatuses.append(f"""
{{
field: 'properties.currentHealthStatus'
equals: '{currentHealthStatus}'
}}""")
bicep_template = bicep_template.replace("##CURRENT_HEALTH_STATUS##", f"{"".join(currentHealthStatuses)}")
else:
arm_template = arm_template.replace("##CURRENT_HEALTH_STATUS##", "")
bicep_template = bicep_template.replace("##CURRENT_HEALTH_STATUS##", "")
if 'incidentType' in alert["properties"] and alert["properties"]["incidentType"] is not None:
arm_template = arm_template.replace("##INCIDENT_TYPE##", alert["properties"]["incidentType"])
bicep_template = bicep_template.replace("##INCIDENT_TYPE##", alert["properties"]["incidentType"])
else:
arm_template = arm_template.replace("##INCIDENT_TYPE##", "")
bicep_template = bicep_template.replace("##INCIDENT_TYPE##", "")
# Check if alert name already exists
if alert_file_name in alert_file_names:
raise Exception(f"Alert name {alert_file_name} already exists in the list of alerts for {category}/{resourceType}")
else:
alert_file_names.append(alert_file_name)
with open(os.path.join(args.output, category, resourceType, "templates", "arm", alert_file_name + '.json'), 'w') as f:
f.write(arm_template)
# Save the Bicep template
with open(os.path.join(args.output, category, resourceType, "templates", "bicep", alert_file_name + '.bicep'), 'w') as f:
f.write(bicep_template)
if __name__ == '__main__':
main()