def get_detector_model()

in cdk-iot-analytics/cdk_sap_blog/analytics/analytics.py [0:0]


def get_detector_model(scope, input_name, sns_arn):
    """Create the IoT Events detector model for temperature alarms, plus its IAM role.

    The model has five states and starts in ``TempInRange``:

    * ``TempInRange``  -> ``OverTemp`` when the mean temperature is above
      ``maxTemp_degC``; -> ``UnderTemp`` when it is below ``minTemp_degC``.
    * ``OverTemp`` / ``UnderTemp`` start a 300 second timer on entry and clear
      it on exit. If the timer expires they move to ``OverTempAlarm`` /
      ``UnderTempAlarm``; if the temperature comes back across the limit
      first they return to ``TempInRange``.
    * ``OverTempAlarm`` / ``UnderTempAlarm`` invoke the Lambda action on entry
      and return to ``TempInRange`` once the temperature crosses the limit
      again.

    Args:
        scope: CDK construct scope in which the role and model are created.
        input_name: Name of the IoT Events input; interpolated into every
            ``$input.<input_name>.<field>`` expression.
        sns_arn: ARN used as ``function_arn`` of every Lambda action. Despite
            the parameter name, the model only uses it for Lambda actions.

    Returns:
        The ``iotevents.CfnDetectorModel`` construct.
    """
    role = iam.Role(
        scope=scope,
        id="CDKSAPBlogEventDetectorRole",
        role_name="CDKSAPBlogEventDetectorRole",
        assumed_by=iam.ServicePrincipal('iotevents.amazonaws.com')
    )
    # A plain loop: attaching policies is a side effect, so there is no list
    # worth building.
    # NOTE(review): these are broad "FullAccess" policies; consider scoping
    # them down to the resources this detector actually touches.
    for policy in (
        'AWSIoTEventsFullAccess',
        'service-role/AmazonSNSRole',
        'AmazonSNSFullAccess',
        'AWSLambda_FullAccess',
        'service-role/AWSLambdaRole',
    ):
        role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(policy)
        )

    model = iotevents.CfnDetectorModel

    def get_state_lambda_payload(state):
        """Return the payload content expression, tagged with *state* in ShortText."""
        # The literal is intentionally flush-left: its whitespace is part of
        # the expression text that ends up in the template.
        return f"""'{{
    "d": {{
        "Equipment": "${{$input.{input_name}.Equipment}}",
        "FunctLoc": "${{$input.{input_name}.FunctLoc}}",
        "ShortText": "Temp Alarm [{state}]",
        "LongText": "Temperature Alarm in ${{$input.{input_name}.Type}} Motor"
    }}
}}'"""

    def temperature_condition(operator, limit_field):
        """Build 'mean temperature <operator> limit' as an IoT Events expression."""
        return (
            f"convert(Decimal,$input.{input_name}.temperature_degC_mean) {operator} "
            f"convert(Decimal,$input.{input_name}.{limit_field})"
        )

    def transition(event_name, condition, next_state):
        """Transition event with no actions of its own."""
        return model.TransitionEventProperty(
            event_name=event_name,
            condition=condition,
            actions=[],
            next_state=next_state,
        )

    def always(event_name, action):
        """Event that unconditionally runs a single action."""
        return model.EventProperty(
            event_name=event_name,
            condition="true",
            actions=[action],
        )

    def notify(label):
        """Lambda action sending the JSON payload tagged with *label*."""
        return model.ActionProperty(
            lambda_=model.LambdaProperty(
                function_arn=sns_arn,
                payload=model.PayloadProperty(
                    content_expression=get_state_lambda_payload(label),
                    type="JSON",
                ),
            )
        )

    def start_timer(timer_name):
        """Action that starts a 300 second timer."""
        return model.ActionProperty(
            set_timer=model.SetTimerProperty(timer_name=timer_name, seconds=300)
        )

    def clear_timer(timer_name):
        """Action that clears the named timer."""
        return model.ActionProperty(
            clear_timer=model.ClearTimerProperty(timer_name=timer_name)
        )

    def alarm_state(state_name, recovery_event, recovery_condition):
        """Alarm state: notify on entry, go back to TempInRange on recovery."""
        return model.StateProperty(
            state_name=state_name,
            on_input=model.OnInputProperty(
                events=[],
                transition_events=[
                    transition(recovery_event, recovery_condition, "TempInRange"),
                ],
            ),
            on_enter=model.OnEnterProperty(
                events=[always("sendAlert", notify(f"{state_name}.sendAlert"))]
            ),
            on_exit=model.OnExitProperty(events=[]),
        )

    above_min = temperature_condition(">", "minTemp_degC")
    below_min = temperature_condition("<", "minTemp_degC")
    below_max = temperature_condition("<", "maxTemp_degC")
    # These two expressions carry a space after "Decimal," in the deployed
    # model; they are kept verbatim so the synthesized template is unchanged.
    above_max = (
        f"convert(Decimal, $input.{input_name}.temperature_degC_mean) > "
        f"convert(Decimal, $input.{input_name}.maxTemp_degC)"
    )
    mean_temperature = f"convert(Decimal, $input.{input_name}.temperature_degC_mean)"

    over_temp = model.StateProperty(
        state_name="OverTemp",
        on_input=model.OnInputProperty(
            events=[],
            transition_events=[
                transition("overTempTooLong", 'timeout("hiTempDur")', "OverTempAlarm"),
                transition("tempBelowMax", below_max, "TempInRange"),
            ],
        ),
        on_enter=model.OnEnterProperty(
            events=[always("startTimer", start_timer("hiTempDur"))]
        ),
        on_exit=model.OnExitProperty(
            events=[always("deleteTimer", clear_timer("hiTempDur"))]
        ),
    )

    under_temp = model.StateProperty(
        state_name="UnderTemp",
        on_input=model.OnInputProperty(
            events=[],
            transition_events=[
                transition("underTempTooLong", 'timeout("loTempDur")', "UnderTempAlarm"),
                transition("tempAboveMin", above_min, "TempInRange"),
            ],
        ),
        on_enter=model.OnEnterProperty(
            events=[always("startTimer", start_timer("loTempDur"))]
        ),
        on_exit=model.OnExitProperty(
            events=[
                # Unlike OverTemp's deleteTimer, this event has never set an
                # explicit condition. The AWS::IoTEvents::DetectorModel Event
                # documentation states that an absent condition means the
                # actions always run, so it is left as deployed.
                model.EventProperty(
                    event_name="deleteTimer",
                    actions=[clear_timer("loTempDur")],
                )
            ]
        ),
    )

    temp_in_range = model.StateProperty(
        state_name="TempInRange",
        on_input=model.OnInputProperty(
            events=[
                always(
                    "readTemp",
                    model.ActionProperty(
                        set_variable=model.SetVariableProperty(
                            variable_name="temperature_degC",
                            value=mean_temperature,
                        )
                    ),
                )
            ],
            transition_events=[
                transition("tempAboveMax", above_max, "OverTemp"),
                transition("tempBelowMin", below_min, "UnderTemp"),
            ],
        ),
        on_enter=model.OnEnterProperty(
            events=[
                # NOTE(review): this label uses ':' where every other label
                # uses '.'. It is emitted verbatim in ShortText, so it is
                # kept unchanged here.
                always("inRange", notify("TempInRange:inRange")),
                always("sendAlert", notify("TempInRange.sendAlert")),
            ]
        ),
        on_exit=model.OnExitProperty(events=[]),
    )

    under_temp_alarm = alarm_state("UnderTempAlarm", "tempAboveMin", above_min)
    over_temp_alarm = alarm_state("OverTempAlarm", "tempBelowMax", below_max)

    return model(
        scope=scope,
        id="CDKSAPBlogDetectorModel",
        detector_model_name="CDKSAPBlogDetectorModel",
        detector_model_description='Detect temperature in a given range.',
        evaluation_method="BATCH",
        key="thingname",
        role_arn=role.role_arn,
        detector_model_definition=model.DetectorModelDefinitionProperty(
            initial_state_name="TempInRange",
            states=[
                over_temp,
                under_temp,
                temp_in_range,
                under_temp_alarm,
                over_temp_alarm,
            ],
        ),
    )