def alarm_puller()

in adot/utils/soak/soak.py [0:0]


def alarm_puller(function_name, cpu_threshold, memory_threshold):
    global state

    # Memory alarm
    cloudwatch.put_metric_alarm(
        AlarmName=memory_alarm,
        ActionsEnabled=True,
        OKActions=[],
        AlarmActions=[],
        InsufficientDataActions=[],
        MetricName="memory_utilization",
        Namespace="LambdaInsights",
        Statistic="Maximum",
        Dimensions=[
            {"Name": "function_name", "Value": function_name},
        ],
        Period=60,
        EvaluationPeriods=5,
        DatapointsToAlarm=3,
        Threshold=memory_threshold,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="notBreaching",
    )

    # CPU alarm
    cloudwatch.put_metric_alarm(
        AlarmName=cpu_alarm,
        ActionsEnabled=True,
        OKActions=[],
        AlarmActions=[],
        InsufficientDataActions=[],
        MetricName="cpu_total_time",
        Namespace="LambdaInsights",
        Statistic="Maximum",
        Dimensions=[
            {"Name": "function_name", "Value": function_name},
        ],
        Period=60,
        EvaluationPeriods=5,
        DatapointsToAlarm=3,
        Threshold=cpu_threshold,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="notBreaching",
    )

    paginator = cloudwatch.get_paginator("describe_alarms")

    while True:
        for response in paginator.paginate(AlarmNames=[memory_alarm, cpu_alarm]):
            for alarm in response["MetricAlarms"]:
                print("{} state {}".format(alarm["AlarmName"], alarm["StateValue"]))
                if alarm["StateValue"] == "ALARM":
                    state = True
                    return

        time.sleep(60)