in device_watchdog/cdkstack/lorawan_connectivity_watchdog_stack.py [0:0]
def __init__(self, scope: cdk.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
####################################################################################
# IoT Events
# IoT Events: Execution role
iot_events_execution_role = iam.Role(self, "IoTEventsExecutionRole", assumed_by=iam.ServicePrincipal("iotevents.amazonaws.com"))
iot_events_execution_role.add_to_policy(iam.PolicyStatement(
resources=["arn:aws:iot:"+Aws.REGION+":"+Aws.ACCOUNT_ID+":topic/awsiotcorelorawan/*"],
actions=["iot:Publish"]
))
# IoT Events: Input
inputDefinitionProperty = iotevents.CfnInput.InputDefinitionProperty(
attributes=[{"jsonPath": "deviceid"},
{"jsonPath": "timestamp_ms"}]
)
iot_events_input = iotevents.CfnInput(self, "LoRaWANDeviceWatchdogInput",
input_definition=inputDefinitionProperty,
input_name="LoRaWANDeviceWatchdogInput",
input_description="Input for connectivity status updates for LoRaWAN devices"
)
# IoT Events: Detector Model
notify_if_inactive_seconds_value = cdk.CfnParameter(self, "notifyifinactivseconds", type="Number").value_as_number
lorawan_device_heartbeat_detectormodel_definition = iotevents.CfnDetectorModel.DetectorModelDefinitionProperty(
initial_state_name=lorawan_device_heartbeat_detectormodel.initial_state_name,
states=lorawan_device_heartbeat_detectormodel.get_states(self, notify_if_inactive_seconds=notify_if_inactive_seconds_value)
)
iot_events_model_1 = iotevents.CfnDetectorModel(self, "LoRaWANDeviceHeartbeatWatchdogModel",
detector_model_definition=lorawan_device_heartbeat_detectormodel_definition,
detector_model_name="LoRaWANDeviceHeartbeatWatchdogModel",
detector_model_description="Detector model for LoRaWAN device heartbeat",
key="deviceid",
evaluation_method="BATCH",
role_arn=iot_events_execution_role.role_arn)
####################################################################################
# AWS IoT TRule
iot_rule_role = iam.Role(self, "LoRaWANDeviceHeartbeatWatchdogSampleRuleRole", assumed_by=iam.ServicePrincipal("iot.amazonaws.com"))
iot_rule_role.add_to_policy(iam.PolicyStatement(
resources=["arn:aws:iotevents:"+Aws.REGION+":"+Aws.ACCOUNT_ID+":input/"+iot_events_input.input_name],
actions=["iotevents:BatchPutMessage"]
))
iot_rule_role.add_to_policy(iam.PolicyStatement(
resources=["arn:aws:iot:"+Aws.REGION+":"+Aws.ACCOUNT_ID+":topic/*"],
actions=["iot:Publish"]
))
iot_rule = iot.CfnTopicRule(self, "LoRaWANDeviceHeartbeatWatchdogSampleRule",
rule_name="LoRaWANDeviceHeartbeatWatchdogSampleRule",
topic_rule_payload = {
"sql": "SELECT WirelessDeviceId as deviceid, timestamp() as timestamp_ms from 'LoRaWANDeviceHeartbeatWatchdogSampleRule_sampletopic'",
"actions": [
{
"iotEvents": {
"inputName": iot_events_input.input_name,
"roleArn": iot_rule_role.role_arn
}
},
{
"republish": {
"roleArn": iot_rule_role.role_arn,
"topic": "awsiotcorelorawan/debug"
}}
],
"errorAction": {
"republish": {
"roleArn": iot_rule_role.role_arn,
"topic": "awsiotcorelorawan/error"
}
},
"ruleDisabled": False,
"awsIotSqlVersion": "2016-03-23",
})
####################################################################################
# SNS topic
sns_topic = sns.Topic(self, "LoRaWANDeviceNotificationTopic",
display_name="Topic to use for notifications about LoRaWAN device events",
topic_name="LoRaWANDeviceNotificationTopic"
)
email_address = cdk.CfnParameter(self, "emailforalarms")
sns_topic.add_subscription(subscriptions.EmailSubscription(email_address.value_as_string))