def __init__()

in cdk/cdk/stack.py [0:0]


    def __init__(self, scope: cdk.Construct, construct_id: str, **kwargs) -> None:
        """Define every resource of the Amazon Forecast Accelerator stack.

        Creates, in order: the CloudFormation parameters, the S3 bucket and
        the SSM parameters that publish its paths, the SNS notification topic
        and the Lambda that e-mails through it, the SageMaker notebook
        (lifecycle config, role, policy, instance), the shared Forecast/Lambda
        role and policy, one Lambda plus one Step Functions task per pipeline
        stage, and finally the state machine and the SSM parameter holding
        its ARN.

        Args:
            scope: Parent construct, passed to the parent constructor.
            construct_id: Id of this stack. Also used as the prefix of the
                topic, notebook instance and state machine names, and
                (lower-cased) in the S3/SageMaker IAM resource patterns.
            **kwargs: Optional settings read here and NOT forwarded to the
                parent constructor:
                ``afa_branch`` (default ``"main"``),
                ``lambdamap_branch`` (default ``"main"``),
                ``lambdamap_function_name`` (default
                ``"AfaLambdaMapFunction"``).
        """
        super().__init__(
            scope,
            construct_id,
            description="Amazon Forecast Accelerator (uksb-1s7c5ojr9)",
        )

        #
        # CloudFormation parameters
        #
        email_address = core.CfnParameter(
            self,
            "emailAddress",
            description="(Required) An e-mail address with which to receive "
            "deployment notifications.",
        )

        instance_type = core.CfnParameter(
            self,
            "instanceType",
            default="ml.t2.medium",
            description="(Required) SageMaker Notebook instance type on which to host "
            "the AFA dashboard (e.g. ml.t2.medium, ml.t3.xlarge, ml.t3.2xlarge, ml.m4.4xlarge)",
        )

        self.afa_branch = kwargs.get("afa_branch", "main")
        self.lambdamap_branch = kwargs.get("lambdamap_branch", "main")
        self.lambdamap_function_name = kwargs.get(
            "lambdamap_function_name", "AfaLambdaMapFunction")

        # Deploy-time tokens shared by the IAM resource patterns below.
        region = core.Aws.REGION
        account = core.Aws.ACCOUNT_ID
        stack_name = core.Aws.STACK_NAME

        def allow(actions, resources):
            """Build an ALLOW policy statement for the given actions/resources."""
            return iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=actions,
                resources=resources,
            )

        def lambda_statement():
            """Statement granting lambda:* on the lambdamap and stack functions."""
            return allow(
                ["lambda:*"],
                [
                    f"arn:aws:lambda:{region}:{account}:function:{self.lambdamap_function_name}",
                    f"arn:aws:lambda:{region}:{account}:function:{stack_name}*",
                ],
            )

        def s3_statement():
            """Statement granting s3:* on buckets prefixed by the stack id."""
            return allow(
                ["s3:*"],
                [f"arn:aws:s3:::{construct_id.lower()}*"],
            )

        #
        # S3 Bucket
        #
        bucket = s3.Bucket(
            self,
            "Bucket",
            auto_delete_objects=False,
            removal_policy=core.RemovalPolicy.DESTROY,
            encryption=s3.BucketEncryption.S3_MANAGED,
            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
        )

        #
        # SSM Parameter Store
        #
        ssm.StringParameter(
            self,
            "AfaSsmS3Bucket",
            string_value=bucket.bucket_name,
            parameter_name="AfaS3Bucket",
        )

        ssm.StringParameter(
            self,
            "AfaSsmS3InputPath",
            string_value=f"s3://{bucket.bucket_name}/input/",
            parameter_name="AfaS3InputPath",
        )

        ssm.StringParameter(
            self,
            "AfaSsmS3OutputPath",
            string_value=f"s3://{bucket.bucket_name}/afc-exports/",
            parameter_name="AfaS3OutputPath",
        )

        #
        # SNS topic for email notification
        #
        topic = sns.Topic(
            self,
            "NotificationTopic",
            topic_name=f"{construct_id}-NotificationTopic",
        )
        topic.add_subscription(
            subscriptions.EmailSubscription(email_address.value_as_string))
        self.topic = topic

        sns_lambda_role = iam.Role(
            self,
            "SnsEmailLambdaRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSNSFullAccess")
            ],
        )
        self.sns_lambda_role = sns_lambda_role

        sns_lambda = lambda_.Function(
            self,
            "SnsEmailLambda",
            runtime=lambda_.Runtime.PYTHON_3_8,
            # Use the topic's own ARN attribute (as the AFC e-mail Lambda
            # below does) instead of hand-assembling it from
            # region/account/name with a hard-coded partition.
            environment={"TOPIC_ARN": topic.topic_arn},
            code=self.make_dashboard_ready_email_inline_code(),
            handler="index.lambda_handler",
            role=sns_lambda_role,
        )

        #
        # Notebook lifecycle configuration
        #
        notebook_instance_name = f"{construct_id}-NotebookInstance"
        lcc = self.make_nb_lcc(
            construct_id, notebook_instance_name, sns_lambda.function_name)

        #
        # Notebook role
        #
        sm_role = iam.Role(
            self,
            "NotebookRole",
            assumed_by=iam.ServicePrincipal("sagemaker.amazonaws.com"),
        )

        iam.Policy(
            self,
            "SmPolicy",
            roles=[sm_role],
            statements=[
                # Lambda
                lambda_statement(),

                # S3
                s3_statement(),

                # SageMaker
                allow(
                    [
                        "sagemaker:DescribeNotebookInstanceLifecycleConfig",
                        "sagemaker:DeleteNotebookInstance",
                        "sagemaker:StopNotebookInstance",
                        "sagemaker:DescribeNotebookInstance",
                        "sagemaker:CreateNotebookInstanceLifecycleConfig",
                        "sagemaker:DeleteNotebookInstanceLifecycleConfig",
                        "sagemaker:UpdateNotebookInstanceLifecycleConfig",
                        "sagemaker:CreateNotebookInstance",
                        "sagemaker:UpdateNotebookInstance",
                    ],
                    [
                        f"arn:aws:sagemaker:{region}:{account}:notebook-instance/{construct_id.lower()}*",
                        f"arn:aws:sagemaker:{region}:{account}:notebook-instance-lifecycle-config/notebooklifecycleconfig*",
                    ],
                ),

                # Step Functions
                allow(
                    ["states:*"],
                    [f"arn:aws:states:{region}:{account}:*:{stack_name}*"],
                ),

                # SSM
                allow(
                    ["ssm:*"],
                    [
                        f"arn:aws:ssm:{region}:{account}:parameter/AfaS3Bucket",
                        f"arn:aws:ssm:{region}:{account}:parameter/AfaS3InputPath",
                        f"arn:aws:ssm:{region}:{account}:parameter/AfaS3OutputPath",
                        f"arn:aws:ssm:{region}:{account}:parameter/AfaAfcStateMachineArn",
                    ],
                ),
            ],
        )

        #
        # Notebook instance
        #
        sm.CfnNotebookInstance(
            self,
            "NotebookInstance",
            role_arn=sm_role.role_arn,
            instance_type=instance_type.value_as_string,
            notebook_instance_name=notebook_instance_name,
            volume_size_in_gb=16,
            lifecycle_config_name=lcc.attr_notebook_instance_lifecycle_config_name,
        )

        #
        # AFC/Lambda role (assumable by both Forecast and Lambda)
        #
        afc_role = iam.Role(
            self,
            "AfcRole",
            assumed_by=iam.CompositePrincipal(
                iam.ServicePrincipal("forecast.amazonaws.com"),
                iam.ServicePrincipal("lambda.amazonaws.com"),
            ),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name("AmazonForecastFullAccess"),
            ],
        )

        iam.Policy(
            self,
            "AfcPolicy",
            roles=[afc_role],
            statements=[
                # Lambda
                lambda_statement(),

                # S3
                s3_statement(),

                # Logging
                allow(
                    ["logs:*"],
                    [f"arn:aws:logs:{region}:{account}:log-group:/aws/lambda/{stack_name}*"],
                ),

                # SNS
                allow(
                    ["sns:*"],
                    [f"arn:aws:sns:{region}:{account}:{stack_name}*"],
                ),
            ],
        )

        # Instantiated but not referenced by the state machine definition
        # below; kept so the construct tree is unchanged.
        sfn.Fail(self, "Fail")
        sfn.Succeed(self, "Succeed")

        #
        # Local helpers for the pipeline stages
        #
        def inline_code(filename):
            """Load afc_lambdas/<filename> as inline Lambda code.

            The file is read inside a ``with`` block so the handle is closed
            right away, and decoded as UTF-8 so synthesis does not depend on
            the locale's default encoding.
            """
            source_path = os.path.join(PWD, "afc_lambdas", filename)
            with open(source_path, encoding="utf-8") as source_file:
                return lambda_.Code.from_inline(source_file.read())

        def afc_python_lambda(lambda_id, handler, filename, pass_role_arn):
            """Create an inline-code Python Lambda that runs as ``afc_role``.

            ``AFC_ROLE_ARN`` is exported to the function's environment only
            when ``pass_role_arn`` is true; otherwise no environment is set.
            """
            # NOTE(review): confirm python3.8 is still an accepted Lambda
            # runtime for new deployments before the next release.
            props = {
                "runtime": lambda_.Runtime.PYTHON_3_8,
                "handler": handler,
                "code": inline_code(filename),
                "role": afc_role,
                "timeout": core.Duration.seconds(900),
            }
            if pass_role_arn:
                props["environment"] = {"AFC_ROLE_ARN": afc_role.role_arn}
            return lambda_.Function(self, lambda_id, **props)

        def invoke_step(step_id, function):
            """Create a task that invokes ``function`` with the whole state
            input wrapped under the ``"input"`` key."""
            return tasks.LambdaInvoke(
                self,
                step_id,
                lambda_function=function,
                payload=sfn.TaskInput.from_object({
                    "input": sfn.JsonPath.string_at("$")
                }),
            )

        def retry_on(step, errors, backoff_rate=1.1, interval_seconds=60,
                     max_attempts=2000):
            """Attach a retry policy for the named errors to ``step``."""
            step.add_retry(
                backoff_rate=backoff_rate,
                interval=core.Duration.seconds(interval_seconds),
                max_attempts=max_attempts,
                errors=errors,
            )

        #
        # PREPARE DATA
        #
        prepare_lambda = afc_python_lambda(
            "PrepareLambda", "index.prepare_handler", "prepare.py",
            pass_role_arn=True)
        prepare_step = invoke_step("PrepareDataStep", prepare_lambda)

        #
        # CREATE PREDICTOR
        #
        create_predictor_lambda = afc_python_lambda(
            "CreatedPredictorLambda", "index.create_predictor_handler",
            "create_predictor.py", pass_role_arn=True)
        create_predictor_step = invoke_step(
            "CreatePredictorStep", create_predictor_lambda)
        retry_on(
            create_predictor_step,
            ["ResourceNotFoundException",
             "ResourceInUseException",
             "ResourcePendingException"],
            backoff_rate=1.05,
            max_attempts=1000)

        #
        # CREATE FORECAST
        #
        create_forecast_lambda = afc_python_lambda(
            "CreatedForecastLambda", "index.create_forecast_handler",
            "create_forecast.py", pass_role_arn=False)
        create_forecast_step = invoke_step(
            "CreateforecastStep", create_forecast_lambda)
        retry_on(
            create_forecast_step,
            ["ResourceNotFoundException",
             "ResourceInUseException",
             "ResourcePendingException"])

        #
        # CREATE FORECAST EXPORT
        #
        create_forecast_export_lambda = afc_python_lambda(
            "CreateExportLambda", "index.create_forecast_export_handler",
            "create_export.py", pass_role_arn=True)
        create_forecast_export_step = invoke_step(
            "CreateExportStep", create_forecast_export_lambda)
        retry_on(
            create_forecast_export_step,
            ["ResourceInUseException",
             "ResourcePendingException"])

        #
        # BACKTEST EXPORT FILE(s)
        #
        create_predictor_backtest_export_lambda = afc_python_lambda(
            "CreatePredictorBacktestExportLambda",
            "index.create_predictor_backtest_export_handler",
            "create_predictor_backtest_export.py",
            pass_role_arn=True)
        create_predictor_backtest_export_step = invoke_step(
            "CreatePredictorBacktestExportStep",
            create_predictor_backtest_export_lambda)
        retry_on(
            create_predictor_backtest_export_step,
            ["ResourceInUseException",
             "ResourcePendingException"])

        #
        # POSTPROCESS FORECAST EXPORT FILE(s)
        #
        # Container-image Lambda, so it is built directly rather than through
        # the inline-code helper.
        postprocess_lambda = lambda_.Function(
            self,
            "PostProcessLambda",
            code=lambda_.EcrImageCode.from_asset_image(
                directory=os.path.join(PWD, "afc_lambdas", "postprocess")),
            runtime=lambda_.Runtime.FROM_IMAGE,
            handler=lambda_.Handler.FROM_IMAGE,
            memory_size=10240,
            role=afc_role,
            timeout=core.Duration.seconds(900),
        )
        postprocess_step = invoke_step("PostProcessStep", postprocess_lambda)
        retry_on(
            postprocess_step,
            ["NoFilesFound",
             "ResourceInUseException",
             "ResourcePendingException"],
            interval_seconds=30)

        #
        # DELETE AFC RESOURCES
        #
        delete_afc_resources_lambda = afc_python_lambda(
            "DeleteAfcResourcesLambda", "index.delete_afc_resources_handler",
            "delete_resources.py", pass_role_arn=False)
        delete_afc_resources_step = invoke_step(
            "DeleteAfcResourcesStep", delete_afc_resources_lambda)
        retry_on(
            delete_afc_resources_step,
            ["ResourceNotFoundException",
             "ResourceInUseException",
             "ResourcePendingException"])

        #
        # SNS EMAIL
        #
        sns_afc_email_lambda = lambda_.Function(
            self,
            f"{construct_id}-SnsAfcEmailLambda",
            runtime=lambda_.Runtime.PYTHON_3_8,
            environment={"TOPIC_ARN": topic.topic_arn},
            code=self.make_afc_email_inline_code(),
            handler="index.lambda_handler",
            role=afc_role,
        )
        sns_afc_email_step = invoke_step("SnsAfcEmailStep", sns_afc_email_lambda)

        #
        # State machine
        #
        # Stages run strictly in sequence; the backtest export is started
        # before the forecast export.
        definition = (
            prepare_step
            .next(create_predictor_step)
            .next(create_forecast_step)
            .next(create_predictor_backtest_export_step)
            .next(create_forecast_export_step)
            .next(postprocess_step)
            .next(delete_afc_resources_step)
            .next(sns_afc_email_step)
        )

        state_machine = sfn.StateMachine(
            self,
            "AfaSsmAfcStateMachine",
            state_machine_name=f"{construct_id}-AfcStateMachine",
            definition=definition,
            timeout=core.Duration.hours(24),
        )

        # Publish the state machine ARN under the parameter name that the
        # notebook policy above grants access to.
        ssm.StringParameter(
            self,
            "AfaSsmAfcStateMachineArn",
            string_value=state_machine.state_machine_arn,
            parameter_name="AfaAfcStateMachineArn",
        )