in cdk-iot-analytics/cdk_sap_blog/analytics/analytics.py [0:0]
def get_detector_model(scope, input_name, sns_arn):
role = iam.Role(
scope=scope,
id="CDKSAPBlogEventDetectorRole",
role_name="CDKSAPBlogEventDetectorRole",
assumed_by=iam.ServicePrincipal('iotevents.amazonaws.com')
)
[
role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name(policy)
) for policy in [
'AWSIoTEventsFullAccess',
'service-role/AmazonSNSRole',
'AmazonSNSFullAccess',
'AWSLambda_FullAccess',
'service-role/AWSLambdaRole'
]
]
def get_state_lambda_payload(state):
return f"""'{{
"d": {{
"Equipment": "${{$input.{input_name}.Equipment}}",
"FunctLoc": "${{$input.{input_name}.FunctLoc}}",
"ShortText": "Temp Alarm [{state}]",
"LongText": "Temperature Alarm in ${{$input.{input_name}.Type}} Motor"
}}
}}'"""
return iotevents.CfnDetectorModel(
scope=scope,
id="CDKSAPBlogDetectorModel",
detector_model_name="CDKSAPBlogDetectorModel",
detector_model_description='Detect temperature in a given range.',
evaluation_method="BATCH",
key="thingname",
role_arn=role.role_arn,
detector_model_definition=iotevents.CfnDetectorModel.DetectorModelDefinitionProperty(
initial_state_name="TempInRange",
states=[
iotevents.CfnDetectorModel.StateProperty(
state_name="OverTemp",
on_input=iotevents.CfnDetectorModel.OnInputProperty(
events=[],
transition_events=[
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="overTempTooLong",
condition="timeout(\"hiTempDur\")",
actions=[],
next_state="OverTempAlarm"
),
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempBelowMax",
condition=f"convert(Decimal,$input.{input_name}.temperature_degC_mean) < convert(Decimal,$input.{input_name}.maxTemp_degC)",
actions=[],
next_state="TempInRange"
)
]
),
on_enter=iotevents.CfnDetectorModel.OnEnterProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="startTimer",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
set_timer=iotevents.CfnDetectorModel.SetTimerProperty(
duration_expression=None,
timer_name="hiTempDur",
seconds=300
)
)
]
)
]
),
on_exit=iotevents.CfnDetectorModel.OnExitProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="deleteTimer",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
clear_timer=iotevents.CfnDetectorModel.ClearTimerProperty(
timer_name="hiTempDur"
)
)
]
)
]
)
),
iotevents.CfnDetectorModel.StateProperty(
state_name="UnderTemp",
on_input=iotevents.CfnDetectorModel.OnInputProperty(
events=[],
transition_events=[
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="underTempTooLong",
condition="timeout(\"loTempDur\")",
actions=[],
next_state="UnderTempAlarm"
),
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempAboveMin",
condition=f"convert(Decimal,$input.{input_name}.temperature_degC_mean) > convert(Decimal,$input.{input_name}.minTemp_degC)",
actions=[],
next_state="TempInRange"
)
]
),
on_enter=iotevents.CfnDetectorModel.OnEnterProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="startTimer",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
set_timer=iotevents.CfnDetectorModel.SetTimerProperty(
timer_name="loTempDur",
seconds=300,
duration_expression=None
)
)
]
)
]
),
on_exit=iotevents.CfnDetectorModel.OnExitProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="deleteTimer",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
clear_timer=iotevents.CfnDetectorModel.ClearTimerProperty(
timer_name="loTempDur"
)
)
]
)
]
)
),
iotevents.CfnDetectorModel.StateProperty(
state_name="TempInRange",
on_input=iotevents.CfnDetectorModel.OnInputProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="readTemp",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
set_variable=iotevents.CfnDetectorModel.SetVariableProperty(
variable_name="temperature_degC",
value=f"convert(Decimal, $input.{input_name}.temperature_degC_mean)"
)
)
]
)
],
transition_events=[
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempAboveMax",
condition=f"convert(Decimal, $input.{input_name}.temperature_degC_mean) > convert(Decimal, $input.{input_name}.maxTemp_degC)",
actions=[],
next_state="OverTemp"
),
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempBelowMin",
condition=f"convert(Decimal,$input.{input_name}.temperature_degC_mean) < convert(Decimal,$input.{input_name}.minTemp_degC)",
actions=[],
next_state="UnderTemp"
)
]
),
on_enter=iotevents.CfnDetectorModel.OnEnterProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="inRange",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
lambda_=iotevents.CfnDetectorModel.LambdaProperty(
function_arn=sns_arn,
payload=iotevents.CfnDetectorModel.PayloadProperty(
content_expression=get_state_lambda_payload("TempInRange:inRange"),
type="JSON"
)
)
)
]
),
iotevents.CfnDetectorModel.EventProperty(
event_name="sendAlert",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
lambda_=iotevents.CfnDetectorModel.LambdaProperty(
function_arn=sns_arn,
payload=iotevents.CfnDetectorModel.PayloadProperty(
content_expression=get_state_lambda_payload("TempInRange.sendAlert"),
type="JSON"
)
)
)
]
)
]
),
on_exit=iotevents.CfnDetectorModel.OnExitProperty(
events=[]
)
),
iotevents.CfnDetectorModel.StateProperty(
state_name="UnderTempAlarm",
on_input=iotevents.CfnDetectorModel.OnInputProperty(
events=[],
transition_events=[
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempAboveMin",
condition=f"convert(Decimal,$input.{input_name}.temperature_degC_mean) > convert(Decimal,$input.{input_name}.minTemp_degC)",
actions=[],
next_state="TempInRange"
)
]
),
on_enter=iotevents.CfnDetectorModel.OnEnterProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="sendAlert",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
lambda_=iotevents.CfnDetectorModel.LambdaProperty(
function_arn=sns_arn,
payload=iotevents.CfnDetectorModel.PayloadProperty(
content_expression=get_state_lambda_payload("UnderTempAlarm.sendAlert"),
type="JSON"
)
)
)
]
)
]
),
on_exit=iotevents.CfnDetectorModel.OnExitProperty(
events=[]
)
),
iotevents.CfnDetectorModel.StateProperty(
state_name="OverTempAlarm",
on_input=iotevents.CfnDetectorModel.OnInputProperty(
events=[],
transition_events=[
iotevents.CfnDetectorModel.TransitionEventProperty(
event_name="tempBelowMax",
condition=f"convert(Decimal,$input.{input_name}.temperature_degC_mean) < convert(Decimal,$input.{input_name}.maxTemp_degC)",
actions=[],
next_state="TempInRange"
)
]
),
on_enter=iotevents.CfnDetectorModel.OnEnterProperty(
events=[
iotevents.CfnDetectorModel.EventProperty(
event_name="sendAlert",
condition="true",
actions=[
iotevents.CfnDetectorModel.ActionProperty(
lambda_=iotevents.CfnDetectorModel.LambdaProperty(
function_arn=sns_arn,
payload=iotevents.CfnDetectorModel.PayloadProperty(
content_expression=get_state_lambda_payload("OverTempAlarm.sendAlert"),
type="JSON"
)
)
)
]
)
]
),
on_exit=iotevents.CfnDetectorModel.OnExitProperty(
events=[]
)
)
]
),
)