def __init__()

in source/infrastructure/personalize/stack.py


    def __init__(self, scope: Construct, construct_id: str, *args, **kwargs) -> None:
        super().__init__(scope, construct_id, *args, **kwargs)

        # CloudFormation Parameters
        self.email = CfnParameter(
            self,
            id="Email",
            type="String",
            description="Email to notify with personalize workflow results",
            default="",
            max_length=50,
            allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
            constraint_description="Must be a valid email address or blank",
        )
        self.solutions_template_options.add_parameter(
            self.email, "Email", "Solution Configuration"
        )
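        # Condition: an email address was supplied (consumed by the notifications construct below)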
        self.email_provided = CfnCondition(
            self,
            "EmailProvided",
            expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
        )

        self.personalize_kms_key_arn = CfnParameter(
            self,
            id="PersonalizeKmsKeyArn",
            description="Provide Amazon Personalize with an alternate AWS Key Management (KMS) key to use to encrypt your datasets",
            default="",
            allowed_pattern="(^arn:.*:kms:.*:.*:key/.*$|^$)",
        )
        self.solutions_template_options.add_parameter(
            self.personalize_kms_key_arn,
            "(Optional) KMS key ARN used to encrypt Datasets managed by Amazon Personalize",
            "Security Configuration",
        )
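        # Condition: a customer-managed KMS key was supplied for dataset encryption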
        kms_enabled = CfnCondition(
            self,
            "PersonalizeSseKmsEnabled",
            expression=Fn.condition_not(
                Fn.condition_equals(self.personalize_kms_key_arn, "")
            ),
        )

        # Lambda layers shared across the solution's functions
        layer_powertools = PowertoolsLayer.get_or_create(self)
        layer_solutions = SolutionsLayer.get_or_create(self)
        common_layers = [layer_powertools, layer_solutions]

        # S3 buckets: one for server access logs, one for personalization data
        access_logs_bucket = AccessLogsBucket(
            self,
            "AccessLogsBucket",
            suppress=[
                CfnNagSuppression(
                    "W35",
                    "This bucket is used as the logging destination for personalization data processing",
                )
            ],
        )

        data_bucket = DataBucket(
            self,
            "PersonalizeBucket",
            server_access_logs_bucket=access_logs_bucket,
            server_access_logs_prefix="personalize-bucket-access-logs/",
        )

        # The AWS Lambda functions required by the shared Step Functions workflows
        create_config = CreateConfig(self, "CreateConfig", layers=common_layers)
        prepare_input = PrepareInput(
            self,
            "Prepare Input",
            layers=common_layers,
        )
        create_dataset_group = CreateDatasetGroup(
            self,
            "Create Dataset Group",
            input_path="$.datasetGroup",  # NOSONAR (python:S1192) - string for clarity
            result_path="$.datasetGroup.serviceConfig",  # NOSONAR (python:S1192) - string for clarity
            kms_enabled=kms_enabled,
            kms_key=self.personalize_kms_key_arn,
            layers=common_layers,
        )
        create_schema = CreateSchema(
            self,
            "Create Schema",
            layers=common_layers,
        )
        create_dataset = CreateDataset(
            self,
            "Create Dataset",
            layers=common_layers,
        )
        create_dataset_import_job = CreateDatasetImportJob(
            self,
            "Create Dataset Import Job",
            layers=common_layers,
            personalize_bucket=data_bucket,
        )
        notifications = Notifications(
            self,
            "SNS Notification",
            email=self.email,
            email_provided=self.email_provided,
            layers=common_layers,
        )
        create_event_tracker = CreateEventTracker(
            self,
            "Create Event Tracker",
            layers=common_layers,
        )
        create_solution = CreateSolution(
            self,
            "Create Solution",
            layers=common_layers,
        )
        create_recommender = CreateRecommender(
            self,
            "Create Recommender",
            layers=common_layers,
        )
        create_solution_version = CreateSolutionVersion(
            self,
            "Create Solution Version",
            layers=common_layers,
        )
        create_campaign = CreateCampaign(
            self,
            "Create Campaign",
            layers=common_layers,
        )
        create_batch_inference_job = CreateBatchInferenceJob(
            self,
            "Create Batch Inference Job",
            layers=common_layers,
            personalize_bucket=data_bucket,
        )
        create_batch_segment_job = CreateBatchSegmentJob(
            self,
            "Create Batch Segment Job",
            layers=common_layers,
            personalize_bucket=data_bucket,
        )
        create_filter = CreateFilter(self, "Create Filter", layers=common_layers)
        create_timestamp = CreateTimestamp(
            self, "Create Timestamp", layers=[layer_powertools]
        )

        # EventBridge events are emitted on resource creation and update.
        # The raw Properties.Name override below works around the event bus
        # naming issue tracked in https://github.com/aws/aws-cdk/issues/17338
        bus_name = (
            f"aws-solutions-{self.node.try_get_context('SOLUTION_ID')}-{Aws.STACK_NAME}"
        )
        event_bus = EventBus(
            self,
            id="Notifications",
            event_bus_name=bus_name,
        )
        event_bus.node.default_child.add_override(
            "Properties.Name",
            bus_name,
        )

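        # Allow each resource-creating function to publish events to the bus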
        create_dataset_group.grant_put_events(event_bus)
        create_schema.grant_put_events(event_bus)
        create_dataset.grant_put_events(event_bus)
        create_dataset_import_job.grant_put_events(event_bus)
        create_event_tracker.grant_put_events(event_bus)
        create_recommender.grant_put_events(event_bus)
        create_solution.grant_put_events(event_bus)
        create_solution_version.grant_put_events(event_bus)
        create_campaign.grant_put_events(event_bus)
        create_batch_inference_job.grant_put_events(event_bus)
        create_batch_segment_job.grant_put_events(event_bus)
        create_filter.grant_put_events(event_bus)

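        # Functions shared by the main workflow and the scheduled dataset import workflow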
        dataset_management_functions = {
            "create_schema": create_schema,
            "create_dataset": create_dataset,
            "create_dataset_import_job": create_dataset_import_job,
        }

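        # Success notification; $[0] indexes the output of the single Parallel branch below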
        success = notifications.state(
            self,
            "Success",
            payload=TaskInput.from_object(
                {"datasetGroup.$": "$[0].datasetGroup.serviceConfig.name"}
            ),
        )

        dataset_import_schedule_sfn = ScheduledDatasetImport(
            self,
            "Scheduled Dataset Import",
            dataset_management_functions=dataset_management_functions,
            create_timestamp=create_timestamp,
            notifications=notifications,
            prepare_input=prepare_input,
        ).state_machine

        solution_maintenance_schedule_sfn = ScheduledSolutionMaintenance(
            self,
            "Scheduled Solution Maintenance",
            prepare_input=prepare_input,
            create_solution=create_solution,
            create_solution_version=create_solution_version,
            create_campaign=create_campaign,
            create_batch_inference_job=create_batch_inference_job,
            create_batch_segment_job=create_batch_segment_job,
            create_timestamp=create_timestamp,
            notifications=notifications,
            create_recommender=create_recommender,
        ).state_machine

        # Scheduler plus the step functions it is permitted to invoke
        scheduler = Scheduler(self, "Scheduler")
        scheduler.grant_invoke(dataset_import_schedule_sfn)
        scheduler.grant_invoke(solution_maintenance_schedule_sfn)

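        # Recurring dataset import driven by $.datasetGroup.workflowConfig.schedules.import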
        schedules = Schedules(
            dataset_import=SchedulerFragment(
                self,
                schedule_for="personalize dataset import",
                schedule_for_suffix="$.datasetGroup.serviceConfig.name",
                scheduler=scheduler,
                target=dataset_import_schedule_sfn,
                schedule_path="$.datasetGroup.workflowConfig.schedules.import",
                schedule_input={
                    "datasetGroup": {
                        "serviceConfig.$": "$.datasetGroup.serviceConfig",
                        "workflowConfig": {"maxAge": "1 second"},
                    },  # NOSONAR (python:S1192) - string for clarity
                    "datasets.$": "$.datasets",
                    "bucket.$": "$.bucket",
                },
            ),
        )

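        # Fragment that creates solutions, solution versions, campaigns, recommenders, and batch jobs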
        create_solutions = SolutionFragment(
            self,
            "Create Solutions",
            create_solution=create_solution,
            create_solution_version=create_solution_version,
            create_campaign=create_campaign,
            create_batch_inference_job=create_batch_inference_job,
            create_batch_segment_job=create_batch_segment_job,
            create_recommender=create_recommender,
            scheduler=scheduler,
            to_schedule=solution_maintenance_schedule_sfn,
        )

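        # Main workflow definition: a single Parallel branch with a catch-all failure handler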
        # fmt: off
        definition = Chain.start(
            Parallel(self, "Manage The Execution")
            .branch(
                prepare_input.state(
                    self,
                    "Prepare Input",
                ).next(
                    create_dataset_group.state(
                        self,
                        "Create Dataset Group",
                        backoff_rate=1.02,
                        interval=Duration.seconds(5),
                        max_attempts=30,
                    )
                ).next(
                    DatasetImportsFragment(
                        self,
                        "Handle Dataset Imports",
                        **dataset_management_functions
                    )
                ).next(
                    schedules.dataset_import
                ).next(
                    EventTrackerFragment(self, "Event Tracker", create_event_tracker=create_event_tracker)
                ).next(
                    FilterFragment(self, "Filters", create_filter=create_filter)  # filters require data to be present
                ).next(
                    create_solutions
                )
            )
            .add_catch(
                FailureFragment(self, notifications).start_state,
                errors=["States.ALL"],
                result_path="$.statesError"
            )
            .next(success)
        )
        # fmt: on

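        # End-to-end personalize workflow state machine, with X-Ray tracing enabled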
        state_machine_namer = ResourceName(
            self, "StateMachineName", purpose="personalize-workflow", max_length=80
        )
        state_machine = StateMachine(
            self,
            "PersonalizeStateMachine",
            state_machine_name=state_machine_namer.resource_name.to_string(),
            definition=definition,
            tracing_enabled=True,
        )
        add_cfn_nag_suppressions(
            state_machine.role.node.try_find_child("DefaultPolicy").node.find_child(
                "Resource"
            ),
            [
                CfnNagSuppression(
                    "W12", "IAM policy for AWS X-Ray requires an allow on *"
                ),
                CfnNagSuppression(
                    "W76",
                    "Large step functions need larger IAM roles to access all managed AWS Lambda functions",
                ),
            ],
        )

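        # Start the workflow whenever an object lands in the bucket under train/*.json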
        s3_event_handler = S3EventHandler(
            self,
            "S3EventHandler",
            state_machine=state_machine,
            bucket=data_bucket,
            layers=[layer_powertools, layer_solutions],
            topic=notifications.topic,
        )
        s3_event_notification = LambdaDestination(s3_event_handler)
        data_bucket.add_event_notification(
            EventType.OBJECT_CREATED,
            s3_event_notification,
            NotificationKeyFilter(prefix="train/", suffix=".json"),
        )

        # cfn-nag suppressions for the bucket notifications handler that CDK auto-generates
        bucket_notification_handler = self.node.try_find_child(
            "BucketNotificationsHandler050a0587b7544547bf325f094a3db834"
        )
        bucket_notification_policy = (
            bucket_notification_handler.node.find_child("Role")
            .node.find_child("DefaultPolicy")
            .node.find_child("Resource")
        )
        add_cfn_nag_suppressions(
            bucket_notification_policy,
            [
                CfnNagSuppression(
                    "W12",
                    "bucket resource is '*' due to circular dependency with bucket and role creation at the same time",
                )
            ],
        )

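        # Tag every resource in the stack with the solution metadata from CDK context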
        Tags.of(self).add("SOLUTION_ID", self.node.try_get_context("SOLUTION_ID"))
        Tags.of(self).add("SOLUTION_NAME", self.node.try_get_context("SOLUTION_NAME"))
        Tags.of(self).add(
            "SOLUTION_VERSION", self.node.try_get_context("SOLUTION_VERSION")
        )

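        # Stack-wide cfn-nag suppressions applied to all Lambda functions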
        Aspects.of(self).add(
            CfnNagSuppressAll(
                suppress=[
                    CfnNagSuppression(
                        "W89",
                        "functions deployed by this solution do not require VPC access",
                    ),
                    CfnNagSuppression(
                        "W92",
                        "functions deployed by this solution do not require reserved concurrency",
                    ),
                    CfnNagSuppression(
                        "W58",
                        "functions deployed by this solution use custom policy to write to CloudWatch logs",
                    ),
                ],
                resource_type="AWS::Lambda::Function",
            )
        )

        # Operations dashboard covering the scheduler state machine and the data bucket
        self.dashboard = Dashboard(
            self,
            "PersonalizeDashboard",
            scheduler_sfn_arn=scheduler.state_machine_arn,
            personalize_bucket_name=data_bucket.bucket_name,
        )

        # Stack outputs, exported for cross-stack references
        CfnOutput(
            self,
            "PersonalizeBucketName",
            value=data_bucket.bucket_name,
            export_name=f"{Aws.STACK_NAME}-PersonalizeBucketName",
        )
        CfnOutput(
            self,
            "SchedulerTableName",
            value=scheduler.scheduler_table.table_name,
            export_name=f"{Aws.STACK_NAME}-SchedulerTableName",
        )
        CfnOutput(
            self,
            "SchedulerStepFunctionArn",
            value=scheduler.state_machine_arn,
            export_name=f"{Aws.STACK_NAME}-SchedulerStepFunctionArn",
        )
        CfnOutput(
            self,
            "Dashboard",
            value=self.dashboard.name,
            export_name=f"{Aws.STACK_NAME}-Dashboard",
        )
        CfnOutput(
            self,
            "SNSTopicArn",
            value=notifications.topic.topic_arn,
            export_name=f"{Aws.STACK_NAME}-SNSTopicArn",
        )
        CfnOutput(
            self,
            "EventBusArn",
            value=event_bus.event_bus_arn,
            export_name=f"{Aws.STACK_NAME}-EventBusArn",
        )
        CfnOutput(
            self,
            "CreateConfigFunctionArn",
            value=create_config.function_arn,
            export_name=f"{Aws.STACK_NAME}-CreateConfigFunctionArn",
        )
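
For context, here is a minimal sketch of how a stack with this constructor might be synthesized from a CDK app entry point. The class name PersonalizeStack and the context values are assumptions for illustration only; the solution's own entry point is the authoritative wiring.

import aws_cdk as cdk

# Hypothetical entry point; the context keys mirror the try_get_context
# calls in the constructor above, and the values are placeholders.
app = cdk.App(
    context={
        "SOLUTION_ID": "SO0000",
        "SOLUTION_NAME": "personalize-workflow-solution",
        "SOLUTION_VERSION": "v1.0.0",
    }
)

# PersonalizeStack is an assumed name for the class defined in stack.py
PersonalizeStack(app, "PersonalizeStack")
app.synth()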