constructor()

in lib/blog-glue-workflow-stack.ts [42:426]
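
This excerpt depends on imports and module-level constants declared earlier in lib/blog-glue-workflow-stack.ts (it also assigns to a public glueRole property on the stack class). A minimal sketch of what those declarations might look like follows; the package names and constant values are assumptions for illustration, not confirmed by the excerpt.

// Assumed imports (CDK v1-style module names; the actual project may differ):
import * as path from "path";
import * as cdk from "@aws-cdk/core";
import * as glue from "@aws-cdk/aws-glue";
import { Asset } from "@aws-cdk/aws-s3-assets";
import {
  Effect,
  ManagedPolicy,
  Policy,
  PolicyStatement,
  Role,
  ServicePrincipal,
} from "@aws-cdk/aws-iam";

// Assumed module-level constants (placeholder values only):
const glue_managed_policy =
  "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"; // AWS managed policy for Glue
const glue_ServiceUrl = "glue.amazonaws.com"; // Glue service principal
const covid_src_bucket = "example-covid-source-bucket"; // public source bucket (placeholder)
const covid_src_key = "path/to/covid-cases.csv"; // source object key (placeholder)
const hiring_src_bucket = "example-hiring-source-bucket"; // public source bucket (placeholder)
const hiring_src_key = "path/to/covid-hiring.csv"; // source object key (placeholder)
const scriptsPath = "scripts/"; // prefix for the renamed job scripts
const covidPath = "input-data/covid/"; // prefix for the copied COVID CSV data
const hiringPath = "input-data/hiring/"; // prefix for the copied hiring CSV data
const parquetPath = "/output-data/parquet/"; // leading slash: concatenated without a separator below
const obj_assets = "glue-cdk-asset-etl.py";
const obj_etl = "glue-parquet-etl.py";
const obj_redshiftLoad = "redshift-load-etl.py";
const obj_covidCases = "covid-cases.csv";
const obj_covidHiring = "covid-hiring.csv";
const covidCasesTable = "covid_csv"; // table the CSV crawler is expected to create (placeholder)
const covidHiringTable = "hiring_csv"; // table the CSV crawler is expected to create (placeholder)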


  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    //###  Add assets to S3 bucket as individual files  #####

    //Python scripts that run in the Glue workflow
    const f_pyAssetETL = new Asset(this, "py-asset-etl", {
      path: path.join(__dirname, "assets/glue-cdk-asset-etl.py"),
    });
    const f_pyParquet = new Asset(this, "py-load", {
      path: path.join(__dirname, "assets/glue-parquet-etl.py"),
    });
    const f_pyRedshiftLoad = new Asset(this, "redshift-load", {
      path: path.join(__dirname, "assets/redshift-load-etl.py"),
    });
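    //Each Asset is uploaded to the CDK asset bucket under a content-hash object key,
    //so the first Glue job below copies the scripts to scriptsPath with readable names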

    //Get dynamic CDK asset bucket name to pass into Glue Jobs
    const assetBucketName = f_pyAssetETL.s3BucketName;

    //create glue database
    const glue_db = new glue.Database(this, "glue-workflow-db", {
      databaseName: "glue-workflow-db",
    });

    //create glue crawler role to access the S3 buckets
    const glue_crawler_role = new Role(this, "glue-crawler-role", {
      roleName: "AWSGlueServiceRole-AccessS3Bucket",
      description:
        "Assigns the managed policy AWSGlueServiceRole to AWS Glue Crawler so it can crawl S3 buckets",
      managedPolicies: [
        ManagedPolicy.fromManagedPolicyArn(
          this,
          "glue-service-policy",
          glue_managed_policy
        ),
      ],
      assumedBy: new ServicePrincipal(glue_ServiceUrl),
    });
    this.glueRole = glue_crawler_role;

    //add policy to role to grant access to S3 asset bucket and public buckets
    const iam_policy_forAssets = new Policy(this, "iam-policy-forAssets", {
      force: true,
      policyName: "glue-policy-workflowAssetAccess",
      roles: [glue_crawler_role],
      statements: [
        new PolicyStatement({
          effect: Effect.ALLOW,
          actions: [
            "s3:GetObject",
            "s3:PutObject",
            "s3:DeleteObject",
            "s3:ListBucket",
          ],
          resources: ["arn:aws:s3:::" + f_pyAssetETL.s3BucketName + "/*"],
        }),
        new PolicyStatement({
          effect: Effect.ALLOW,
          actions: ["s3:GetObject"],
          resources: [
            "arn:aws:s3:::" + covid_src_bucket + "/*",
            "arn:aws:s3:::" + hiring_src_bucket + "/*",
          ],
        }),
      ],
    });

    //Define paths for scripts and data
    const scripts = "s3://" + assetBucketName + "/" + scriptsPath;
    const covid = "s3://" + assetBucketName + "/" + covidPath;
    const hiring = "s3://" + assetBucketName + "/" + hiringPath;
    const redshift_temp_dir = "s3://" + assetBucketName + "/output/temp/";
    const outputPath = "s3://" + assetBucketName + parquetPath;

    //create glue crawler to crawl csv files in S3
    const glue_crawler_s3 = new glue.CfnCrawler(this, "glue-crawler-s3", {
      name: "s3-csv-crawler",
      role: glue_crawler_role.roleName,
      targets: {
        s3Targets: [
          {
            path: covid,
          },
          {
            path: hiring,
          },
        ],
      },
      databaseName: glue_db.databaseName,
      schemaChangePolicy: {
        updateBehavior: "UPDATE_IN_DATABASE",
        deleteBehavior: "DEPRECATE_IN_DATABASE",
      },
    });

    //create glue crawler to crawl parquet files in S3
    const glue_crawler_s3_parquet = new glue.CfnCrawler(
      this,
      "glue-crawler-s3-parquet",
      {
        name: "s3-parquet-crawler",
        role: glue_crawler_role.roleName,
        targets: {
          s3Targets: [
            {
              path: outputPath,
            },
          ],
        },
        databaseName: glue_db.databaseName,
        schemaChangePolicy: {
          updateBehavior: "UPDATE_IN_DATABASE",
          deleteBehavior: "DEPRECATE_IN_DATABASE",
        },
      }
    );
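    //Both crawlers register and update tables in glue_db; the downstream jobs read
    //those tables through the Glue Data Catalog via their --glue_* default arguments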

    //####  Create the Glue workflow, jobs, and triggers that handle the ETL: convert the CSV files to Parquet, then load the Parquet output into Redshift  #####

    //create glue workflow
    const glue_workflow = new glue.CfnWorkflow(this, "glue-workflow", {
      name: "glue-workflow",
      description:
        "ETL workflow to convert CSV to parquet and then load into Redshift",
    });

    //create jobs
    const glue_job_asset = new glue.CfnJob(this, "glue-job-asset", {
      name: "glue-workflow-assetjob",
      description: "Copy CDK assets to scripts folder and give meaningful name",
      role: glue_crawler_role.roleArn,
      executionProperty: {
        maxConcurrentRuns: 1,
      },
      command: {
        name: "glueetl",
        pythonVersion: "3",
        scriptLocation: f_pyAssetETL.s3ObjectUrl,
      },
      defaultArguments: {
        "--TempDir": "s3://" + f_pyAssetETL.s3BucketName + "/output/temp/",
        "--job-bookmark-option": "job-bookmark-disable",
        "--job-language": "python",
        "--spark-event-logs-path":
          "s3://" + f_pyAssetETL.s3BucketName + "/output/logs/",
        "--enable-metrics": "",
        "--enable-continuous-cloudwatch-log": "true",
        "--source_BucketName": assetBucketName,
        "--target_BucketName": assetBucketName,
        "--target_covidPrefix": covidPath,
        "--target_hiringPrefix": hiringPath,
        "--covid_source_bucket": covid_src_bucket,
        "--obj_covid_source_key": covid_src_key,
        "--obj_covid_target_key": covidPath + obj_covidCases,
        "--hiring_source_bucket": hiring_src_bucket,
        "--obj_hiring_source_key": hiring_src_key,
        "--obj_hiring_target_key": hiringPath + obj_covidHiring,
        "--obj_1_source_key": f_pyAssetETL.s3ObjectKey,
        "--obj_1_target_key": scriptsPath + obj_assets,
        "--obj_2_source_key": f_pyParquet.s3ObjectKey,
        "--obj_2_target_key": scriptsPath + obj_etl,
        "--obj_3_source_key": f_pyRedshiftLoad.s3ObjectKey,
        "--obj_3_target_key": scriptsPath + obj_redshiftLoad,
      },
      maxRetries: 2,
      timeout: 60,
      numberOfWorkers: 10,
      glueVersion: "3.0",
      workerType: "G.1X",
    });

    const glue_job_parquet = new glue.CfnJob(this, "glue-job-parquet", {
      name: "glue-workflow-parquetjob",
      description: "Convert the csv files in S3 to parquet",
      role: glue_crawler_role.roleArn,
      executionProperty: {
        maxConcurrentRuns: 1,
      },
      command: {
        name: "glueetl", //spark ETL job must be set to value of 'glueetl'
        pythonVersion: "3",
        scriptLocation:
          "s3://" + f_pyParquet.s3BucketName + "/" + scriptsPath + obj_etl,
      },
      defaultArguments: {
        "--TempDir": "s3://" + assetBucketName + "/output/temp/",
        "--job-bookmark-option": "job-bookmark-disable",
        "--job-language": "python",
        "--spark-event-logs-path":
          "s3://" + assetBucketName + "/output/logs/",
        "--enable-metrics": "",
        "--enable-continuous-cloudwatch-log": "true",
        "--glue_database_name": glue_db.databaseName,
        "--glue_covid_table": covidCasesTable,
        "--glue_hiring_table": covidHiringTable,
        "--output_bucket_name": assetBucketName,
        "--output_prefix_path": parquetPath
      },
      maxRetries: 2,
      timeout: 240,
      numberOfWorkers: 10,
      glueVersion: "3.0",
      workerType: "G.1X",
    });

    //load parquet data into Redshift
    const glue_job_redshift_load = new glue.CfnJob(
      this,
      "glue-job-redshift-load",
      {
        name: "glue-workflow-redshift-load",
        description: "Use Glue to load output data into Redshift",
        role: glue_crawler_role.roleArn,
        executionProperty: {
          maxConcurrentRuns: 1,
        },
        command: {
          name: "glueetl", //spark ETL job must be set to value of 'glueetl'
          pythonVersion: "3",
          scriptLocation:
            "s3://" +
            f_pyRedshiftLoad.s3BucketName +
            "/" +
            scriptsPath +
            obj_redshiftLoad,
        },
        defaultArguments: {
          "--TempDir": redshift_temp_dir,
          "--job-bookmark-option": "job-bookmark-disable",
          "--job-language": "python",
          "--spark-event-logs-path":
            "s3://" + assetBucketName + "/output/logs/",
          "--enable-metrics": "",
          "--enable-continuous-cloudwatch-log": "true",
          "--glue_database_name": glue_db.databaseName,
          "--glue_input_file1": obj_redshiftLoad,
          "--output_bucket_name": assetBucketName,
        },
        connections: {
          connections: ["redshift-connect"],
        },
        maxRetries: 2,
        timeout: 240,
        numberOfWorkers: 10,
        glueVersion: "3.0",
        workerType: "G.1X",
      }
    );

    //create triggers
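    //Trigger chain: the ON_DEMAND trigger starts the asset-copy job, and each CONDITIONAL
    //trigger fires on the previous step's SUCCEEDED state:
    //asset job -> csv crawler -> parquet conversion job -> parquet crawler -> Redshift load job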

    //rename assets and copy them to scripts folder
    const glue_trigger_assetJob = new glue.CfnTrigger(
      this,
      "glue-trigger-assetJob",
      {
        name: "Run-Job-" + glue_job_asset.name,
        workflowName: glue_workflow.name,
        actions: [
          {
            jobName: glue_job_asset.name,
            timeout: 120,
          },
        ],
        type: "ON_DEMAND",
      }
    );
    //add trigger dependency on workflow and job
    glue_trigger_assetJob.addDependsOn(glue_job_asset);
    glue_trigger_assetJob.addDependsOn(glue_workflow);

    //crawl the csv files copied into the covid and hiring data folders in S3
    const glue_trigger_crawlJob = new glue.CfnTrigger(
      this,
      "glue-trigger-crawlJob",
      {
        name: "Run-Crawler-" + glue_crawler_s3.name,
        workflowName: glue_workflow.name,
        actions: [
          {
            crawlerName: glue_crawler_s3.name,
          },
        ],
        predicate: {
          conditions: [
            {
              logicalOperator: "EQUALS",
              jobName: glue_job_asset.name,
              state: "SUCCEEDED",
            },
          ],
          logical: "ANY",
        },
        type: "CONDITIONAL",
        startOnCreation: true,
      }
    );

    //etl job trigger to merge data and convert to parquet for Redshift load
    const glue_trigger_parquetJob = new glue.CfnTrigger(
      this,
      "glue-trigger-parquetJob",
      {
        name: "Run-Job-" + glue_job_parquet.name,
        workflowName: glue_workflow.name,
        actions: [
          {
            jobName: glue_job_parquet.name,
          },
        ],
        predicate: {
          conditions: [
            {
              logicalOperator: "EQUALS",
              crawlerName: glue_crawler_s3.name,
              crawlState: "SUCCEEDED",
            },
          ],
          logical: "ANY",
        },
        type: "CONDITIONAL",
        startOnCreation: true,
      }
    );

    //crawl parquet files located in S3 output-data folder
    const glue_trigger_crawlJob_parquet = new glue.CfnTrigger(
      this,
      "glue-trigger-crawlJob-parquet",
      {
        name: "Run-Crawler-" + glue_crawler_s3_parquet.name,
        workflowName: glue_workflow.name,
        actions: [
          {
            crawlerName: glue_crawler_s3_parquet.name,
          },
        ],
        predicate: {
          conditions: [
            {
              logicalOperator: "EQUALS",
              jobName: glue_job_parquet.name,
              state: "SUCCEEDED",
            },
          ],
          logical: "ANY",
        },
        type: "CONDITIONAL",
        startOnCreation: true,
      }
    );

    //create Glue job trigger to load output data into Redshift
    const glue_trigger_redshiftJob = new glue.CfnTrigger(
      this,
      "glue-trigger-redshiftJob",
      {
        name: "Run-Job-" + glue_job_redshift_load.name,
        workflowName: glue_workflow.name,
        actions: [
          {
            jobName: glue_job_redshift_load.name,
          },
        ],
        predicate: {
          conditions: [
            {
              logicalOperator: "EQUALS",
              crawlerName: glue_crawler_s3_parquet.name,
              crawlState: "SUCCEEDED",
            },
          ],
          logical: "ANY",
        },
        type: "CONDITIONAL",
        startOnCreation: true,
      }
    );

    //chain the remaining trigger dependencies so each trigger is created after the job or trigger it follows
    glue_trigger_crawlJob.addDependsOn(glue_job_asset);
    glue_trigger_parquetJob.addDependsOn(glue_trigger_crawlJob);
    glue_trigger_crawlJob_parquet.addDependsOn(glue_trigger_parquetJob);
    glue_trigger_redshiftJob.addDependsOn(glue_trigger_crawlJob_parquet);
  }
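
For context, a stack with this constructor would typically be instantiated from the CDK app entry point, roughly as sketched below; the class name and entry-point file are assumptions inferred from the file path, not shown in the excerpt.

// Hypothetical bin/app.ts (class and file names assumed, CDK v1-style imports):
import * as cdk from "@aws-cdk/core";
import { BlogGlueWorkflowStack } from "../lib/blog-glue-workflow-stack";

const app = new cdk.App();
new BlogGlueWorkflowStack(app, "blog-glue-workflow-stack");
app.synth();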