in cdk-iot-analytics/cdk_sap_blog/analytics/analytics.py [0:0]
def get_analytics_pipeline(scope, datastore_name, error_log_arn=None):
m_channel = get_analytics_channel(scope)
m_lambda = _lambda_.get_product_range(scope)
enrich_activity_name = "CDKSAPBlogAnalyticsRegistryEnrichActivity"
enrich_role = iam.Role(
scope=scope,
id="CDKSAPBlogAnalyticsRegistryEnrichRole",
role_name="CDKSAPBlogAnalyticsRegistryEnrichRole",
assumed_by=iam.ServicePrincipal('iotanalytics.amazonaws.com'),
)
enrich_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
resources=[f'arn:aws:iot:{scope.region}:{scope.account}:thing/{scope.thing_name}'],
actions=['iot:DescribeThing',]
)
)
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSIoTThingsRegistration'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSIoTLogging'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('service-role/AWSIoTRuleActions'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('AWSIoTAnalyticsFullAccess'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('AWSIoTConfigReadOnlyAccess'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('AWSIoTDataAccess'))
enrich_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name('AWSIoTFullAccess'))
enrich_rule_role = iam.Role(
scope=scope,
id="CDKSAPBlogAnalyticsRuleRegistryEnrichRole",
role_name="CDKSAPBlogAnalyticsRuleRegistryEnrichRole",
assumed_by=iam.ServicePrincipal('iot.amazonaws.com'),
)
enrich_rule_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
resources=[f"arn:aws:iotanalytics:{scope.region}:{scope.account}:channel/{m_channel.channel_name}"],
actions=['iotanalytics:BatchPutMessage',]
)
)
enrich_rule_role.node.add_dependency(m_channel)
iot_rule = rules.get_analytics_rule(
scope,
m_channel.channel_name,
enrich_rule_role.role_arn,
error_log_arn=error_log_arn
)
pipeline = analytics.CfnPipeline(
scope=scope,
id="CDKSAPBlogAnalyticsPipeline",
pipeline_name="CDKSAPBlogAnalyticsPipeline",
pipeline_activities=[
analytics.CfnPipeline.ActivityProperty(
channel={
"channelName": m_channel.channel_name,
"name" : m_channel.channel_name,
"next" : enrich_activity_name
}
),
analytics.CfnPipeline.ActivityProperty(
device_registry_enrich={
"name": enrich_activity_name,
"attribute": "registry",
"thingName": "thingname", # scope.thing_name,
"roleArn": enrich_role.role_arn,
"next": "RemoveAttributesActivity"
}
),
analytics.CfnPipeline.ActivityProperty(
remove_attributes={
"name": "RemoveAttributesActivity",
"attributes": [
"registry.billingGroupName",
"registry.defaultClientId",
"registry.thingArn",
"registry.thingId",
"registry.thingName",
# "registry.attributes",
"registry.thingTypeName",
"registry.version"
],
"next": m_lambda.function_name
}
),
analytics.CfnPipeline.ActivityProperty(
lambda_={
"batchSize" : 3,
"lambdaName" : m_lambda.function_name,
"name" : m_lambda.function_name,
"next" : "SAPBlogDatastore"
}
),
analytics.CfnPipeline.ActivityProperty(
datastore={
"name": "SAPBlogDatastore",
"datastoreName": datastore_name
}
)
]
)
return pipeline