# source/infrastructure/personalize/stack.py
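# NOTE: this excerpt is the stack constructor only; the class declaration
# (a solutions SolutionStack subclass, judging by the
# solutions_template_options calls below) and the aws_cdk / solution
# construct imports precede it in the file.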
def __init__(self, scope: Construct, construct_id: str, *args, **kwargs) -> None:
super().__init__(scope, construct_id, *args, **kwargs)
# CloudFormation Parameters
self.email = CfnParameter(
self,
id="Email",
type="String",
description="Email to notify with personalize workflow results",
default="",
max_length=50,
allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
constraint_description="Must be a valid email address or blank",
)
self.solutions_template_options.add_parameter(
self.email, "Email", "Solution Configuration"
)
self.email_provided = CfnCondition(
self,
"EmailProvided",
expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
)
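# EmailProvided gates the SNS email subscription created by the
# Notifications construct below; leaving the parameter blank should result
# in a topic with no email subscription.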
self.personalize_kms_key_arn = CfnParameter(
self,
id="PersonalizeKmsKeyArn",
description="Provide Amazon Personalize with an alternate AWS Key Management (KMS) key to use to encrypt your datasets",
default="",
allowed_pattern="(^arn:.*:kms:.*:.*:key/.*$|^$)",
)
self.solutions_template_options.add_parameter(
self.personalize_kms_key_arn,
"(Optional) KMS key ARN used to encrypt Datasets managed by Amazon Personalize",
"Security Configuration",
)
kms_enabled = CfnCondition(
self,
"PersonalizeSseKmsEnabled",
expression=Fn.condition_not(
Fn.condition_equals(self.personalize_kms_key_arn, "")
),
)
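# When a key ARN is supplied, this condition enables SSE-KMS on the
# Amazon Personalize dataset group created below (see CreateDatasetGroup).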
# layers
layer_powertools = PowertoolsLayer.get_or_create(self)
layer_solutions = SolutionsLayer.get_or_create(self)
common_layers = [layer_powertools, layer_solutions]
# buckets
access_logs_bucket = AccessLogsBucket(
self,
"AccessLogsBucket",
suppress=[
CfnNagSuppression(
"W35",
"This bucket is used as the logging destination for personalization data processing",
)
],
)
data_bucket = DataBucket(
self,
"PersonalizeBucket",
server_access_logs_bucket=access_logs_bucket,
server_access_logs_prefix="personalize-bucket-access-logs/",
)
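# The data bucket receives training data and workflow configuration;
# uploads under the train/ prefix trigger the state machine via the S3
# event handler configured near the end of this constructor.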
# The AWS Lambda functions required by the shared Step Functions workflows
create_config = CreateConfig(self, "CreateConfig", layers=common_layers)
prepare_input = PrepareInput(
self,
"Prepare Input",
layers=common_layers,
)
create_dataset_group = CreateDatasetGroup(
self,
"Create Dataset Group",
input_path="$.datasetGroup", # NOSONAR (python:S1192) - string for clarity
result_path="$.datasetGroup.serviceConfig", # NOSONAR (python:S1192) - string for clarity
kms_enabled=kms_enabled,
kms_key=self.personalize_kms_key_arn,
layers=common_layers,
)
create_schema = CreateSchema(
self,
"Create Schema",
layers=common_layers,
)
create_dataset = CreateDataset(
self,
"Create Dataset",
layers=common_layers,
)
create_dataset_import_job = CreateDatasetImportJob(
self,
"Create Dataset Import Job",
layers=common_layers,
personalize_bucket=data_bucket,
)
notifications = Notifications(
self,
"SNS Notification",
email=self.email,
email_provided=self.email_provided,
layers=common_layers,
)
create_event_tracker = CreateEventTracker(
self,
"Create Event Tracker",
layers=common_layers,
)
create_solution = CreateSolution(
self,
"Create Solution",
layers=common_layers,
)
create_recommender = CreateRecommender(
self,
"Create Recommender",
layers=common_layers,
)
create_solution_version = CreateSolutionVersion(
self,
"Create Solution Version",
layers=common_layers,
)
create_campaign = CreateCampaign(
self,
"Create Campaign",
layers=common_layers,
)
create_batch_inference_job = CreateBatchInferenceJob(
self,
"Create Batch Inference Job",
layers=common_layers,
personalize_bucket=data_bucket,
)
create_batch_segment_job = CreateBatchSegmentJob(
self,
"Create Batch Segment Job",
layers=common_layers,
personalize_bucket=data_bucket,
)
create_filter = CreateFilter(self, "Create Filter", layers=common_layers)
create_timestamp = CreateTimestamp(
self, "Create Timestamp", layers=[layer_powertools]
)
# EventBridge events are published for resource creation and updates
# Note: https://github.com/aws/aws-cdk/issues/17338
bus_name = (
f"aws-solutions-{self.node.try_get_context('SOLUTION_ID')}-{Aws.STACK_NAME}"
)
event_bus = EventBus(
self,
id="Notifications",
event_bus_name=bus_name,
)
event_bus.node.default_child.add_override(
"Properties.Name",
bus_name,
)
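# The direct override of Properties.Name works around the CDK issue linked
# above, which affects event bus names composed from CloudFormation tokens
# (Aws.STACK_NAME here).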
create_dataset_group.grant_put_events(event_bus)
create_schema.grant_put_events(event_bus)
create_dataset.grant_put_events(event_bus)
create_dataset_import_job.grant_put_events(event_bus)
create_event_tracker.grant_put_events(event_bus)
create_recommender.grant_put_events(event_bus)
create_solution.grant_put_events(event_bus)
create_solution_version.grant_put_events(event_bus)
create_campaign.grant_put_events(event_bus)
create_batch_inference_job.grant_put_events(event_bus)
create_batch_segment_job.grant_put_events(event_bus)
create_filter.grant_put_events(event_bus)
dataset_management_functions = {
"create_schema": create_schema,
"create_dataset": create_dataset,
"create_dataset_import_job": create_dataset_import_job,
}
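# These three functions are shared between the main workflow (see
# DatasetImportsFragment below) and the scheduled dataset import state
# machine.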
success = notifications.state(
self,
"Success",
payload=TaskInput.from_object(
{"datasetGroup.$": "$[0].datasetGroup.serviceConfig.name"}
),
)
dataset_import_schedule_sfn = ScheduledDatasetImport(
self,
"Scheduled Dataset Import",
dataset_management_functions=dataset_management_functions,
create_timestamp=create_timestamp,
notifications=notifications,
prepare_input=prepare_input,
).state_machine
solution_maintenance_schedule_sfn = ScheduledSolutionMaintenance(
self,
"Scheduled Solution Maintenance",
prepare_input=prepare_input,
create_solution=create_solution,
create_solution_version=create_solution_version,
create_campaign=create_campaign,
create_batch_inference_job=create_batch_inference_job,
create_batch_segment_job=create_batch_segment_job,
create_timestamp=create_timestamp,
notifications=notifications,
create_recommender=create_recommender,
).state_machine
# Scheduler and the step functions it is permitted to invoke on a schedule
scheduler = Scheduler(self, "Scheduler")
scheduler.grant_invoke(dataset_import_schedule_sfn)
scheduler.grant_invoke(solution_maintenance_schedule_sfn)
schedules = Schedules(
dataset_import=SchedulerFragment(
self,
schedule_for="personalize dataset import",
schedule_for_suffix="$.datasetGroup.serviceConfig.name",
scheduler=scheduler,
target=dataset_import_schedule_sfn,
schedule_path="$.datasetGroup.workflowConfig.schedules.import",
schedule_input={
"datasetGroup": {
"serviceConfig.$": "$.datasetGroup.serviceConfig",
"workflowConfig": {"maxAge": "1 second"},
}, # NOSONAR (python:S1192) - string for clarity
"datasets.$": "$.datasets",
"bucket.$": "$.bucket",
},
),
)
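# The import schedule above is read per dataset group from
# $.datasetGroup.workflowConfig.schedules.import; the one-second maxAge
# presumably forces a fresh dataset import on every scheduled run.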
create_solutions = SolutionFragment(
self,
"Create Solutions",
create_solution=create_solution,
create_solution_version=create_solution_version,
create_campaign=create_campaign,
create_batch_inference_job=create_batch_inference_job,
create_batch_segment_job=create_batch_segment_job,
create_recommender=create_recommender,
scheduler=scheduler,
to_schedule=solution_maintenance_schedule_sfn,
)
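# SolutionFragment wires together the solution, campaign, recommender and
# batch job steps, and registers the maintenance state machine with the
# scheduler so solution versions can be rebuilt on a schedule.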
# fmt: off
definition = Chain.start(
Parallel(self, "Manage The Execution")
.branch(
prepare_input.state(
self,
"Prepare Input",
).next(
create_dataset_group.state(
self,
"Create Dataset Group",
backoff_rate=1.02,
interval=Duration.seconds(5),
max_attempts=30,
)
).next(
DatasetImportsFragment(
self,
"Handle Dataset Imports",
**dataset_management_functions
)
).next(
schedules.dataset_import
).next(
EventTrackerFragment(self, "Event Tracker", create_event_tracker=create_event_tracker)
).next(
FilterFragment(self, "Filters", create_filter=create_filter) # filters require data to be present
).next(
create_solutions
)
)
.add_catch(
FailureFragment(self, notifications).start_state,
errors=["States.ALL"],
result_path="$.statesError"
)
.next(success)
)
# fmt: on
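# The resulting workflow: prepare input -> create dataset group -> dataset
# imports -> import schedule -> event tracker -> filters -> solutions.
# Any failure (States.ALL) is caught and routed through FailureFragment;
# success publishes an SNS notification carrying the dataset group name.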
state_machine_namer = ResourceName(
self, "StateMachineName", purpose="personalize-workflow", max_length=80
)
state_machine = StateMachine(
self,
"PersonalizeStateMachine",
state_machine_name=state_machine_namer.resource_name.to_string(),
definition=definition,
tracing_enabled=True,
)
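# Step Functions limits state machine names to 80 characters, hence the
# max_length above; enabling X-Ray tracing is what requires the wildcard
# IAM permission suppressed as W12 below.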
add_cfn_nag_suppressions(
state_machine.role.node.try_find_child("DefaultPolicy").node.find_child(
"Resource"
),
[
CfnNagSuppression(
"W12", "IAM policy for AWS X-Ray requires an allow on *"
),
CfnNagSuppression(
"W76",
"Large step functions need larger IAM roles to access all managed AWS Lambda functions",
),
],
)
s3_event_handler = S3EventHandler(
self,
"S3EventHandler",
state_machine=state_machine,
bucket=data_bucket,
layers=common_layers,
topic=notifications.topic,
)
s3_event_notification = LambdaDestination(s3_event_handler)
data_bucket.add_event_notification(
EventType.OBJECT_CREATED,
s3_event_notification,
NotificationKeyFilter(prefix="train/", suffix=".json"),
)
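# Uploading a workflow configuration ending in .json under train/ starts
# the personalization workflow defined above.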
# Handle suppressions for the notification handler resource generated by CDK
bucket_notification_handler = self.node.try_find_child(
"BucketNotificationsHandler050a0587b7544547bf325f094a3db834"
)
bucket_notification_policy = (
bucket_notification_handler.node.find_child("Role")
.node.find_child("DefaultPolicy")
.node.find_child("Resource")
)
add_cfn_nag_suppressions(
bucket_notification_policy,
[
CfnNagSuppression(
"W12",
"bucket resource is '*' due to circular dependency with bucket and role creation at the same time",
)
],
)
Tags.of(self).add("SOLUTION_ID", self.node.try_get_context("SOLUTION_ID"))
Tags.of(self).add("SOLUTION_NAME", self.node.try_get_context("SOLUTION_NAME"))
Tags.of(self).add(
"SOLUTION_VERSION", self.node.try_get_context("SOLUTION_VERSION")
)
Aspects.of(self).add(
CfnNagSuppressAll(
suppress=[
CfnNagSuppression(
"W89",
"functions deployed by this solution do not require VPC access",
),
CfnNagSuppression(
"W92",
"functions deployed by this solution do not require reserved concurrency",
),
CfnNagSuppression(
"W58",
"functions deployed by this solution use custom policy to write to CloudWatch logs",
),
],
resource_type="AWS::Lambda::Function",
)
)
# dashboard
self.dashboard = Dashboard(
self,
"PersonalizeDashboard",
scheduler_sfn_arn=scheduler.state_machine_arn,
personalize_bucket_name=data_bucket.bucket_name,
)
# outputs
CfnOutput(
self,
"PersonalizeBucketName",
value=data_bucket.bucket_name,
export_name=f"{Aws.STACK_NAME}-PersonalizeBucketName",
)
CfnOutput(
self,
"SchedulerTableName",
value=scheduler.scheduler_table.table_name,
export_name=f"{Aws.STACK_NAME}-SchedulerTableName",
)
CfnOutput(
self,
"SchedulerStepFunctionArn",
value=scheduler.state_machine_arn,
export_name=f"{Aws.STACK_NAME}-SchedulerStepFunctionArn",
)
CfnOutput(
self,
"Dashboard",
value=self.dashboard.name,
export_name=f"{Aws.STACK_NAME}-Dashboard",
)
CfnOutput(
self,
"SNSTopicArn",
value=notifications.topic.topic_arn,
export_name=f"{Aws.STACK_NAME}-SNSTopicArn",
)
CfnOutput(
self,
"EventBusArn",
value=event_bus.event_bus_arn,
export_name=f"{Aws.STACK_NAME}-EventBusArn",
)
CfnOutput(
self,
"CreateConfigFunctionArn",
value=create_config.function_arn,
export_name=f"{Aws.STACK_NAME}-CreateConfigFunctionArn",
)
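# A minimal synthesis sketch (hypothetical names and context values; the
# actual entry point, stack class name, and solution metadata live
# elsewhere in this repository):
#
#   from aws_cdk import App  # assuming CDK v2
#
#   app = App(context={
#       "SOLUTION_ID": "SO0000",            # placeholder
#       "SOLUTION_NAME": "personalize-solution",
#       "SOLUTION_VERSION": "v1.0.0",
#   })
#   PersonalizeStack(app, "PersonalizeStack")  # hypothetical class name
#   app.synth()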