constructor()

in source/lib/topic-analysis-workflow/topic-orchestration-construct.ts [40:289]

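Builds the scheduled Step Functions workflow that submits an Amazon Comprehend topic detection job, polls the job until it completes, and then publishes the detected topics and topic mappings for each platform type to the solution's event bus.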

    constructor(scope: cdk.Construct, id: string, props: TopicOrchestrationProps) {
        super(scope, id);

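        // IAM policy that allows the topic-publishing lambda functions to put events on the solution's custom event bus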
        const lambdaPublishEventPolicy = new iam.Policy(this, 'LambdaEventBusPolicy');
        lambdaPublishEventPolicy.addStatements(new iam.PolicyStatement({
            effect: iam.Effect.ALLOW,
            resources: [ props.eventBus.eventBusArn ],
            actions: [ 'events:PutEvents' ],
        }));

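        // EventBridge rule that triggers the topic modelling workflow on the configured schedule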
        const topicScheduleRule = new events.Rule(this, 'TopicSchedule', {
            schedule: events.Schedule.expression(`${props.topicSchedule}`)
        });

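        // Step Functions task whose lambda submits the topic detection job to Amazon Comprehend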
        const submitTopicTask = new StepFuncLambdaTask(this, 'SubmitTopic', {
            taskName: 'Submit',
            lambdaFunctionProps: {
                runtime: lambda.Runtime.NODEJS_14_X,
                handler: 'index.handler',
                code: lambda.Code.fromAsset(`${__dirname}/../../lambda/wf-submit-topic-model`),
                environment: {
                    INGESTION_WINDOW: props.ingestionWindow,
                    RAW_BUCKET_FEED: props.rawBucket.bucketName,
                    // Select prefix names for source types that have topic modelling enabled, passed to the
                    // lambda function as comma-separated values
                    SOURCE_PREFIX: props.platformTypes.filter((type) => type.topicModelling).map((type) => type.name).join(","),
                    NUMBER_OF_TOPICS: props.numberofTopics,
                    STACK_NAME: cdk.Aws.STACK_NAME
                },
                timeout: cdk.Duration.minutes(10),
                memorySize: 256
            },
            outputPath: '$.Payload'
        });

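        // Scoped policy allowing the submit lambda to start Comprehend topic detection jobs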
        const lambdaSubmitJobPolicy = new iam.Policy(this, 'LambdaSubmitJobPolicy', {
            statements: [ new iam.PolicyStatement({
                effect: iam.Effect.ALLOW,
                actions: [
                    'comprehend:StartTopicsDetectionJob'
                ],
                resources: [ '*' ]
            })]
        });

        lambdaSubmitJobPolicy.attachToRole(submitTopicTask.lambdaFunction.role as iam.Role);
        (lambdaSubmitJobPolicy.node.defaultChild as iam.CfnPolicy).cfnOptions.metadata = {
            cfn_nag: {
                rules_to_suppress: [{
                  id: 'W12',
                  reason: `The * resource allows the lambda function to access Amazon Comprehend services for Topic Detection. The Comprehend services do not have a resource arn. This permission is restricted to the lambda function responsible for accessing the Amazon Comprehend service`
                }]
            }
        };

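        // S3 bucket used as the ingestion location for the topic detection job; the Comprehend data access role reads input from and writes inference output to it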
        const lambdaToS3 = new LambdaToS3(this, 'TopicIngestion', {
            existingLambdaObj: submitTopicTask.lambdaFunction,
            bucketProps: {
                versioned: false,
                serverAccessLogsBucket: props.s3LoggingBucket,
                serverAccessLogsPrefix: `${id}-TopicIngestion/`
            }
        });

        props.rawBucket.grantRead(submitTopicTask.lambdaFunction.role as iam.Role);

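        // Role assumed by Amazon Comprehend for data access, plus a policy letting the submit lambda pass that role to Comprehend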
        const comprehendTopicAnalysisRole = new iam.Role(this, 'TopicAnalysisRole', {
            assumedBy: new iam.ServicePrincipal('comprehend.amazonaws.com'),
        });
        const lambdaComprehendPassRolePolicy = new iam.Policy(this, 'LambdaComprehendPassPolicy', {
            statements: [
                new iam.PolicyStatement({
                    effect: iam.Effect.ALLOW,
                    actions: [ 'iam:GetRole', 'iam:PassRole' ],
                    resources: [ comprehendTopicAnalysisRole.roleArn ]
                })
            ]
        });
        lambdaComprehendPassRolePolicy.attachToRole(submitTopicTask.lambdaFunction.role as iam.Role);

        lambdaToS3.s3Bucket!.grantReadWrite(comprehendTopicAnalysisRole);

        submitTopicTask.lambdaFunction.addEnvironment('INGESTION_S3_BUCKET_NAME', lambdaToS3.s3Bucket!.bucketName);
        submitTopicTask.lambdaFunction.addEnvironment('DATA_ACCESS_ARN', comprehendTopicAnalysisRole.roleArn);

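        // Fan out topic publishing across every platform type that has topic modelling enabled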
        const platformTypes = props.platformTypes.filter((type) => type.topicModelling).map((type) => type.name);
        const parallel = new sfn.Parallel(this, 'PublishTopic', {
            comment: 'Process various platform types in parallel',
            inputPath: '$',
            outputPath: '$'
        });

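        // SQS queue and lambda that consume topic mapping records and publish them to the event bus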
        const _publishTopicsMappings = new SqsToLambda(this, 'PublishTopicMapping', {
            lambdaFunctionProps: {
                runtime: lambda.Runtime.PYTHON_3_8,
                handler: 'lambda_function.topic_mapping_handler',
                code: lambda.Code.fromAsset('lambda/wf_publish_topic_model'),
                environment: {
                    RAW_DATA_FEED: props.rawBucket.bucketName,
                    EVENT_BUS_NAME: props.eventBus.eventBusName,
                    TOPICS_EVENT_NAMESPACE: props.topicsAnalaysisNameSpace,
                    TOPIC_MAPPINGS_EVENT_NAMESPACE: props.topicMappingsNameSpace,
                    SOURCE_PREFIX: platformTypes.join(',')
                },
                timeout: cdk.Duration.minutes(15),
                memorySize: 256
            },
            sqsEventSourceProps: {
                batchSize: 1
            },
            queueProps: {
                visibilityTimeout: cdk.Duration.minutes(120)
            }
        });

        props.rawBucket.grantRead(_publishTopicsMappings.lambdaFunction);
        lambdaPublishEventPolicy.attachToRole(_publishTopicsMappings.lambdaFunction.role as iam.Role);

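        // For each platform type, publish topic terms and topics in a nested parallel branch of the state machine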
        for (const platformType of platformTypes) {
            const _publishTopicTerms = new StepFuncLambdaTask(this, `${platformType}PublishTopicTerms`, {
                taskName: `Publish Topic Terms for ${platformType}`,
                lambdaFunctionProps: {
                    runtime: lambda.Runtime.PYTHON_3_8,
                    handler: 'lambda_function.topic_terms_handler',
                    code: lambda.Code.fromAsset('lambda/wf_publish_topic_model'),
                    environment: {
                        RAW_DATA_FEED: props.rawBucket.bucketName,
                        EVENT_BUS_NAME: props.eventBus.eventBusName,
                        TOPICS_EVENT_NAMESPACE: props.topicsAnalaysisNameSpace,
                        TOPIC_MAPPINGS_EVENT_NAMESPACE: props.topicMappingsNameSpace,
                        SOURCE_PREFIX: platformType,
                        QUEUE_NAME: _publishTopicsMappings.sqsQueue.queueName
                    },
                    timeout: cdk.Duration.minutes(15),
                },
                outputPath: '$.Payload'
            });
            props.rawBucket.grantRead(_publishTopicTerms.lambdaFunction);
            lambdaToS3.s3Bucket?.grantRead(_publishTopicTerms.lambdaFunction);
            lambdaPublishEventPolicy.attachToRole(_publishTopicTerms.lambdaFunction.role as iam.Role);

            const _publishTopics = new StepFuncLambdaTask(this, `${platformType}PublishTopics`, {
                taskName: `Publish Topics for ${platformType}`,
                lambdaFunctionProps: {
                    runtime: lambda.Runtime.PYTHON_3_8,
                    handler: 'lambda_function.topic_handler',
                    code: lambda.Code.fromAsset('lambda/wf_publish_topic_model'),
                    environment: {
                        RAW_DATA_FEED: props.rawBucket.bucketName,
                        EVENT_BUS_NAME: props.eventBus.eventBusName,
                        TOPICS_EVENT_NAMESPACE: props.topicsAnalaysisNameSpace,
                        TOPIC_MAPPINGS_EVENT_NAMESPACE: props.topicMappingsNameSpace,
                        SOURCE_PREFIX: platformType,
                        QUEUE_NAME: _publishTopicsMappings.sqsQueue.queueName
                    },
                    timeout: cdk.Duration.minutes(15),
                    memorySize: 256 // with YouTube data, the dictionary built in the lambda is larger
                },
                outputPath: '$.Payload'
            });
            lambdaToS3.s3Bucket?.grantRead(_publishTopics.lambdaFunction);
            _publishTopicsMappings.sqsQueue.grantSendMessages(_publishTopics.lambdaFunction);

            const _nestedParallel = new sfn.Parallel(this, `PublishTopicFor${platformType}`, {
                comment: `Process topic inferences for ${platformType} in parallel`,
                inputPath: '$',
                outputPath: '$'
            });

            _nestedParallel.branch(_publishTopicTerms.stepFunctionTask);
            _nestedParallel.branch(_publishTopics.stepFunctionTask);
            parallel.branch(_nestedParallel);
        }

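        // Task that polls Amazon Comprehend for the status of the submitted topic detection job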
        const checkTopicStatus = new StepFuncLambdaTask(this, 'CheckStatus', {
            taskName: 'Check Status',
            lambdaFunctionProps: {
                runtime: lambda.Runtime.NODEJS_14_X,
                handler: 'index.handler',
                code: lambda.Code.fromAsset(`${__dirname}/../../lambda/wf-check-topic-model`),
                environment: {
                    SOURCE_PREFIX: platformTypes.join(',')
                }
            },
            inputPath: '$',
            outputPath: '$.Payload'
        });

        const lambdaDescribeJobPolicy = new iam.Policy(this, 'LambdaDescribeJobPolicy', {
            statements: [ new iam.PolicyStatement({
                effect: iam.Effect.ALLOW,
                actions: [
                    'comprehend:DescribeTopicsDetectionJob'
                ],
                resources: [ '*' ]
            })]
        });
        lambdaDescribeJobPolicy.attachToRole(checkTopicStatus.lambdaFunction.role as iam.Role);
        (lambdaDescribeJobPolicy.node.defaultChild as iam.CfnPolicy).cfnOptions.metadata = {
            cfn_nag: {
                rules_to_suppress: [{
                  id: 'W12',
                  reason: `The * resource allows the lambda function to access Amazon Comprehend services for Topic Detection. The Comprehend services do not have a resource arn. This permission is restricted to the lambda function responsible for accessing the Amazon Comprehend service`
                }]
            }
        };

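        // Choice and wait states that drive the submit -> poll -> publish flow of the workflow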
        const jobSubmittedChoice = new sfn.Choice(this, 'JobSubmitted?', {
            comment: 'Check if the topic modeling job is submitted',
            inputPath: '$'
        });

        const jobStatusChoice = new sfn.Choice(this, 'JobComplete?', {
            comment: 'Check if the topic modeling job is complete',
            inputPath: '$'
        });

        const jobWait = new sfn.Wait(this, 'Wait', {
            time: sfn.WaitTime.duration(cdk.Duration.minutes(10))
        });

        jobSubmittedChoice.when(sfn.Condition.stringEquals('$.JobStatus', 'NO_DATA'), new sfn.Fail(this, 'NoData'));
        jobSubmittedChoice.when(sfn.Condition.stringEquals('$.JobStatus', 'FAILED'), new sfn.Fail(this, 'SubmitFailed'));
        jobSubmittedChoice.otherwise(checkTopicStatus.stepFunctionTask);
        checkTopicStatus.stepFunctionTask.next(jobStatusChoice);
        jobStatusChoice.when(sfn.Condition.stringEquals('$.JobStatus', 'COMPLETED'), parallel);
        jobStatusChoice.when(sfn.Condition.stringEquals('$.JobStatus', 'IN_PROGRESS'), jobWait);
        jobStatusChoice.when(sfn.Condition.stringEquals('$.JobStatus', 'SUBMITTED'), jobWait);
        jobStatusChoice.otherwise(new sfn.Fail(this, 'JobFailed'));
        jobWait.next(checkTopicStatus.stepFunctionTask);

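        // Assemble the state machine and invoke it on the configured schedule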
        const chain = sfn.Chain.start(submitTopicTask.stepFunctionTask).next(jobSubmittedChoice);

        const topicWorkflow = new Workflow(this, 'TopicModelWF', {
            stateMachineType: sfn.StateMachineType.STANDARD,
            chain: chain
        });
        topicScheduleRule.addTarget(new targets.SfnStateMachine(topicWorkflow.stateMachine));

        (topicWorkflow.stateMachine.role.node.tryFindChild('DefaultPolicy')?.node.findChild('Resource') as iam.CfnPolicy).addMetadata('cfn_nag', {
            rules_to_suppress: [{
                id: 'W76',
                reason: 'The state machine invokes multiple lambdas and the policy is narrowed down to the specific lambda resource arn. ' +
                    'Hence it has multiple policy statements, resulting in a higher SPCM value',
            }, {
                id: 'W12',
                reason: 'The "LogDelivery" actions do not support resource-level authorization'
            }]
        });
    }