in source/lib/mlops_orchestrator_stack.py [0:0]
def __init__(self, scope: core.Construct, id: str, *, multi_account=False, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
# Get stack parameters:
notification_email = pf.create_notification_email_parameter(self)
git_address = pf.create_git_address_parameter(self)
# Get the optional S3 assets bucket to use
existing_bucket = pf.create_existing_bucket_parameter(self)
# Get the optional S3 assets bucket to use
existing_ecr_repo = pf.create_existing_ecr_repo_parameter(self)
# Will SageMaker's Model Registry be used to provision models
use_model_registry = pf.create_use_model_registry_parameter(self)
# Does the user want the solution to create model registry
create_model_registry = pf.create_model_registry_parameter(self)
# Enable detailed error message in the API response
allow_detailed_error_message = pf.create_detailed_error_message_parameter(self)
# Conditions
git_address_provided = cf.create_git_address_provided_condition(self, git_address)
# client provided an existing S3 bucket name, to be used for assets
existing_bucket_provided = cf.create_existing_bucket_provided_condition(self, existing_bucket)
# client provided an existing Amazon ECR name
existing_ecr_provided = cf.create_existing_ecr_provided_condition(self, existing_ecr_repo)
# client wants the solution to create model registry
model_registry_condition = cf.create_model_registry_condition(self, create_model_registry)
# S3 bucket needs to be created for assets
create_new_bucket = cf.create_new_bucket_condition(self, existing_bucket)
# Amazon ECR repo needs too be created for custom Algorithms
create_new_ecr_repo = cf.create_new_ecr_repo_condition(self, existing_ecr_repo)
# Constants
pipeline_stack_name = "mlops-pipeline"
# CDK Resources setup
access_logs_bucket = s3.Bucket(
self,
"accessLogs",
encryption=s3.BucketEncryption.S3_MANAGED,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)
# Apply secure transfer bucket policy
apply_secure_bucket_policy(access_logs_bucket)
# This is a logging bucket.
access_logs_bucket.node.default_child.cfn_options.metadata = suppress_s3_access_policy()
# Import user provide S3 bucket, if any. s3.Bucket.from_bucket_arn is used instead of
# s3.Bucket.from_bucket_name to allow cross account bucket.
client_existing_bucket = s3.Bucket.from_bucket_arn(
self,
"ClientExistingBucket",
f"arn:aws:s3:::{existing_bucket.value_as_string.strip()}",
)
# Create the resource if existing_bucket_provided condition is True
core.Aspects.of(client_existing_bucket).add(ConditionalResources(existing_bucket_provided))
# Import user provided Amazon ECR repository
client_erc_repo = ecr.Repository.from_repository_name(
self, "ClientExistingECRReo", existing_ecr_repo.value_as_string
)
# Create the resource if existing_ecr_provided condition is True
core.Aspects.of(client_erc_repo).add(ConditionalResources(existing_ecr_provided))
# Creating assets bucket so that users can upload ML Models to it.
assets_bucket = s3.Bucket(
self,
"pipeline-assets-" + str(uuid.uuid4()),
versioned=True,
encryption=s3.BucketEncryption.S3_MANAGED,
server_access_logs_bucket=access_logs_bucket,
server_access_logs_prefix="assets_bucket_access_logs",
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)
# Apply secure transport bucket policy
apply_secure_bucket_policy(assets_bucket)
# Create the resource if create_new_bucket condition is True
core.Aspects.of(assets_bucket).add(ConditionalResources(create_new_bucket))
# Get assets S3 bucket's name/arn, based on the condition
assets_s3_bucket_name = core.Fn.condition_if(
existing_bucket_provided.logical_id,
client_existing_bucket.bucket_name,
assets_bucket.bucket_name,
).to_string()
# Creating Amazon ECR repository
ecr_repo = ecr.Repository(self, "ECRRepo", image_scan_on_push=True)
# Create the resource if create_new_ecr condition is True
core.Aspects.of(ecr_repo).add(ConditionalResources(create_new_ecr_repo))
# Get ECR repo's name based on the condition
ecr_repo_name = core.Fn.condition_if(
existing_ecr_provided.logical_id,
client_erc_repo.repository_name,
ecr_repo.repository_name,
).to_string()
# Get ECR repo's arn based on the condition
ecr_repo_arn = core.Fn.condition_if(
existing_ecr_provided.logical_id,
client_erc_repo.repository_arn,
ecr_repo.repository_arn,
).to_string()
blueprints_bucket_name = "blueprint-repository-" + str(uuid.uuid4())
blueprint_repository_bucket = s3.Bucket(
self,
blueprints_bucket_name,
encryption=s3.BucketEncryption.S3_MANAGED,
server_access_logs_bucket=access_logs_bucket,
server_access_logs_prefix=blueprints_bucket_name,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)
# Apply secure transport bucket policy
apply_secure_bucket_policy(blueprint_repository_bucket)
# solution helper function
helper_function = create_solution_helper(self)
# custom resource to generate UUID
create_id_function = create_uuid_custom_resource(
self, create_model_registry.value_as_string, helper_function.function_arn
)
# creating SageMaker Model registry
# use the first 8 characters as a unique_id to be appended to the model_package_group_name
unique_id = core.Fn.select(0, core.Fn.split("-", create_id_function.get_att_string("UUID")))
model_package_group_name = f"mlops-model-registry-{unique_id}"
model_registry = create_sagemaker_model_registry(self, "SageMakerModelRegistry", model_package_group_name)
# only create based on the condition
model_registry.cfn_options.condition = model_registry_condition
# add dependency on the create_id_function custom resource
model_registry.node.add_dependency(create_id_function)
# Custom resource to copy source bucket content to blueprints bucket
custom_resource_lambda_fn = create_copy_assets_lambda(self, blueprint_repository_bucket.bucket_name)
# grant permission to upload file to the blueprints bucket
blueprint_repository_bucket.grant_write(custom_resource_lambda_fn)
custom_resource = core.CustomResource(
self,
"CustomResourceCopyAssets",
service_token=custom_resource_lambda_fn.function_arn,
)
custom_resource.node.add_dependency(blueprint_repository_bucket)
# IAM policies setup ###
cloudformation_role = iam.Role(
self,
"mlopscloudformationrole",
assumed_by=iam.ServicePrincipal("cloudformation.amazonaws.com"),
)
# Cloudformation policy setup
orchestrator_policy = create_orchestrator_policy(
self, pipeline_stack_name, ecr_repo_name, blueprint_repository_bucket, assets_s3_bucket_name
)
orchestrator_policy.attach_to_role(cloudformation_role)
# Lambda function IAM setup
lambda_passrole_policy = iam.PolicyStatement(actions=["iam:passrole"], resources=[cloudformation_role.role_arn])
# create sagemaker layer
sm_layer = sagemaker_layer(self, blueprint_repository_bucket)
# make sure the sagemaker code is uploaded first to the blueprints bucket
sm_layer.node.add_dependency(custom_resource)
# API Gateway and lambda setup to enable provisioning pipelines through API calls
provisioner_apigw_lambda = aws_apigateway_lambda.ApiGatewayToLambda(
self,
"PipelineOrchestration",
lambda_function_props={
"runtime": lambda_.Runtime.PYTHON_3_8,
"handler": "index.handler",
"code": lambda_.Code.from_asset("lambdas/pipeline_orchestration"),
"layers": [sm_layer],
"timeout": core.Duration.minutes(10),
},
api_gateway_props={
"defaultMethodOptions": {
"authorizationType": apigw.AuthorizationType.IAM,
},
"restApiName": f"{core.Aws.STACK_NAME}-orchestrator",
"proxy": False,
"dataTraceEnabled": True,
},
)
# add lambda suppressions
provisioner_apigw_lambda.lambda_function.node.default_child.cfn_options.metadata = suppress_lambda_policies()
provision_resource = provisioner_apigw_lambda.api_gateway.root.add_resource("provisionpipeline")
provision_resource.add_method("POST")
status_resource = provisioner_apigw_lambda.api_gateway.root.add_resource("pipelinestatus")
status_resource.add_method("POST")
blueprint_repository_bucket.grant_read(provisioner_apigw_lambda.lambda_function)
provisioner_apigw_lambda.lambda_function.add_to_role_policy(lambda_passrole_policy)
orchestrator_policy.attach_to_role(provisioner_apigw_lambda.lambda_function.role)
# Environment variables setup
provisioner_apigw_lambda.lambda_function.add_environment(
key="BLUEPRINT_BUCKET_URL",
value=str(blueprint_repository_bucket.bucket_regional_domain_name),
)
provisioner_apigw_lambda.lambda_function.add_environment(
key="BLUEPRINT_BUCKET", value=str(blueprint_repository_bucket.bucket_name)
)
provisioner_apigw_lambda.lambda_function.add_environment(
key="ACCESS_BUCKET", value=str(access_logs_bucket.bucket_name)
)
provisioner_apigw_lambda.lambda_function.add_environment(key="ASSETS_BUCKET", value=str(assets_s3_bucket_name))
provisioner_apigw_lambda.lambda_function.add_environment(
key="CFN_ROLE_ARN", value=str(cloudformation_role.role_arn)
)
provisioner_apigw_lambda.lambda_function.add_environment(key="PIPELINE_STACK_NAME", value=pipeline_stack_name)
provisioner_apigw_lambda.lambda_function.add_environment(
key="NOTIFICATION_EMAIL", value=notification_email.value_as_string
)
provisioner_apigw_lambda.lambda_function.add_environment(key="REGION", value=core.Aws.REGION)
provisioner_apigw_lambda.lambda_function.add_environment(key="IS_MULTI_ACCOUNT", value=str(multi_account))
provisioner_apigw_lambda.lambda_function.add_environment(
key="USE_MODEL_REGISTRY", value=use_model_registry.value_as_string
)
provisioner_apigw_lambda.lambda_function.add_environment(
key="ALLOW_DETAILED_ERROR_MESSAGE", value=allow_detailed_error_message.value_as_string
)
provisioner_apigw_lambda.lambda_function.add_environment(key="ECR_REPO_NAME", value=ecr_repo_name)
provisioner_apigw_lambda.lambda_function.add_environment(key="ECR_REPO_ARN", value=ecr_repo_arn)
provisioner_apigw_lambda.lambda_function.add_environment(key="LOG_LEVEL", value="DEBUG")
cfn_policy_for_lambda = orchestrator_policy.node.default_child
cfn_policy_for_lambda.cfn_options.metadata = {
"cfn_nag": {
"rules_to_suppress": [
{
"id": "W76",
"reason": "A complex IAM policy is required for this resource.",
}
]
}
}
# Codepipeline with Git source definitions ###
source_output = codepipeline.Artifact()
# processing git_address to retrieve repo name
repo_name_split = core.Fn.split("/", git_address.value_as_string)
repo_name = core.Fn.select(5, repo_name_split)
# getting codecommit repo cdk object using 'from_repository_name'
repo = codecommit.Repository.from_repository_name(self, "AWSMLOpsFrameworkRepository", repo_name)
codebuild_project = codebuild.PipelineProject(
self,
"Take config file",
build_spec=codebuild.BuildSpec.from_object(
{
"version": "0.2",
"phases": {
"build": {
"commands": [
"ls -a",
"aws lambda invoke --function-name "
+ provisioner_apigw_lambda.lambda_function.function_name
+ " --payload fileb://mlops-config.json response.json"
+ " --invocation-type RequestResponse",
]
}
},
}
),
)
# Defining a Codepipeline project with CodeCommit as source
codecommit_pipeline = codepipeline.Pipeline(
self,
"MLOpsCodeCommitPipeline",
stages=[
codepipeline.StageProps(
stage_name="Source",
actions=[
codepipeline_actions.CodeCommitSourceAction(
action_name="CodeCommit",
repository=repo,
branch="main",
output=source_output,
)
],
),
codepipeline.StageProps(
stage_name="TakeConfig",
actions=[
codepipeline_actions.CodeBuildAction(
action_name="provision_pipeline",
input=source_output,
outputs=[],
project=codebuild_project,
)
],
),
],
cross_account_keys=False,
)
codecommit_pipeline.add_to_role_policy(
create_invoke_lambda_policy([provisioner_apigw_lambda.lambda_function.function_arn])
)
codebuild_project.add_to_role_policy(
create_invoke_lambda_policy([provisioner_apigw_lambda.lambda_function.function_arn])
)
pipeline_child_nodes = codecommit_pipeline.node.find_all()
pipeline_child_nodes[1].node.default_child.cfn_options.metadata = {
"cfn_nag": {
"rules_to_suppress": [
{
"id": "W35",
"reason": "This is a managed bucket generated by CDK for codepipeline.",
},
{
"id": "W51",
"reason": "This is a managed bucket generated by CDK for codepipeline.",
},
]
}
}
# custom resource for operational metrics###
metrics_mapping = core.CfnMapping(self, "AnonymousData", mapping={"SendAnonymousData": {"Data": "Yes"}})
metrics_condition = core.CfnCondition(
self,
"AnonymousDatatoAWS",
expression=core.Fn.condition_equals(metrics_mapping.find_in_map("SendAnonymousData", "Data"), "Yes"),
)
# If user chooses Git as pipeline provision type, create codepipeline with Git repo as source
core.Aspects.of(repo).add(ConditionalResources(git_address_provided))
core.Aspects.of(codecommit_pipeline).add(ConditionalResources(git_address_provided))
core.Aspects.of(codebuild_project).add(ConditionalResources(git_address_provided))
# Create Template Interface
paramaters_list = [
notification_email.logical_id,
git_address.logical_id,
existing_bucket.logical_id,
existing_ecr_repo.logical_id,
use_model_registry.logical_id,
create_model_registry.logical_id,
allow_detailed_error_message.logical_id,
]
paramaters_labels = {
f"{notification_email.logical_id}": {"default": "Notification Email (Required)"},
f"{git_address.logical_id}": {"default": "CodeCommit Repo URL Address (Optional)"},
f"{existing_bucket.logical_id}": {"default": "Name of an Existing S3 Bucket (Optional)"},
f"{existing_ecr_repo.logical_id}": {"default": "Name of an Existing Amazon ECR repository (Optional)"},
f"{use_model_registry.logical_id}": {"default": "Do you want to use SageMaker Model Registry?"},
f"{create_model_registry.logical_id}": {
"default": "Do you want the solution to create a SageMaker's model package group?"
},
f"{allow_detailed_error_message.logical_id}": {
"default": "Do you want to allow detailed error messages in the APIs response?"
},
}
# configure mutli-account parameters and permissions
is_delegated_admin = None
if multi_account:
paramaters_list, paramaters_labels, is_delegated_admin = configure_multi_account_parameters_permissions(
self,
assets_bucket,
blueprint_repository_bucket,
ecr_repo,
model_registry,
provisioner_apigw_lambda.lambda_function,
paramaters_list,
paramaters_labels,
)
# properties of send data custom resource
# if you add new metrics to the cr properties, make sure to updated the allowed keys
# to send in the "_sanitize_data" function in source/lambdas/solution_helper/lambda_function.py
send_data_cr_properties = {
"Resource": "AnonymousMetric",
"UUID": create_id_function.get_att_string("UUID"),
"bucketSelected": core.Fn.condition_if(
existing_bucket_provided.logical_id,
"True",
"False",
).to_string(),
"gitSelected": core.Fn.condition_if(
git_address_provided.logical_id,
"True",
"False",
).to_string(),
"Region": core.Aws.REGION,
"IsMultiAccount": str(multi_account),
"IsDelegatedAccount": is_delegated_admin if multi_account else core.Aws.NO_VALUE,
"UseModelRegistry": use_model_registry.value_as_string,
"SolutionId": get_cdk_context_value(self, "SolutionId"),
"Version": get_cdk_context_value(self, "Version"),
}
# create send data custom resource
send_data_function = create_send_data_custom_resource(
self, helper_function.function_arn, send_data_cr_properties
)
# create send_data_function based on metrics_condition condition
core.Aspects.of(send_data_function).add(ConditionalResources(metrics_condition))
# create template metadata
self.template_options.metadata = {
"AWS::CloudFormation::Interface": {
"ParameterGroups": [
{
"Label": {"default": "MLOps Framework Settings"},
"Parameters": paramaters_list,
}
],
"ParameterLabels": paramaters_labels,
}
}
# Outputs #
core.CfnOutput(
self,
id="BlueprintsBucket",
value=f"https://s3.console.aws.amazon.com/s3/buckets/{blueprint_repository_bucket.bucket_name}",
description="S3 Bucket to upload MLOps Framework Blueprints",
)
core.CfnOutput(
self,
id="AssetsBucket",
value=f"https://s3.console.aws.amazon.com/s3/buckets/{assets_s3_bucket_name}",
description="S3 Bucket to upload model artifact",
)
core.CfnOutput(
self,
id="ECRRepoName",
value=ecr_repo_name,
description="Amazon ECR repository's name",
)
core.CfnOutput(
self,
id="ECRRepoArn",
value=ecr_repo_arn,
description="Amazon ECR repository's arn",
)
core.CfnOutput(
self,
id="ModelRegistryArn",
value=core.Fn.condition_if(
model_registry_condition.logical_id,
model_registry.attr_model_package_group_arn,
"[No Model Package Group was created]",
).to_string(),
description="SageMaker model package group arn",
)