in tooling/generate-templates/generate-templates.py [0:0]
def main():
args = parseArguments()
data = readYamlData(args.path, False)
arm, bicep = readTemplates(args.template_path)
import datetime
import re
for category in data:
for resourceType in data[category]:
print(f"Generating templates for {len(data[category][resourceType])} alerts in {category}/{resourceType}...")
# Create the per-template-type output directories if they don't already exist
os.makedirs(os.path.join(args.output, category, resourceType, "templates", "arm"), exist_ok=True)
os.makedirs(os.path.join(args.output, category, resourceType, "templates", "bicep"), exist_ok=True)
# Remove all files in arm and bicep directories
for root, dirs, files in os.walk(os.path.join(args.output, category, resourceType, "templates", "arm")):
for file in files:
os.remove(os.path.join(root, file))
for root, dirs, files in os.walk(os.path.join(args.output, category, resourceType, "templates", "bicep")):
for file in files:
os.remove(os.path.join(root, file))
alert_file_names = []
for alert in data[category][resourceType]:
arm_template_type = ""
alert_file_name = re.sub(r'[^a-zA-Z0-9-]', '', alert['name']) + '_' + alert['guid']
if alert["type"] == "Metric":
if 'criterionType' in alert["properties"]:
if alert["properties"]["criterionType"] == "StaticThresholdCriterion":
template_type = "metric-static"
else:
template_type = "metric-dynamic"
else:
print(f"CriterionType not found for alert {alert['guid']}")
elif alert["type"] == "Log":
template_type = "log"
elif alert["type"] == "ActivityLog":
if alert["properties"]["category"] == "Administrative":
template_type = "activity-administrative"
elif alert["properties"]["category"] == "ResourceHealth":
template_type = "activity-resourcehealth"
elif alert["properties"]["category"] == "ServiceHealth":
template_type = "activity-servicehealth"
else:
continue
if template_type == "":
print(f"Template not found for alert {alert['guid']}")
else:
# Generate ARM and Bicep templates
arm_template = arm[template_type]
bicep_template = bicep[template_type]
arm_template = arm_template.replace("##TELEMETRY_PID##", args.telemetry_pid)
bicep_template = bicep_template.replace("##TELEMETRY_PID##", args.telemetry_pid)
if 'description' in alert and alert["description"] is not None:
arm_template = arm_template.replace("##DESCRIPTION##", alert["description"])
bicep_template = bicep_template.replace("##DESCRIPTION##", alert["description"])
else:
arm_template = arm_template.replace("##DESCRIPTION##", "")
bicep_template = bicep_template.replace("##DESCRIPTION##", "")
if 'severity' in alert["properties"] and alert["properties"]["severity"] is not None:
arm_template = arm_template.replace("##SEVERITY##", str(int(alert["properties"]["severity"])))
bicep_template = bicep_template.replace("##SEVERITY##", str(int(alert["properties"]["severity"])))
else:
arm_template = arm_template.replace("##SEVERITY##", "")
bicep_template = bicep_template.replace("##SEVERITY##", "")
if 'autoMitigate' in alert["properties"] and alert["properties"]["autoMitigate"] is not None:
if alert["properties"]["autoMitigate"] == True:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "true")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "true")
else:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "False")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "false")
else:
arm_template = arm_template.replace("##AUTO_MITIGATE##", "")
bicep_template = bicep_template.replace("##AUTO_MITIGATE##", "")
if 'query' in alert["properties"] and alert["properties"]["query"] is not None:
arm_template = arm_template.replace("##QUERY##", json.dumps(alert["properties"]["query"].replace("\n", " ")))
query = alert["properties"]["query"].replace("\n", " ").replace("'", "\\'")
bicep_template = bicep_template.replace("##QUERY##", query)
else:
arm_template = arm_template.replace("##QUERY##", "")
bicep_template = bicep_template.replace("##QUERY##", "")
if 'metricName' in alert["properties"] and alert["properties"]["metricName"] is not None:
arm_template = arm_template.replace("##METRIC_NAME##", alert["properties"]["metricName"])
bicep_template = bicep_template.replace("##METRIC_NAME##", alert["properties"]["metricName"])
else:
arm_template = arm_template.replace("##METRIC_NAME##", "")
bicep_template = bicep_template.replace("##METRIC_NAME##", "")
if 'metricMeasureColumn' in alert["properties"] and alert["properties"]["metricMeasureColumn"] is not None:
arm_template = arm_template.replace("##METRIC_MEASURE_COLUMN##", alert["properties"]["metricMeasureColumn"])
bicep_template = bicep_template.replace("##METRIC_MEASURE_COLUMN##", alert["properties"]["metricMeasureColumn"])
else:
arm_template = arm_template.replace("##METRIC_MEASURE_COLUMN##", "")
bicep_template = bicep_template.replace("##METRIC_MEASURE_COLUMN##", "")
if 'resouceIdColumn' in alert["properties"] and alert["properties"]["resouceIdColumn"] is not None:
arm_template = arm_template.replace("##RESOURCE_ID_COLUMN##", alert["properties"]["resouceIdColumn"])
bicep_template = bicep_template.replace("##RESOURCE_ID_COLUMN##", alert["properties"]["resouceIdColumn"])
else:
arm_template = arm_template.replace("##RESOURCE_ID_COLUMN##", "")
bicep_template = bicep_template.replace("##RESOURCE_ID_COLUMN##", "")
if 'operator' in alert["properties"] and alert["properties"]["operator"] is not None:
arm_template = arm_template.replace("##OPERATOR##", alert["properties"]["operator"])
bicep_template = bicep_template.replace("##OPERATOR##", alert["properties"]["operator"])
else:
arm_template = arm_template.replace("##OPERATOR##", "")
bicep_template = bicep_template.replace("##OPERATOR##", "")
if 'failingPeriods' in alert["properties"] and alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"] is not None:
arm_template = arm_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", str(int(alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"])))
bicep_template = bicep_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", str(int(alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"])))
else:
arm_template = arm_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", "")
bicep_template = bicep_template.replace("##FAILING_PERIODS_NUMBER_OF_EVALUATION_PERIODS##", "")
if 'failingPeriods' in alert["properties"] and alert["properties"]["failingPeriods"]["numberOfEvaluationPeriods"] is not None:
arm_template = arm_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", str(int(alert["properties"]["failingPeriods"]["minFailingPeriodsToAlert"])))
bicep_template = bicep_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", str(int(alert["properties"]["failingPeriods"]["minFailingPeriodsToAlert"])))
else:
arm_template = arm_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", "")
bicep_template = bicep_template.replace("##FAILING_PERIODS_MIN_FAILING_PERIODS_TO_ALERT##", "")
if 'threshold' in alert["properties"] and alert["properties"]["threshold"] is not None:
# Convert the threshold to a float, then round it to the nearest integer and stringify it
threshold = float(alert["properties"]["threshold"])
threshold = str(round(threshold))
if threshold == "":
raise Exception(f"Threshold is empty for alert {alert['guid']}")
arm_template = arm_template.replace("##THRESHOLD##", threshold)
bicep_template = bicep_template.replace("##THRESHOLD##", threshold)
else:
arm_template = arm_template.replace("##THRESHOLD##", "")
bicep_template = bicep_template.replace("##THRESHOLD##", "")
if 'timeAggregation' in alert["properties"] and alert["properties"]["timeAggregation"] is not None:
arm_template = arm_template.replace("##TIME_AGGREGATION##", alert["properties"]["timeAggregation"])
bicep_template = bicep_template.replace("##TIME_AGGREGATION##", alert["properties"]["timeAggregation"])
else:
arm_template = arm_template.replace("##TIME_AGGREGATION##", "")
bicep_template = bicep_template.replace("##TIME_AGGREGATION##", "")
if 'windowSize' in alert["properties"] and alert["properties"]["windowSize"] is not None:
arm_template = arm_template.replace("##WINDOW_SIZE##", alert["properties"]["windowSize"])
bicep_template = bicep_template.replace("##WINDOW_SIZE##", alert["properties"]["windowSize"])
else:
arm_template = arm_template.replace("##WINDOW_SIZE##", "")
bicep_template = bicep_template.replace("##WINDOW_SIZE##", "")
if 'evaluationFrequency' in alert["properties"] and alert["properties"]["evaluationFrequency"] is not None:
arm_template = arm_template.replace("##EVALUATION_FREQUENCY##", alert["properties"]["evaluationFrequency"])
bicep_template = bicep_template.replace("##EVALUATION_FREQUENCY##", alert["properties"]["evaluationFrequency"])
else:
arm_template = arm_template.replace("##EVALUATION_FREQUENCY##", "")
bicep_template = bicep_template.replace("##EVALUATION_FREQUENCY##", "")
if 'alertSensitivity' in alert["properties"] and alert["properties"]["alertSensitivity"] is not None:
arm_template = arm_template.replace("##ALERT_SENSITIVITY##", alert["properties"]["alertSensitivity"])
bicep_template = bicep_template.replace("##ALERT_SENSITIVITY##", alert["properties"]["alertSensitivity"])
else:
arm_template = arm_template.replace("##ALERT_SENSITIVITY##", "")
bicep_template = bicep_template.replace("##ALERT_SENSITIVITY##", "")
if 'dimensions' in alert["properties"] and alert["properties"]["dimensions"] is not None:
arm_template = arm_template.replace("##DIMENSIONS##", json.dumps(alert["properties"]["dimensions"]))
dimensions = []
for dimension in alert["properties"]["dimensions"]:
values = []
for value in dimension["values"]:
values.append(f"'{value}'")
dimensions.append(f"""