source/infrastructure/forecast/aws_lambda/functions/sns.py [47:86]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )

    def create_sns(self):
        """
        Create the SNS topic using AWS Solutions Constructs
        :return:
        """
        lambda_sns = LambdaToSns(
            self,
            "NotificationConfiguration",
            existing_lambda_obj=self.function,
            topic_props=TopicProps(
                display_name=f"{self.node.try_get_context('SOLUTION_NAME')} Notifications"
            ),
        )
        topic = lambda_sns.sns_topic
        topic.node.default_child.override_logical_id("NotificationTopic")
        return topic

    def create_subscription(self, email, email_provided):
        logical_id = "NotificationSubscription"
        subscription = Subscription(
            self,
            logical_id,
            topic=self.topic,
            endpoint=email.value_as_string,
            protocol=SubscriptionProtocol.EMAIL,
        )
        subscription.node.default_child.override_logical_id(logical_id)
        Aspects.of(subscription).add(ConditionalResources(email_provided))
        return subscription

    def _create_resources(self):
        self.topic = self.create_sns()
        self.subscription = self.create_subscription(
            email=self.email, email_provided=self.email_provided
        )

    def _set_permissions(self) -> None:
        self.topic.grant_publish(self.function)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



source/infrastructure/forecast/sns/notifications.py [52:91]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )

    def create_sns(self):
        """
        Create the SNS topic using AWS Solutions Constructs
        :return:
        """
        lambda_sns = LambdaToSns(
            self,
            "NotificationConfiguration",
            existing_lambda_obj=self.function,
            topic_props=TopicProps(
                display_name=f"{self.node.try_get_context('SOLUTION_NAME')} Notifications"
            ),
        )
        topic = lambda_sns.sns_topic
        topic.node.default_child.override_logical_id("NotificationTopic")
        return topic

    def create_subscription(self, email, email_provided):
        logical_id = "NotificationSubscription"
        subscription = Subscription(
            self,
            logical_id,
            topic=self.topic,
            endpoint=email.value_as_string,
            protocol=SubscriptionProtocol.EMAIL,
        )
        subscription.node.default_child.override_logical_id(logical_id)
        Aspects.of(subscription).add(ConditionalResources(email_provided))
        return subscription

    def _create_resources(self):
        self.topic = self.create_sns()
        self.subscription = self.create_subscription(
            email=self.email, email_provided=self.email_provided
        )

    def _set_permissions(self) -> None:
        self.topic.grant_publish(self.function)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



