# cdk/cdk/stack.py
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(
    scope,
    construct_id,
    description="Amazon Forecast Accelerator (uksb-1s7c5ojr9)",
)
email_address = core.CfnParameter(
    self,
    "emailAddress",
    description="(Required) Email address that receives deployment notifications.")
instance_type = core.CfnParameter(
    self,
    "instanceType",
    default="ml.t2.medium",
    description="(Optional) SageMaker notebook instance type on which to host "
                "the AFA dashboard (e.g. ml.t2.medium, ml.t3.xlarge, "
                "ml.t3.2xlarge, ml.m4.4xlarge). Defaults to ml.t2.medium.")
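# Deployment options (overridable via **kwargs): git branches for the AFA and
# LambdaMap codebases, and the name of the shared LambdaMap Lambda function
# referenced by the IAM policies below.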
self.afa_branch = kwargs.get("afa_branch", "main")
self.lambdamap_branch = kwargs.get("lambdamap_branch", "main")
self.lambdamap_function_name = kwargs.get("lambdamap_function_name", "AfaLambdaMapFunction")
#
# S3 Bucket
#
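# Staging bucket for dashboard inputs and Forecast exports. With
# auto_delete_objects=False, stack deletion only removes the bucket if it is
# already empty, even though its removal policy is DESTROY.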
bucket = s3.Bucket(self, "Bucket", auto_delete_objects=False,
removal_policy=core.RemovalPolicy.DESTROY,
encryption=s3.BucketEncryption.S3_MANAGED,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL)
#
# SSM Parameter Store
#
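# Well-known parameter names (AfaS3Bucket, AfaS3InputPath, AfaS3OutputPath)
# let the dashboard and the pipeline Lambdas discover the bucket and its I/O
# prefixes at runtime instead of hard-coding them.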
ssm_s3_bucket_path_param = ssm.StringParameter(self,
"AfaSsmS3Bucket",
string_value=bucket.bucket_name,
parameter_name="AfaS3Bucket")
ssm_s3_input_path_param = ssm.StringParameter(self,
"AfaSsmS3InputPath",
string_value=f"s3://{bucket.bucket_name}/input/",
parameter_name="AfaS3InputPath")
ssm_s3_output_path_param = ssm.StringParameter(self,
"AfaSsmS3OutputPath",
string_value=f"s3://{bucket.bucket_name}/afc-exports/",
parameter_name="AfaS3OutputPath")
#
# SNS topic for email notification
#
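# The subscriber receives deployment notifications; the helper Lambda below
# publishes the "dashboard ready" email to this topic via its TOPIC_ARN
# environment variable.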
topic = sns.Topic(
    self,
    "NotificationTopic",
    topic_name=f"{construct_id}-NotificationTopic")
topic.add_subscription(
subscriptions.EmailSubscription(email_address.value_as_string))
self.topic = topic
sns_lambda_role = iam.Role(
self,
f"SnsEmailLambdaRole",
assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSNSFullAccess")
])
self.sns_lambda_role = sns_lambda_role
sns_lambda = lambda_.Function(
    self,
    "SnsEmailLambda",
    runtime=lambda_.Runtime.PYTHON_3_8,
    environment={"TOPIC_ARN": topic.topic_arn},
    code=self.make_dashboard_ready_email_inline_code(),
    handler="index.lambda_handler",
    role=sns_lambda_role)
#
# Notebook lifecycle configuration
#
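# make_nb_lcc (defined elsewhere in this class) assembles the lifecycle
# scripts; it receives the notification Lambda's name, presumably so the
# lifecycle script can trigger the "dashboard ready" email once setup is done.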
notebook_instance_name = f"{construct_id}-NotebookInstance"
lcc = self.make_nb_lcc(construct_id, notebook_instance_name,
sns_lambda.function_name)
#
# Notebook role
#
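# Execution role for the notebook instance, largely scoped to resources whose
# names share this stack's prefix.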
sm_role = iam.Role(
self,
f"NotebookRole",
assumed_by=iam.ServicePrincipal("sagemaker.amazonaws.com"))
sm_policy = iam.Policy(
self,
"SmPolicy",
roles=[sm_role],
statements=[
# Lambda
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"lambda:*",
],
resources=[
f"arn:aws:lambda:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:function:{self.lambdamap_function_name}",
f"arn:aws:lambda:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:function:{core.Aws.STACK_NAME}*",
]
),
# S3
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"s3:*"
],
resources=[
f"arn:aws:s3:::{construct_id.lower()}*",
]
),
# SageMaker
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"sagemaker:DescribeNotebookInstanceLifecycleConfig",
"sagemaker:DeleteNotebookInstance",
"sagemaker:StopNotebookInstance",
"sagemaker:DescribeNotebookInstance",
"sagemaker:CreateNotebookInstanceLifecycleConfig",
"sagemaker:DeleteNotebookInstanceLifecycleConfig",
"sagemaker:UpdateNotebookInstanceLifecycleConfig",
"sagemaker:CreateNotebookInstance",
"sagemaker:UpdateNotebookInstance"
],
resources=[
f"arn:aws:sagemaker:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:notebook-instance/{construct_id.lower()}*",
f"arn:aws:sagemaker:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:notebook-instance-lifecycle-config/notebooklifecycleconfig*",
]
),
# Step Functions
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"states:*"
],
resources=[
f"arn:aws:states:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:*:{core.Aws.STACK_NAME}*",
]
),
# SSM
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ssm:*"
],
resources=[
f"arn:aws:ssm:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:parameter/AfaS3Bucket",
f"arn:aws:ssm:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:parameter/AfaS3InputPath",
f"arn:aws:ssm:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:parameter/AfaS3OutputPath",
f"arn:aws:ssm:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:parameter/AfaAfcStateMachineArn",
]
),
]
)
#
# Notebook instance
#
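# Hosts the AFA dashboard; the lifecycle configuration above runs its scripts
# when the instance is created and/or started.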
sm.CfnNotebookInstance(
self,
f"NotebookInstance",
role_arn=sm_role.role_arn,
instance_type=instance_type.value_as_string,
notebook_instance_name=notebook_instance_name,
volume_size_in_gb=16,
lifecycle_config_name=lcc.attr_notebook_instance_lifecycle_config_name,
)
#
# AFC/Lambda role
#
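# Shared execution role, assumable by both Amazon Forecast and Lambda: the
# pipeline Lambdas run under it and hand it on to Forecast via AFC_ROLE_ARN.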
afc_role = iam.Role(
    self,
    "AfcRole",
assumed_by=iam.CompositePrincipal(
iam.ServicePrincipal("forecast.amazonaws.com"),
iam.ServicePrincipal("lambda.amazonaws.com")
),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonForecastFullAccess"),
])
afc_policy = iam.Policy(
self,
"AfcPolicy",
roles=[afc_role],
statements=[
# Lambda
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"lambda:*",
],
resources=[
f"arn:aws:lambda:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:function:{self.lambdamap_function_name}",
f"arn:aws:lambda:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:function:{core.Aws.STACK_NAME}*",
]
),
# S3
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"s3:*"
],
resources=[
f"arn:aws:s3:::{construct_id.lower()}*",
]
),
# Logging
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"logs:*"
],
resources=[
f"arn:aws:logs:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:log-group:/aws/lambda/{core.Aws.STACK_NAME}*"
]
),
# SNS
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"sns:*"
],
resources=[
f"arn:aws:sns:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:{core.Aws.STACK_NAME}*"
]
),
]
)
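# Terminal states for the state machine (not referenced by the linear chain
# assembled below).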
fail_state = sfn.Fail(self, "Fail")
succeed_state = sfn.Succeed(self, "Succeed")
#
# PREPARE DATA
#
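# Each Step Functions task below invokes its Lambda with the current state
# wrapped as {"input": $}; the handlers are loaded as inline code from
# afc_lambdas/.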
prepare_lambda = lambda_.Function(
self,
"PrepareLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.prepare_handler",
code=lambda_.Code.from_inline(open(os.path.join(PWD, "afc_lambdas", "prepare.py")).read()),
environment={
"AFC_ROLE_ARN": afc_role.role_arn
},
role=afc_role,
timeout=core.Duration.seconds(900)
)
prepare_step = tasks.LambdaInvoke(
self,
"PrepareDataStep",
lambda_function=prepare_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
#
# CREATE PREDICTOR
#
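# Predictor training is asynchronous in Amazon Forecast, so this step relies
# on the retry policy below to poll until training finishes.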
create_predictor_lambda = lambda_.Function(
    self,
    "CreatePredictorLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.create_predictor_handler",
code=lambda_.Code.from_inline(open(os.path.join(PWD, "afc_lambdas", "create_predictor.py")).read()),
environment={
"AFC_ROLE_ARN": afc_role.role_arn
},
role=afc_role,
timeout=core.Duration.seconds(900))
create_predictor_step = tasks.LambdaInvoke(
self,
"CreatePredictorStep",
lambda_function=create_predictor_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
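# Retry-as-polling: while the predictor is still pending, the handler is
# expected to raise one of the errors below, and Step Functions re-invokes it
# roughly every 60s (backoff 1.05) for up to 1000 attempts, rather than having
# the Lambda block on a long-running training job.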
create_predictor_step.add_retry(
backoff_rate=1.05,
interval=core.Duration.seconds(60),
max_attempts=1000,
errors=["ResourceNotFoundException",
"ResourceInUseException",
"ResourcePendingException"])
#
# CREATE FORECAST
#
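# Generates forecasts from the trained predictor; the same retry-as-polling
# pattern as above covers the asynchronous forecast creation.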
create_forecast_lambda = lambda_.Function(
    self,
    "CreateForecastLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.create_forecast_handler",
code=lambda_.Code.from_inline(
open(os.path.join(PWD, "afc_lambdas", "create_forecast.py")).read()),
role=afc_role,
timeout=core.Duration.seconds(900))
create_forecast_step = tasks.LambdaInvoke(
    self,
    "CreateForecastStep",
lambda_function=create_forecast_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
create_forecast_step.add_retry(
backoff_rate=1.1,
interval=core.Duration.seconds(60),
max_attempts=2000,
errors=["ResourceNotFoundException",
"ResourceInUseException",
"ResourcePendingException"])
#
# CREATE FORECAST EXPORT
#
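# Exports the generated forecasts as CSVs under the bucket's afc-exports/
# prefix (the AfaS3OutputPath parameter above).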
create_forecast_export_lambda = lambda_.Function(
self,
"CreateExportLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.create_forecast_export_handler",
code=lambda_.Code.from_inline(
open(os.path.join(PWD, "afc_lambdas", "create_export.py")).read()),
environment={
"AFC_ROLE_ARN": afc_role.role_arn
},
role=afc_role,
timeout=core.Duration.seconds(900))
create_forecast_export_step = tasks.LambdaInvoke(
self,
"CreateExportStep",
lambda_function=create_forecast_export_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
create_forecast_export_step.add_retry(
backoff_rate=1.1,
interval=core.Duration.seconds(60),
max_attempts=2000,
errors=["ResourceInUseException",
"ResourcePendingException"])
#
# BACKTEST EXPORT FILE(s)
#
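# Exports the predictor's backtest results (forecasted values and accuracy
# metrics from the evaluation windows) for later analysis.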
create_predictor_backtest_export_lambda = lambda_.Function(
self,
"CreatePredictorBacktestExportLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.create_predictor_backtest_export_handler",
code=lambda_.Code.from_inline(
open(os.path.join(PWD, "afc_lambdas",
"create_predictor_backtest_export.py")).read()),
environment={
"AFC_ROLE_ARN": afc_role.role_arn
},
role=afc_role,
timeout=core.Duration.seconds(900))
create_predictor_backtest_export_step = tasks.LambdaInvoke(
self,
"CreatePredictorBacktestExportStep",
lambda_function=create_predictor_backtest_export_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
create_predictor_backtest_export_step.add_retry(
backoff_rate=1.1,
interval=core.Duration.seconds(60),
max_attempts=2000,
errors=["ResourceInUseException",
"ResourcePendingException"])
#
# POSTPROCESS FORECAST EXPORT FILE(s)
#
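# Container-image Lambda built from afc_lambdas/postprocess, sized at the
# 10,240 MB Lambda maximum; the NoFilesFound retry below waits for the export
# files to land in S3 before post-processing them.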
postprocess_lambda = lambda_.Function(
    self,
    "PostProcessLambda",
    code=lambda_.Code.from_asset_image(
        directory=os.path.join(PWD, "afc_lambdas", "postprocess")),
    runtime=lambda_.Runtime.FROM_IMAGE,
    handler=lambda_.Handler.FROM_IMAGE,
    memory_size=10240,
    role=afc_role,
    timeout=core.Duration.seconds(900))
postprocess_step = tasks.LambdaInvoke(
self,
"PostProcessStep",
lambda_function=postprocess_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
postprocess_step.add_retry(
backoff_rate=1.1,
interval=core.Duration.seconds(30),
max_attempts=2000,
errors=["NoFilesFound",
"ResourceInUseException",
"ResourcePendingException"])
#
# DELETE AFC RESOURCES
#
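# Removes the Forecast resources created by the earlier steps, so repeated
# runs do not accumulate datasets, predictors, forecasts, and exports.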
delete_afc_resources_lambda = lambda_.Function(
self,
"DeleteAfcResourcesLambda",
runtime=lambda_.Runtime.PYTHON_3_8,
handler="index.delete_afc_resources_handler",
code=lambda_.Code.from_inline(
open(os.path.join(PWD, "afc_lambdas", "delete_resources.py")).read()),
role=afc_role,
timeout=core.Duration.seconds(900))
delete_afc_resources_step = tasks.LambdaInvoke(
self,
"DeleteAfcResourcesStep",
lambda_function=delete_afc_resources_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
delete_afc_resources_step.add_retry(
backoff_rate=1.1,
interval=core.Duration.seconds(60),
max_attempts=2000,
errors=["ResourceNotFoundException",
"ResourceInUseException",
"ResourcePendingException"])
#
# SNS EMAIL
#
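# Final pipeline step: publishes the completion email through the topic
# created above.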
sns_afc_email_lambda = lambda_.Function(
    self,
    f"{construct_id}-SnsAfcEmailLambda",
    runtime=lambda_.Runtime.PYTHON_3_8,
    environment={"TOPIC_ARN": topic.topic_arn},
    code=self.make_afc_email_inline_code(),
    handler="index.lambda_handler",
    role=afc_role)
sns_afc_email_step = tasks.LambdaInvoke(
self,
"SnsAfcEmailStep",
lambda_function=sns_afc_email_lambda,
payload=sfn.TaskInput.from_object({
"input": sfn.JsonPath.string_at("$")
})
)
#
# State machine
#
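# Linear pipeline: prepare -> create predictor -> create forecast -> backtest
# export -> forecast export -> postprocess -> cleanup -> email, bounded by a
# 24-hour overall timeout.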
definition = (
    prepare_step
    .next(create_predictor_step)
    .next(create_forecast_step)
    .next(create_predictor_backtest_export_step)
    .next(create_forecast_export_step)
    .next(postprocess_step)
    .next(delete_afc_resources_step)
    .next(sns_afc_email_step)
)
state_machine = sfn.StateMachine(self,
"AfaSsmAfcStateMachine",
state_machine_name=f"{construct_id}-AfcStateMachine",
definition=definition,
timeout=core.Duration.hours(24))
ssm_state_machine_param = ssm.StringParameter(self,
"AfaSsmAfcStateMachineArn",
string_value=state_machine.state_machine_arn,
parameter_name="AfaAfcStateMachineArn")