constructor()

in next_steps/kinesis_stream_connector/l4m_connector/stack/l4m_connector_stack.ts [26:216]


  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);
    
    /* 
      Source data schema: The default schema provided below matches the schema generated by 
      sample data generator provided with this repo. Please update "fieldSchema" to match your
      source kinesis stream records
    */
    const fieldSchema: CfnTable.ColumnProperty[] = [
      {
          name: 'platform',
          type: 'string',
          comment: 'platform source',
      },
      {
          name: 'marketplace',
          type: 'string',
          comment: 'country code',
      },
      {
          name: 'event_time',
          type: 'timestamp',
          comment: 'event timestamp',
      },
      {
          name: 'views',
          type: 'bigint',
          comment: 'total views for the time interval',
      },
      {
        name: 'revenue',
        type: 'decimal',
        comment: 'total revenue for the time interval ',
      },
    ];

    // Get the stream name from cdk.json - ensure the stream exists 
    //  - either from the one you deployed using data generator cdk in this repo or 
    //    your own kinesis stream source
    const streamName = this.node.tryGetContext('streamName');
    const streamArn = this.formatArn({
      resourceName: streamName,
      service: 'kinesis',
      resource: 'stream'
    });
    
    // Glue and S3 parameters passed from cdk.json
    const glueDBName = this.node.tryGetContext('glueDBName');
    const glueTableName = this.node.tryGetContext('glueTableName');
    const sourceFormat = this.node.tryGetContext('sourceFormat');
    const bucketName = this.node.tryGetContext('bucketName');
    const bucketPrefix = this.node.tryGetContext('bucketPrefix');
    const l4mInterval = this.node.tryGetContext('l4mInterval')    

    // Obtain a Stream object from Stream name/ARN
    const sourceStream = Stream.fromStreamArn(this, "sourceStream", streamArn); 
    
    // Create S3 bucket to be used as destination (sink) for AWS Glue Spark Streaming ETL job
    const l4mBucket = new Bucket(this, bucketName, {
      bucketName: bucketName,
      publicReadAccess: false,
      blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
      encryption: BucketEncryption.KMS_MANAGED,
      enforceSSL: true
    });
    
    /* 
    Location of the AWS Glue Spark Streaming ETL script 
    - the main script that ingests data from Kinesis Data Stream and writes them to S3 
      organized by time interval, ready for use by Amazon Lookout for Metrics service
    */
    const glueSparkScript = new Asset(this, 'gluespartkscript', {
      path: "./src/glue_spark_etl_to_s3.py"
    });
    
    // Create Glue Database
    const glueDB = new CfnDatabase(this, glueDBName, {
      catalogId: cdk.Aws.ACCOUNT_ID,
      databaseInput: {
        name: glueDBName,
        description: 'Glue DB'
      }
    });

    // Setup the schema (to be used by Glue Table)
    const paths: string = fieldSchema.map((item) => {
      return item.name;
    }).join(',');

    /* 
    Setup the Glue Table properties 
     Glue Table is connected to the source Kinesis Data Stream 
    */
    const tableProps = {
      catalogId: glueDB.catalogId,
      databaseName: glueDBName,
      tableInput: {
        name: glueTableName,
        storageDescriptor: {
          columns: fieldSchema,
          location: streamName,
          inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
          outputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
          compressed: false,
          numberOfBuckets: -1,
          serdeInfo: {
            serializationLibrary: "org.openx.data.jsonserde.JsonSerDe",
            parameters: {
              paths: paths
            }
          },
          parameters: {
            endpointUrl: `https://kinesis.${cdk.Aws.REGION}.amazonaws.com`,
            streamName: streamName,
            typeOfData: 'kinesis'
          }
        },
        tableType: 'EXTERNAL_TABLE',
        parameters: {
          classification: sourceFormat
        }
      }
    };

    const glueTable = new CfnTable(this, glueTableName, tableProps);
    glueTable.addDependsOn(glueDB);

    // Setup IAM policies for AWS Glue Job 
    const gluePolicyStmts = new PolicyStatement();
    gluePolicyStmts.addActions("s3:*");
    gluePolicyStmts.addResources(l4mBucket.bucketArn);

    gluePolicyStmts.addActions("s3:*");
    gluePolicyStmts.addResources(`${l4mBucket.bucketArn}/*`);    

    gluePolicyStmts.addActions("kinesis:DescribeStream");
    gluePolicyStmts.addResources(sourceStream.streamArn);

    const gluePolicy = new Policy(this, "gluePolicy"); 
    gluePolicy.addStatements(gluePolicyStmts);

    const glueRole = new Role(this, "glueRole", {
      assumedBy: new ServicePrincipal("glue.amazonaws.com"),
      managedPolicies: [
        ManagedPolicy.fromAwsManagedPolicyName("AmazonS3ReadOnlyAccess"),
        ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
        ManagedPolicy.fromAwsManagedPolicyName("AWSGlueSchemaRegistryFullAccess")
      ]
    });
    gluePolicy.attachToRole(glueRole);
    sourceStream.grantRead(glueRole);

    // Setup AWS Glue Job to execute Spark streaming ETL script
    const glueSparkJob = new CfnJob(this, 'glueSparkETLtoS3', {
      name: "glueSparkETLtoS3",
      description: "Glue Spark streaming ETL job to ingest kinesis data stream to S3",
      role: glueRole.roleArn,
      glueVersion: "2.0",
      command:{
        name: "gluestreaming",
        scriptLocation: glueSparkScript.s3ObjectUrl,
        pythonVersion: "3"
      },
      defaultArguments: {
        "--srcDBName": glueDBName,
        "--srcTableName": glueTableName,
        "--srcFormat": sourceFormat,
        "--l4mBucket": bucketName,
        "--l4mBucketPrefix": bucketPrefix,
        "--l4mInterval": l4mInterval,
        "--job-bookmark-option": "job-bookmark-disable"
      },
      allocatedCapacity: 1,
      maxRetries: 1,
      executionProperty: {maxConcurrentRuns: 1}
    });
    glueSparkJob.addDependsOn(glueTable);

    new CfnOutput(this, "Glue ETL Job", {
      description: "Glue ETL Job",
      exportName: "glue-etl-job",
      value: String(glueSparkJob.name)
    });

    new CfnOutput(this, "Output S3 Bucket", {
      description: "Output S3 Bucket i.e. input to lookout for metrics",
      exportName: "s3bucket-live-data",      
      value: l4mBucket.bucketArn
    });

  }