def get_analytics_pipeline()

in cdk-iot-analytics/cdk_sap_blog/analytics/analytics.py [0:0]


def get_analytics_pipeline(scope, datastore_name, error_log_arn=None):
    """Build the IoT Analytics pipeline and the IAM roles and rule that feed it.

    The function creates, in this order:

    1. the channel (``get_analytics_channel``) and the Lambda
       (``_lambda_.get_product_range``) used by the pipeline,
    2. an IAM role assumable by ``iotanalytics.amazonaws.com`` that is passed
       to the device-registry-enrich activity,
    3. an IAM role assumable by ``iot.amazonaws.com`` that may call
       ``iotanalytics:BatchPutMessage`` on the channel,
    4. the rule returned by ``rules.get_analytics_rule``,
    5. a ``CfnPipeline`` whose activities are chained
       channel -> device registry enrich -> remove attributes -> lambda
       -> datastore.

    Args:
        scope: Construct the resources are created in. Its ``region``,
            ``account`` and ``thing_name`` attributes are read to build ARNs.
        datastore_name: Value used as ``datastoreName`` of the final
            datastore activity.
        error_log_arn: Forwarded unchanged to ``rules.get_analytics_rule``.

    Returns:
        The created ``analytics.CfnPipeline``.
    """
    m_channel = get_analytics_channel(scope)
    m_lambda = _lambda_.get_product_range(scope)

    # Activity names are bound once so each "next" pointer and the matching
    # "name" of the following activity cannot drift apart.
    enrich_activity_name = "CDKSAPBlogAnalyticsRegistryEnrichActivity"
    remove_attributes_activity_name = "RemoveAttributesActivity"
    datastore_activity_name = "SAPBlogDatastore"

    # Role handed to the device-registry-enrich activity via "roleArn".
    enrich_role = iam.Role(
        scope=scope,
        id="CDKSAPBlogAnalyticsRegistryEnrichRole",
        role_name="CDKSAPBlogAnalyticsRegistryEnrichRole",
        assumed_by=iam.ServicePrincipal('iotanalytics.amazonaws.com'),
    )
    enrich_role.add_to_policy(
        iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[f'arn:aws:iot:{scope.region}:{scope.account}:thing/{scope.thing_name}'],
            actions=['iot:DescribeThing'],
        )
    )
    # NOTE(review): this set includes very broad policies (e.g.
    # AWSIoTFullAccess, AWSIoTAnalyticsFullAccess) next to the narrow
    # iot:DescribeThing statement above; consider trimming it.
    # The tuple preserves the original attachment order.
    enrich_managed_policy_names = (
        'service-role/AWSIoTThingsRegistration',
        'service-role/AWSIoTLogging',
        'service-role/AWSIoTRuleActions',
        'AWSIoTAnalyticsFullAccess',
        'AWSIoTConfigReadOnlyAccess',
        'AWSIoTDataAccess',
        'AWSIoTFullAccess',
    )
    for policy_name in enrich_managed_policy_names:
        enrich_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(policy_name)
        )

    # Role passed to the rule so it may write messages into the channel.
    enrich_rule_role = iam.Role(
        scope=scope,
        id="CDKSAPBlogAnalyticsRuleRegistryEnrichRole",
        role_name="CDKSAPBlogAnalyticsRuleRegistryEnrichRole",
        assumed_by=iam.ServicePrincipal('iot.amazonaws.com'),
    )
    enrich_rule_role.add_to_policy(
        iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[f"arn:aws:iotanalytics:{scope.region}:{scope.account}:channel/{m_channel.channel_name}"],
            actions=['iotanalytics:BatchPutMessage'],
        )
    )
    # Explicit ordering: the role is declared to depend on the channel.
    enrich_rule_role.node.add_dependency(m_channel)

    # The return value is not used here. The call is kept because it
    # presumably creates the rule construct in `scope` - TODO confirm.
    rules.get_analytics_rule(
        scope,
        m_channel.channel_name,
        enrich_rule_role.role_arn,
        error_log_arn=error_log_arn
    )

    channel_activity = analytics.CfnPipeline.ActivityProperty(
        channel={
            "channelName": m_channel.channel_name,
            "name": m_channel.channel_name,
            "next": enrich_activity_name,
        }
    )
    enrich_activity = analytics.CfnPipeline.ActivityProperty(
        device_registry_enrich={
            "name": enrich_activity_name,
            "attribute": "registry",
            # NOTE(review): the literal "thingname" is used instead of
            # scope.thing_name - presumably the message attribute that
            # carries the thing name; confirm.
            "thingName": "thingname",
            "roleArn": enrich_role.role_arn,
            "next": remove_attributes_activity_name,
        }
    )
    remove_attributes_activity = analytics.CfnPipeline.ActivityProperty(
        remove_attributes={
            "name": remove_attributes_activity_name,
            # "registry.attributes" is deliberately absent from this list,
            # so that field is not removed by this activity.
            "attributes": [
                "registry.billingGroupName",
                "registry.defaultClientId",
                "registry.thingArn",
                "registry.thingId",
                "registry.thingName",
                "registry.thingTypeName",
                "registry.version",
            ],
            "next": m_lambda.function_name,
        }
    )
    lambda_activity = analytics.CfnPipeline.ActivityProperty(
        lambda_={
            "batchSize": 3,
            "lambdaName": m_lambda.function_name,
            # The activity is named after the function it invokes.
            "name": m_lambda.function_name,
            "next": datastore_activity_name,
        }
    )
    # Last activity in the chain: it has no "next".
    datastore_activity = analytics.CfnPipeline.ActivityProperty(
        datastore={
            "name": datastore_activity_name,
            "datastoreName": datastore_name,
        }
    )

    pipeline = analytics.CfnPipeline(
        scope=scope,
        id="CDKSAPBlogAnalyticsPipeline",
        pipeline_name="CDKSAPBlogAnalyticsPipeline",
        pipeline_activities=[
            channel_activity,
            enrich_activity,
            remove_attributes_activity,
            lambda_activity,
            datastore_activity,
        ],
    )
    return pipeline