in resources/flink-on-kda/index.py [0:0]
def put_alarms(context, curr_capacity):
try:
scaling_policies = get_scaling_policies(context)
if not scaling_policies:
print("ERROR: Unable to get scaling_policies")
return
if curr_capacity > 1:
# scale in alarm (for curr_capacity > 1)
scalein_policy_arn = scaling_policies['KDAScaleIn']
print("scalein_policy_arn: " + scalein_policy_arn)
if not scalein_policy_arn:
print("ERROR - scalein_policy_arn is None in put_alarms")
return
client_cloudwatch.put_metric_alarm(
AlarmName=get_cloudwatch_alarm_in(),
AlarmDescription='KDA scale in alarm',
Metrics=[
{
"Expression": "FILL(ir/60, 0)",
"Id": "ad1",
"ReturnData": False
},
{
"Id": "ir",
"MetricStat": {
"Metric": {
"Dimensions": [
{
"Value": get_kinesis_stream(),
"Name": "StreamName"
}
],
"MetricName": "IncomingRecords",
"Namespace": "AWS/Kinesis"
},
"Period": 60,
"Stat": "Sum"
},
"ReturnData": True
}
],
Threshold=(curr_capacity-1)*get_max_throughput_per_kpu(),
ComparisonOperator='LessThanOrEqualToThreshold',
AlarmActions=[
scalein_policy_arn
],
EvaluationPeriods=2,
TreatMissingData=get_how_to_treat_missing_data(curr_capacity))
elif curr_capacity <= 1:
# scale in alarm (for capacity <= 1)
client_cloudwatch.put_metric_alarm(
AlarmName=get_cloudwatch_alarm_in(),
AlarmDescription='KDA scale in alarm',
Metrics=[
{
"Expression": "FILL(ir/60, 0)",
"Id": "ad1",
"ReturnData": False
},
{
"Id": "ir",
"MetricStat": {
"Metric": {
"Dimensions": [
{
"Value": get_kinesis_stream(),
"Name": "StreamName"
}
],
"MetricName": "IncomingRecords",
"Namespace": "AWS/Kinesis"
},
"Period": 60,
"Stat": "Sum"
},
"ReturnData": True
}
],
Threshold=0,
ComparisonOperator='LessThanOrEqualToThreshold',
EvaluationPeriods=2)
if curr_capacity < get_max_kpus():
# scale out alarm
scaleout_policy_arn = scaling_policies['KDAScaleOut']
print("scaleout_policy_arn: " + scaleout_policy_arn)
if not scaleout_policy_arn:
print("ERROR - scaleout_policy_arn is None in put_alarms")
return
client_cloudwatch.put_metric_alarm(
AlarmName=get_cloudwatch_alarm_out(),
AlarmDescription='KDA scale out alarm',
Metrics=[
{
"Expression": "FILL(ir/60, 0)",
"Id": "ad1",
"ReturnData": False
},
{
"Id": "ir",
"MetricStat": {
"Metric": {
"Dimensions": [
{
"Value": get_kinesis_stream(),
"Name": "StreamName"
}
],
"MetricName": "IncomingRecords",
"Namespace": "AWS/Kinesis"
},
"Period": 60,
"Stat": "Sum"
},
"ReturnData": True
}
],
Threshold=curr_capacity*get_max_throughput_per_kpu(),
ComparisonOperator='GreaterThanOrEqualToThreshold',
AlarmActions=[
scaleout_policy_arn
],
EvaluationPeriods=1)
except Exception as e:
print("ERROR - Exception in put_alarms")
print(e)