in functions/delete_alarms/app.py [0:0]
def lambda_handler(event, context):
    """Delete "Airflow-" CloudWatch alarms that no longer match an MWAA environment.

    Collects the names of all current MWAA environments, walks every metric
    alarm whose name starts with ``Airflow-``, and deletes those whose
    ``MWAAEnvironment`` tag does not match any of the collected names.

    Args:
        event: The Lambda invocation event; it is only logged.
        context: The Lambda context object; unused.

    Returns:
        None.
    """
    logger.info(json.dumps(event, indent=2, default=default))

    # ListEnvironments is a paginated API. Reading only the first page would
    # make environments on later pages look nonexistent, and their alarms
    # would be deleted by mistake, so gather the names from every page.
    mwaa_environments = [
        environment_name
        for page in mwaa.get_paginator("list_environments").paginate()
        for environment_name in page["Environments"]
    ]
    logger.info(f"Airflow environments: {json.dumps(mwaa_environments, indent=2)}")

    alarms_to_delete = []
    paginator = cloudwatch.get_paginator("describe_alarms")
    for response in paginator.paginate(AlarmNamePrefix="Airflow-"):
        logger.debug(json.dumps(response["MetricAlarms"], indent=2, default=default))
        for alarm in response["MetricAlarms"]:
            # Tags are not part of the describe_alarms response, so they are
            # fetched per alarm.
            alarm_tags = cloudwatch.list_tags_for_resource(
                ResourceARN=alarm["AlarmArn"]
            )
            # If the alarm is tagged with a nonexistent MWAA environment, mark
            # it for deletion.
            # NOTE(review): has_tag_with_value_in_list is defined elsewhere;
            # presumably an alarm with no MWAAEnvironment tag at all is also
            # marked here -- confirm that this is intended.
            if not has_tag_with_value_in_list(
                alarm_tags["Tags"], "MWAAEnvironment", mwaa_environments
            ):
                alarms_to_delete.append(alarm["AlarmName"])

    if not alarms_to_delete:
        logger.info("No alarms to delete.")
        return

    logger.info("Deleting alarms: " + ",".join(alarms_to_delete))

    # DeleteAlarms accepts at most 100 alarm names per request, so delete in
    # batches instead of sending the whole list in a single call.
    max_alarms_per_request = 100
    for start in range(0, len(alarms_to_delete), max_alarms_per_request):
        cloudwatch.delete_alarms(
            AlarmNames=alarms_to_delete[start:start + max_alarms_per_request]
        )