def put_alarms()

in resources/flink-on-kda/index.py [0:0]


def put_alarms(context, curr_capacity):
    try:
        
        scaling_policies = get_scaling_policies(context)
        if not scaling_policies:
            print("ERROR: Unable to get scaling_policies")
            return
        
        if curr_capacity > 1:
            # scale in alarm (for curr_capacity > 1)
            scalein_policy_arn = scaling_policies['KDAScaleIn']
            print("scalein_policy_arn: " + scalein_policy_arn)
            if not scalein_policy_arn:
                print("ERROR - scalein_policy_arn is None in put_alarms")
                return

            client_cloudwatch.put_metric_alarm(
                AlarmName=get_cloudwatch_alarm_in(),
                AlarmDescription='KDA scale in alarm',
                Metrics=[
                    {
                        "Expression": "FILL(ir/60, 0)",
                        "Id": "ad1",
                        "ReturnData": False
                    },
                    {
                        "Id": "ir",
                        "MetricStat": {
                            "Metric": {
                                "Dimensions": [
                                    {
                                        "Value": get_kinesis_stream(),
                                        "Name": "StreamName"
                                    }
                                ],
                                "MetricName": "IncomingRecords",
                                "Namespace": "AWS/Kinesis"
                            },
                            "Period": 60,
                            "Stat": "Sum"
                        },
                        "ReturnData": True
                    }
                ],
                Threshold=(curr_capacity-1)*get_max_throughput_per_kpu(),
                ComparisonOperator='LessThanOrEqualToThreshold',
                AlarmActions=[
                    scalein_policy_arn
                ],
                EvaluationPeriods=2,
                TreatMissingData=get_how_to_treat_missing_data(curr_capacity))
        elif curr_capacity <= 1:
            # scale in alarm (for capacity <= 1)
            client_cloudwatch.put_metric_alarm(
                AlarmName=get_cloudwatch_alarm_in(),
                AlarmDescription='KDA scale in alarm',
                Metrics=[
                    {
                        "Expression": "FILL(ir/60, 0)",
                        "Id": "ad1",
                        "ReturnData": False
                    },
                    {
                        "Id": "ir",
                        "MetricStat": {
                            "Metric": {
                                "Dimensions": [
                                    {
                                        "Value": get_kinesis_stream(),
                                        "Name": "StreamName"
                                    }
                                ],
                                "MetricName": "IncomingRecords",
                                "Namespace": "AWS/Kinesis"
                            },
                            "Period": 60,
                            "Stat": "Sum"
                        },
                        "ReturnData": True
                    }
                ],
                Threshold=0,
                ComparisonOperator='LessThanOrEqualToThreshold',
                EvaluationPeriods=2)

        if curr_capacity < get_max_kpus():
            # scale out alarm
            scaleout_policy_arn = scaling_policies['KDAScaleOut']
            print("scaleout_policy_arn: " + scaleout_policy_arn)
            if not scaleout_policy_arn:
                print("ERROR - scaleout_policy_arn is None in put_alarms")
                return
    
            client_cloudwatch.put_metric_alarm(
                AlarmName=get_cloudwatch_alarm_out(),
                AlarmDescription='KDA scale out alarm',
                Metrics=[
                    {
                        "Expression": "FILL(ir/60, 0)",
                        "Id": "ad1",
                        "ReturnData": False
                    },
                    {
                        "Id": "ir",
                        "MetricStat": {
                            "Metric": {
                                "Dimensions": [
                                    {
                                        "Value": get_kinesis_stream(),
                                        "Name": "StreamName"
                                    }
                                ],
                                "MetricName": "IncomingRecords",
                                "Namespace": "AWS/Kinesis"
                            },
                            "Period": 60,
                            "Stat": "Sum"
                        },
                        "ReturnData": True
                    }
                ],
                Threshold=curr_capacity*get_max_throughput_per_kpu(),
                ComparisonOperator='GreaterThanOrEqualToThreshold',
                AlarmActions=[
                    scaleout_policy_arn
                ],
                EvaluationPeriods=1)
    except Exception as e:
        print("ERROR - Exception in put_alarms")
        print(e)