constructor()

in cdk-app/lib/blog-resources-stack.ts [14:598]


  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // The code that defines your stack goes here

    //creating bucket for the blog
    const blogBucket=new s3.Bucket(this,'abaloneBlogBucket',{
      bucketName: 'abalone-blog' + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
      removalPolicy: cdk.RemovalPolicy.DESTROY
    });

    // Lambda as a custom resource to create the folder structure within the bucket
    const bucketFolderCreator= new lambda.Function(this,"folderCreatorFunction",{
      runtime: lambda.Runtime.PYTHON_3_8,
      functionName: "abalone-bucketFolderCreator" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
      code: lambda.Code.fromAsset('../functions/bucketFolderCreator'),
      handler: 'index.handler',
      environment:{
        bucket: blogBucket.bucketName
      }
    });

    //allow bucketFolderCreator lambda permission to read and write over the bucket
    blogBucket.grantReadWrite(bucketFolderCreator);

    //Create custom resource to invoke the lambda function to create the folders
    const invokeLambdaBucketCreator= new cloudformation.CustomResource(this,"InvokeBucketFolderCreator1",{
      provider: cloudformation.CustomResourceProvider.fromLambda(bucketFolderCreator)
    });
    
    const s3AssetForSagemakerProcessing= new s3deploy.BucketDeployment(this, 'DeploySagemakerProcessingScript', {
      sources: [s3deploy.Source.asset('scripts/processing_script')],
      destinationBucket: blogBucket,
      destinationKeyPrefix: 'Scripts'
    });
    

    //step functions definition section
    //Create stepfunctions role
    const stepFunctionsRole=new iam.Role(this, "stepFunctionsRole",{
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal("states.amazonaws.com"),
        new iam.ServicePrincipal("sagemaker.amazonaws.com")
      ),
      managedPolicies:[
        iam.ManagedPolicy.fromAwsManagedPolicyName('CloudWatchLogsFullAccess'), 
        iam.ManagedPolicy.fromAwsManagedPolicyName('AWSLambdaFullAccess'),
        iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonSageMakerFullAccess'),
        iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonS3FullAccess')
      ]
    });


    /* collect information lambda
     that gets the algorithm images required for preprocessing and training */
    const startLambda= new lambda.Function(this,"lambdaStartingFunction",{
      runtime: lambda.Runtime.PYTHON_3_8,
      functionName: "abalone-stateFirstFunction" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
      code: lambda.Code.fromAsset('../functions/startLambda'),
      handler: 'index.handler',
      environment:{
        region: cdk.Aws.REGION
      }
    })

    /* first task in step function is to invoke the starting lambda
    to collect information*/
    const submitJob = new tasks.LambdaInvoke(this, 'Fetch Image URIs', {
      lambdaFunction: startLambda,
      // Lambda's result is in the attribute `Payload`
      resultPath: '$.startLambda',
    });

    /* perform dataset conversion and divide the data set into train, validation, test dataset  */
    // create a processing job
    const processingJobJson = {
      Type: 'Task',
      Resource: "arn:aws:states:::sagemaker:createProcessingJob.sync",
      Parameters: {
        "ProcessingJobName.$": '$.input.processing_jobname',
        "ProcessingInputs":[
          {
              "InputName": "code",
              "S3Input": {
                  "S3Uri": "s3://" + blogBucket.bucketName + "/Scripts/preprocessing.py",
                  "LocalPath": "/opt/ml/processing/input/code",
                  "S3DataType": "S3Prefix",
                  "S3InputMode": "File",
                  "S3DataDistributionType": "FullyReplicated",
                  "S3CompressionType": "None"
              }
          }
        ],
        "ProcessingOutputConfig": {
          "Outputs": [
              {
                  "OutputName": "output-1",
                  "S3Output": {
                      "S3Uri": "s3://" + blogBucket.bucketName + "/processing_job_logs/spark_event_logs" ,
                      "LocalPath": "/opt/ml/processing/spark-events/",
                      "S3UploadMode": "Continuous"
                  }
              }
          ]
        },
        "AppSpecification":{
          "ImageUri.$": '$.startLambda.Payload.sagemaker_spark_image',
          "ContainerArguments": [
                        "--s3_input_bucket",
                        blogBucket.bucketName,
                        "--s3_input_key_prefix",
                        "Inputs",
                        "--s3_output_bucket",
                        blogBucket.bucketName
          ],
          "ContainerEntrypoint": [
              "smspark-submit",
              "--local-spark-event-logs-dir",
              "/opt/ml/processing/spark-events/",
              "/opt/ml/processing/input/code/preprocessing.py"
          ]
        },
        "RoleArn": stepFunctionsRole.roleArn,
        "ProcessingResources": {
          "ClusterConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.xlarge",
            "VolumeSizeInGB": 30
          }
        },
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 1200
        }
      },
      ResultPath: '$.model.xgboost'
    }

    //create a state to start the preprocessing of the dataset
    const processingJobTask= new stepFunction.CustomState(this, 'Process and divide dataset',{
      stateJson: processingJobJson
    })

    //task to train using Xgboost algorithm
    const sagemakerXgboostTrainjob = new tasks.SageMakerCreateTrainingJob(this,'XgboostTraining',{
      trainingJobName: stepFunction.JsonPath.stringAt('$.input.xgb_jobname'),
      integrationPattern: stepFunction.IntegrationPattern.RUN_JOB,
      algorithmSpecification: {
        trainingImage: tasks.DockerImage.fromRegistry(stepFunction.JsonPath.stringAt('$.startLambda.Payload.xgb_image')),
        trainingInputMode: tasks.InputMode.FILE
      },
      inputDataConfig:[
        {
          channelName: "train",
          contentType: "libsvm",
          dataSource:{
              s3DataSource: {
                s3DataType: tasks.S3DataType.S3_PREFIX,
                s3DataDistributionType: tasks.S3DataDistributionType.FULLY_REPLICATED,
                s3Location: tasks.S3Location.fromBucket(blogBucket,'Xgboost/train/')
              }
          }
        },
        {
          channelName: "validation",
          contentType: "libsvm",
          dataSource:{
              s3DataSource: {
                s3DataType: tasks.S3DataType.S3_PREFIX,
                s3DataDistributionType: tasks.S3DataDistributionType.FULLY_REPLICATED,
                s3Location: tasks.S3Location.fromBucket(blogBucket,'Xgboost/validation/')
              }
          }
        }
      ],
      outputDataConfig:{
        s3OutputLocation: tasks.S3Location.fromBucket(blogBucket, 'ml_models/xgboost/')
      },
      resourceConfig: {
        instanceCount: 1,
        instanceType: ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.XLARGE2),
        volumeSize: cdk.Size.gibibytes(30),
      },
      stoppingCondition: {
        maxRuntime: cdk.Duration.hours(1),
      },
      hyperparameters:{
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "silent":"0",
        "objective":"reg:linear",
        "num_round":"50"
      },
      resultPath: '$.training.xgboostresult'
    })

    //task to train using linear learner algorithm
    const sagemakerLinearTrainjob = new tasks.SageMakerCreateTrainingJob(this,'LinearTraining',{
      trainingJobName: stepFunction.JsonPath.stringAt('$.input.ll_jobname'),
      integrationPattern:stepFunction.IntegrationPattern.RUN_JOB,
      algorithmSpecification:{
        trainingImage: tasks.DockerImage.fromRegistry(stepFunction.JsonPath.stringAt('$.startLambda.Payload.linear_image')),
        trainingInputMode: tasks.InputMode.FILE
      },
      inputDataConfig:[
        {
          channelName: "train",
          contentType: "text/csv",
          dataSource:{
              s3DataSource: {
                s3DataType: tasks.S3DataType.S3_PREFIX,
                s3DataDistributionType: tasks.S3DataDistributionType.FULLY_REPLICATED,
                s3Location: tasks.S3Location.fromBucket(blogBucket,'Linear/train/')
              }
          }
        },
        {
          channelName: "validation",
          contentType: "text/csv",
          dataSource:{
              s3DataSource: {
                s3DataType: tasks.S3DataType.S3_PREFIX,
                s3DataDistributionType: tasks.S3DataDistributionType.FULLY_REPLICATED,
                s3Location: tasks.S3Location.fromBucket(blogBucket,'Linear/validation/')
              }
          }
        }
      ],
      outputDataConfig:{
        s3OutputLocation: tasks.S3Location.fromBucket(blogBucket, 'ml_models/linear/')
      },
      resourceConfig: {
        instanceCount: 1,
        instanceType: ec2.InstanceType.of(ec2.InstanceClass.M5,ec2.InstanceSize.XLARGE),
        volumeSize: cdk.Size.gibibytes(30),
      },
      stoppingCondition: {
        maxRuntime: cdk.Duration.hours(1),
      },
      hyperparameters:{
        "feature_dim": "8",
        "epochs": "10",
        "loss": "absolute_loss",
        "predictor_type": "regressor",
        "normalize_data": "True",
        "optimizer":"adam",
        "mini_batch_size": "100",
        "learning_rate": "0.0001"
      },
      resultPath: '$.training.llresult'
    })

    // create a model out of the xgboost training job
    const createXgboostModelJson = {
      Type: 'Task',
      Resource: "arn:aws:states:::sagemaker:createModel",
      Parameters: {
        "ExecutionRoleArn": stepFunctionsRole.roleArn,
        "ModelName.$": '$.input.xgb_model_name',
        "PrimaryContainer": {
                    "Environment": {},
                    "Image.$": '$.startLambda.Payload.xgb_image',
                    "ModelDataUrl.$": "$.training.xgboostresult.ModelArtifacts.S3ModelArtifacts"
        }
      },
      ResultPath: '$.model.xgboost'
    }
    
    //create a state to save the xgboost model created from the training job
    const saveXgboostModel= new stepFunction.CustomState(this, 'Save Xgboost Model',{
      stateJson: createXgboostModelJson
    })

    // create endpoint configuration for the xgboost model
    const createXgboostModelEndPointConfigJson = {
          Type: 'Task',
          Resource: "arn:aws:states:::sagemaker:createEndpointConfig",
          Parameters: {
            "EndpointConfigName.$": '$.input.xgb_model_name',
            "ProductionVariants": [
                {
                    "InitialInstanceCount": 1,
                    "InstanceType": "ml.m4.xlarge",
                    "ModelName.$": '$.input.xgb_model_name',
                    "VariantName": "AllTraffic"
                }
            ]
          },
          ResultPath: null
        }
    
    // create a state to save the linear model created from the training job
    const createXgboostEndPointConfig= new stepFunction.CustomState(this, 'Create Xgboost Endpoint Config',{
      stateJson: createXgboostModelEndPointConfigJson
    })


    // create a model out of the linear learner training job
    const createLinearModelJson = {
      Type: 'Task',
      Resource: "arn:aws:states:::sagemaker:createModel",
      Parameters: {
        "ExecutionRoleArn": stepFunctionsRole.roleArn,
        "ModelName.$": '$.input.ll_model_name',
        "PrimaryContainer": {
                    "Environment": {},
                    "Image.$": '$.startLambda.Payload.linear_image',
                    "ModelDataUrl.$": "$.training.llresult.ModelArtifacts.S3ModelArtifacts"
        }
      },
      ResultPath: '$.model.linearlearner'
    }
    
    // create a state to save the linear model created from the training job
    const saveLinearModel= new stepFunction.CustomState(this, 'Save Linear Model',{
      stateJson: createLinearModelJson
    })

    // create endpoint configurationfor the model
    const createLinearModelEndPointConfigJson = {
      Type: 'Task',
      Resource: "arn:aws:states:::sagemaker:createEndpointConfig",
      Parameters: {
        "EndpointConfigName.$": '$.input.ll_model_name',
        "ProductionVariants": [
            {
                "InitialInstanceCount": 1,
                "InstanceType": "ml.m4.xlarge",
                "ModelName.$": '$.input.ll_model_name',
                "VariantName": "AllTraffic"
            }
        ]
      },
      ResultPath: null
    }
    
    // create a state to save the linear model created from the training job
    const createLinearEndPointConfig= new stepFunction.CustomState(this, 'Create Linear Endpoint Config',{
      stateJson: createLinearModelEndPointConfigJson
    })

    //create xgboost endpoint based on configuration
    const createXgboostEndPointJson = {
      Type: 'Task',
      Resource: "arn:aws:states:::sagemaker:createEndpoint",
      Parameters: {
        "EndpointConfigName.$": "$.input.xgb_model_name",
        "EndpointName.$": "$.input.xgb_endpoint_name"
      },
      ResultPath: null
    }
    
    // create a state to create the xgboost endpoint
    const createXgboostEndPoint= new stepFunction.CustomState(this, 'Create Xgboost Endpoint',{
      stateJson: createXgboostEndPointJson
    })

    //create linear endpoint based on configuration
    const createLinearEndPointJson = {
    Type: 'Task',
    Resource: "arn:aws:states:::sagemaker:createEndpoint",
    Parameters: {
      "EndpointConfigName.$": "$.input.ll_model_name",
      "EndpointName.$": "$.input.ll_endpoint_name"
    },
    ResultPath: null
  }
  
  // create a state to create the endpoint
  const createLinearEndPoint= new stepFunction.CustomState(this, 'Create Linear Endpoint',{
    stateJson: createLinearEndPointJson
  })

  // create a lambda to describe the endpoint and return its status
  const describeEndpointLambda = new lambda.Function(this,"describeEndpointLambda",{
    runtime: lambda.Runtime.PYTHON_3_8,
    functionName: "abalone-describeEndpointFunction" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
    code: lambda.Code.fromAsset('../functions/describeEndpointLambda'),
    handler: 'index.handler'
  })

  //add permission to invoke endpoints
  describeEndpointLambda.addToRolePolicy(new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: [
      'sagemaker:DescribeEndpoint'
    ],
    resources:[ '*' ]
  }));

  //create a task to invoke the lambda to perform the prediction
  const describeXgboostEndpoint = new tasks.LambdaInvoke(this, 'Describe Xgboost Endpoint', {
    lambdaFunction: describeEndpointLambda,
    // Lambda's result is in the attribute `Payload`
    resultPath: '$.endpoints_status.xgboostresult',
  });

  //create a task to invoke the lambda to perform the prediction
  const describeLinearEndpoint = new tasks.LambdaInvoke(this, 'Describe Linear Endpoint', {
    lambdaFunction: describeEndpointLambda,
    // Lambda's result is in the attribute `Payload`
    resultPath: '$.endpoints_status.llresult',
  });


  // create a prediction Lambda
  const predictionAccuracy = new lambda.Function(this,"predictionFunction",{
    runtime: lambda.Runtime.PYTHON_3_8,
    functionName: "abalone-predictionFunction" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
    code: lambda.Code.fromAsset('../functions/predictionLambda'),
    handler: 'index.handler',
    environment:{
      region: cdk.Aws.REGION,
      bucket: blogBucket.bucketName
    }
  })

  //add permission to invoke endpoints
  predictionAccuracy.addToRolePolicy(new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: [
      'sagemaker:InvokeEndpoint'
    ],
    resources:[ '*' ]
  }));

  //add permission to read and write from buckets
  blogBucket.grantRead(predictionAccuracy)

  //create a task to invoke the lambda to perform the prediction
  const performXgboostPredction = new tasks.LambdaInvoke(this, 'Perform Xgboost Prediction', {
    lambdaFunction: predictionAccuracy,
    // Lambda's result is in the attribute `Payload`
    resultPath: '$.prediction.xgboostresult',
  });

  const performLinearPredction = new tasks.LambdaInvoke(this, 'Perform Linear Prediction', {
    lambdaFunction: predictionAccuracy,
    // Lambda's result is in the attribute `Payload`
    resultPath: '$.prediction.llresult',
  });

  // create a cleanup Lambda
  const cleanupLambda = new lambda.Function(this,"cleanupFunction",{
    runtime: lambda.Runtime.PYTHON_3_8,
    functionName: "abalone-cleanupFunction" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
    code: lambda.Code.fromAsset('../functions/cleanupLambda'),
    handler: 'index.handler',
  })

  //add permission to invoke endpoints
  cleanupLambda.addToRolePolicy(new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    actions: [
      'sagemaker:*'
    ],
    resources:[ '*' ]
  }));

  // create a task in stepfunctions to invoke the clean up lambda for xgboost endpoint
  const xgboostCleanupTask = new tasks.LambdaInvoke(this, 'Delete xgboost Endpoint', {
    lambdaFunction: cleanupLambda,
    resultPath: '$.null'
  });

  // create a task in stepfunctions to invoke the clean up lambda for Linear endpoint
  const LinearCleanupTask = new tasks.LambdaInvoke(this, 'Delete Linear Endpoint', {
    lambdaFunction: cleanupLambda,
    resultPath: '$.null'
  });
        
  // Wait until it's the time mentioned in the the state object's "triggerTime"
  // field.
  const xgboostWait = new stepFunction.Wait(this, 'Wait For xgboost endpoint', {
    time: stepFunction.WaitTime.duration(cdk.Duration.minutes(1))
  });

  //checking if the endpoint is ready
  const isXgboostEndpointReady = new stepFunction.Choice(this, 'Xgboost Endpoint Ready?');
  isXgboostEndpointReady.when(stepFunction.Condition.not(
    stepFunction.Condition.stringEquals('$.endpoints_status.xgboostresult.Payload.endpoint_status', 'InService'),
  ), xgboostWait);
  isXgboostEndpointReady.otherwise(performXgboostPredction.next(xgboostCleanupTask));
  // Set the next state
  xgboostWait.next(describeXgboostEndpoint);
  
  // Wait until it's the time mentioned in the the state object's "triggerTime"
  // field.
  const linearWait = new stepFunction.Wait(this, 'Wait For Linear endpoint', {
    time: stepFunction.WaitTime.duration(cdk.Duration.minutes(1))
  });

  //checking if the endpoint is ready
  const isLinearEndpointReady = new stepFunction.Choice(this, 'Linear Endpoint Ready?');
  isLinearEndpointReady.when(stepFunction.Condition.not(
    stepFunction.Condition.stringEquals('$.endpoints_status.llresult.Payload.endpoint_status', 'InService'),
  ), linearWait);
  isLinearEndpointReady.otherwise(performLinearPredction.next(LinearCleanupTask));
  
  // Set the next state
  linearWait.next(describeLinearEndpoint);

  //xgboost succees state
  const XgboostWins = new stepFunction.Succeed(this, 'Xgboost has better accuracy!');

  //Linear sucess state
  const LinearWins = new stepFunction.Succeed(this, 'Linear has better accuracy!');

  //perform training for both alogirthms at the same time
  const parallel = new stepFunction.Parallel(this, 'Do the work in parallel');
  // xgboost branch
  parallel.branch(sagemakerXgboostTrainjob
    .next(saveXgboostModel)
    .next(createXgboostEndPointConfig)
    .next(createXgboostEndPoint)
    .next(describeXgboostEndpoint)
    .next(isXgboostEndpointReady));
  // linear learner branch
  parallel.branch(sagemakerLinearTrainjob
    .next(saveLinearModel)
    .next(createLinearEndPointConfig)
    .next(createLinearEndPoint)
    .next(describeLinearEndpoint)
    .next(isLinearEndpointReady));

  
  // create a comparison Lambda
  const comparisonLambda = new lambda.Function(this,"comparisonFunction",{
    runtime: lambda.Runtime.PYTHON_3_8,
    functionName: "abalone-comparisonFunction" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
    code: lambda.Code.fromAsset('../functions/comparisonLambda'),
    handler: 'index.handler',
  })
  
  // create a task in the step functions to invoke the comparison Lambda
  const ComparisonTask = new tasks.LambdaInvoke(this, 'Compare performace', {
    lambdaFunction: comparisonLambda,
    resultPath: '$[0].winningAlgorithm',
    outputPath: '$[0].winningAlgorithm'
  });


  //making a choice which performed better
  const choice = new stepFunction.Choice(this, 'which model had better results ?');
  choice.when(stepFunction.Condition.stringEquals(
    '$.Payload.winnerAlgorithm',
    'xgboost'
  ), XgboostWins);
  choice.otherwise(LinearWins);


  /* Creating the definition of the step function by
  chaining steps for the step function */
  const definition= submitJob
    .next(processingJobTask)
    .next(parallel)
    .next(ComparisonTask)
    .next(choice)

  // creating the stepfunctions resource
  const blogStepFunction= new stepFunction.StateMachine(this,"abaloneStepFunction",{
    definition,
    role: stepFunctionsRole,
    timeout: cdk.Duration.minutes(60)
  })
    
  /*Lambda to invoke the step function
  this lambda is invoked once the dataset is divided*/
  const stateInvokeLambda= new lambda.Function(this,"InvokeStateMachine",{
        runtime: lambda.Runtime.PYTHON_3_8,
        functionName: "abalone-invokeStateMachine" + '-' + cdk.Aws.ACCOUNT_ID + '-' + cdk.Aws.REGION,
        code: lambda.Code.fromAsset('../functions/invokeStateLambda'),
        handler: 'index.handler',
        environment:{
          stateMachineArn: blogStepFunction.stateMachineArn,
          accountId: cdk.Aws.ACCOUNT_ID
        }
      })

  // Invoke Lambda to start step function when object are uploaded
  blogBucket.addEventNotification(s3.EventType.OBJECT_CREATED_PUT,new s3Trigger.LambdaDestination(stateInvokeLambda),{ prefix: 'Inputs/'})
  blogStepFunction.grantStartExecution(stateInvokeLambda);
  }