def lambda_handler()

in functions/delete_alarms/app.py [0:0]


def lambda_handler(event, context):
    """Delete "Airflow-" CloudWatch alarms that no longer match an MWAA environment.

    Collects the names of all MWAA environments, scans every metric alarm
    whose name starts with ``Airflow-``, and deletes those for which
    ``has_tag_with_value_in_list`` reports no ``MWAAEnvironment`` tag value
    among the current environment names.

    Args:
        event: The Lambda invocation event; only logged.
        context: The Lambda context object; unused.

    Returns:
        None.
    """
    logger.info(json.dumps(event, indent=2, default=default))

    # ListEnvironments is a paginated API: a single call returns only the
    # first page, which would make alarms of environments on later pages look
    # orphaned. Walk every page before deciding what to delete.
    mwaa_environments = []
    for page in mwaa.get_paginator("list_environments").paginate():
        mwaa_environments.extend(page["Environments"])

    logger.info(f"Airflow environments: {json.dumps(mwaa_environments, indent=2)}")

    alarms_to_delete = []
    paginator = cloudwatch.get_paginator("describe_alarms")
    for response in paginator.paginate(AlarmNamePrefix="Airflow-"):
        logger.debug(json.dumps(response["MetricAlarms"], indent=2, default=default))
        for alarm in response["MetricAlarms"]:
            alarm_tags = cloudwatch.list_tags_for_resource(
                ResourceARN=alarm["AlarmArn"]
            )
            # If the alarm is tagged with a nonexistent MWAA environment, mark
            # it for deletion.
            # NOTE(review): whether an alarm with no MWAAEnvironment tag at all
            # is also selected depends on has_tag_with_value_in_list -- confirm.
            if not has_tag_with_value_in_list(
                alarm_tags["Tags"], "MWAAEnvironment", mwaa_environments
            ):
                alarms_to_delete.append(alarm["AlarmName"])

    if not alarms_to_delete:
        logger.info("No alarms to delete.")
        return

    logger.info("Deleting alarms: " + ",".join(alarms_to_delete))

    # DeleteAlarms accepts at most 100 alarm names per request, so send the
    # names in batches instead of one potentially oversized call.
    max_alarms_per_request = 100
    for start in range(0, len(alarms_to_delete), max_alarms_per_request):
        cloudwatch.delete_alarms(
            AlarmNames=alarms_to_delete[start:start + max_alarms_per_request]
        )