in adot/utils/soak/soak.py [0:0]
def alarm_puller(function_name, cpu_threshold, memory_threshold):
global state
# Memory alarm
cloudwatch.put_metric_alarm(
AlarmName=memory_alarm,
ActionsEnabled=True,
OKActions=[],
AlarmActions=[],
InsufficientDataActions=[],
MetricName="memory_utilization",
Namespace="LambdaInsights",
Statistic="Maximum",
Dimensions=[
{"Name": "function_name", "Value": function_name},
],
Period=60,
EvaluationPeriods=5,
DatapointsToAlarm=3,
Threshold=memory_threshold,
ComparisonOperator="GreaterThanThreshold",
TreatMissingData="notBreaching",
)
# CPU alarm
cloudwatch.put_metric_alarm(
AlarmName=cpu_alarm,
ActionsEnabled=True,
OKActions=[],
AlarmActions=[],
InsufficientDataActions=[],
MetricName="cpu_total_time",
Namespace="LambdaInsights",
Statistic="Maximum",
Dimensions=[
{"Name": "function_name", "Value": function_name},
],
Period=60,
EvaluationPeriods=5,
DatapointsToAlarm=3,
Threshold=cpu_threshold,
ComparisonOperator="GreaterThanThreshold",
TreatMissingData="notBreaching",
)
paginator = cloudwatch.get_paginator("describe_alarms")
while True:
for response in paginator.paginate(AlarmNames=[memory_alarm, cpu_alarm]):
for alarm in response["MetricAlarms"]:
print("{} state {}".format(alarm["AlarmName"], alarm["StateValue"]))
if alarm["StateValue"] == "ALARM":
state = True
return
time.sleep(60)