constructor()

in cdk/lib/streaming-etl.ts [20:375]


  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const synthDate = new Date().toISOString().split('T')[0];

    this.templateOptions.description = `Creates a sample streaming ETL pipeline based on Apache Flink and Amazon Kinesis Data Analytics that reads data from a Kinesis data stream and persists it to Amazon S3 (shausma-kda-streaming-etl-${synthDate})`;

    const bucket = new s3.Bucket(this, 'Bucket', {
      versioned: true,
      encryption: BucketEncryption.S3_MANAGED,
      removalPolicy: RemovalPolicy.DESTROY,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      metrics: [{
        id: 'EntireBucket',
      }],
      lifecycleRules: [{
        abortIncompleteMultipartUploadAfter: Duration.days(7)
      }]
    });

    const emptyBucket = new EmptyBucketOnDelete(this, 'EmptyBucket', {
      bucket: bucket
    });

    new cdk.CfnOutput(this, `OutputBucket`, { value: `https://console.aws.amazon.com/s3/buckets/${bucket.bucketName}/streaming-etl-output/` });



    const artifacts = new BuildArtifacts(this, 'BuildArtifacts', {
      bucket: bucket,
      flinkConsumerVersion: 'release-0.1.1',
      kinesisReplayVersion: 'release-0.1.1'
    });


    const stream = new kds.Stream(this, 'InputStream', {
      shardCount: 16
    });


    const logGroup = new logs.LogGroup(this, 'KdaLogGroup', {
      retention: RetentionDays.ONE_WEEK,
      removalPolicy: RemovalPolicy.DESTROY
    });

    const logStream = new logs.LogStream(this, 'KdaLogStream', {
      logGroup: logGroup,
      removalPolicy: RemovalPolicy.DESTROY
    });

    const logStreamArn = `arn:${cdk.Aws.PARTITION}:logs:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:log-group:${logGroup.logGroupName}:log-stream:${logStream.logStreamName}`;

    const kdaRole = new iam.Role(this, 'KdaRole', {
      assumedBy: new iam.ServicePrincipal('kinesisanalytics.amazonaws.com'),
    });

    bucket.grantReadWrite(kdaRole);
    stream.grantRead(kdaRole);

    kdaRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'kinesis:ListShards' ],
      resources: [ stream.streamArn ]
    }))

    kdaRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'cloudwatch:PutMetricData' ],
      resources: [ '*' ]
    }));

    kdaRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'logs:DescribeLogStreams', 'logs:DescribeLogGroups' ],
      resources: [
        logGroup.logGroupArn,
        `arn:${cdk.Aws.PARTITION}:logs:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:log-group:*`
      ]
    }));

    kdaRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'logs:PutLogEvents' ],
      resources: [ logStreamArn ]
    }));


    const kdaApp = new kda.CfnApplicationV2(this, 'KdaApplication', {
      runtimeEnvironment: 'FLINK-1_11',
      serviceExecutionRole: kdaRole.roleArn,
      applicationName: `${cdk.Aws.STACK_NAME}`,
      applicationConfiguration: {
        environmentProperties: {
          propertyGroups: [
            {
              propertyGroupId: 'FlinkApplicationProperties',
              propertyMap: {
                OutputBucket: `s3://${bucket.bucketName}/streaming-etl-output/`,
                ParquetConversion: true,
                InputKinesisStream: stream.streamName
              },
            }
          ]
        },
        flinkApplicationConfiguration: {
          monitoringConfiguration: {
            logLevel: 'INFO',
            metricsLevel: 'TASK',
            configurationType: 'CUSTOM'
          },
          parallelismConfiguration: {
            autoScalingEnabled: false,
            parallelism: 2,
            parallelismPerKpu: 1,
            configurationType: 'CUSTOM'
          },
          checkpointConfiguration: {
            configurationType: "CUSTOM",
            checkpointInterval: 60_000,
            minPauseBetweenCheckpoints: 60_000,
            checkpointingEnabled: true
          }
        },
        applicationSnapshotConfiguration: {
          snapshotsEnabled: false
        },
        applicationCodeConfiguration: {
          codeContent: {
            s3ContentLocation: {
              bucketArn: bucket.bucketArn,
              fileKey: 'target/amazon-kinesis-analytics-streaming-etl-1.0-SNAPSHOT.jar'        
            }
          },
          codeContentType: 'ZIPFILE'
        }
      }
    });

    new kda.CfnApplicationCloudWatchLoggingOptionV2(this, 'KdsFlinkProducerLogging', {
        applicationName: kdaApp.ref.toString(),
        cloudWatchLoggingOption: {
          logStreamArn: logStreamArn
        }
    });

    kdaApp.addDependsOn(artifacts.consumerBuildSuccessWaitCondition);
    kdaApp.addDependsOn(emptyBucket.customResource);       //ensures that the app is stopped before the bucket is emptied


    const vpc = new ec2.Vpc(this, 'VPC', {
      subnetConfiguration: [
        {
          cidrMask: 24,
          name: 'public',
          subnetType: ec2.SubnetType.PUBLIC,
        }
      ]
    });
    
    const producerRole = new iam.Role(this, 'ReplayRole', {
      assumedBy: new iam.ServicePrincipal('ec2.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMManagedInstanceCore')
      ]
    });

    stream.grantReadWrite(producerRole);
    producerRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'kinesis:ListShards' ],
      resources: [ stream.streamArn ]
    }));

    bucket.grantRead(producerRole);
    s3.Bucket.fromBucketName(this, 'BigDataBucket', 'aws-bigdata-blog').grantRead(producerRole);

    producerRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'cloudwatch:PutMetricData' ],
      resources: [ '*' ]
    }));

    producerRole.addToPolicy(new iam.PolicyStatement({
      actions: [ 'kinesisanalytics:StartApplication', 'kinesisanalytics:StopApplication', 'kinesisanalytics:DescribeApplication', 'kinesisanalytics:UpdateApplication' ],
      resources: [ `arn:${cdk.Aws.PARTITION}:kinesisanalytics:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:application/${kdaApp.applicationName}` ]
    }));

    const userData = UserData.forLinux()

    const instance = new ec2.Instance(this, 'ProducerInstance', {
      vpc: vpc,
      vpcSubnets: {
        subnets: vpc.publicSubnets
      },
      instanceType: InstanceType.of(InstanceClass.C5N, InstanceSize.LARGE),
      machineImage: new AmazonLinuxImage({
        generation: AmazonLinuxGeneration.AMAZON_LINUX_2
      }),
      instanceName: `${cdk.Aws.STACK_NAME}/ProducerInstance`,
      userData: userData,
      role: producerRole,
      resourceSignalTimeout: Duration.minutes(5)
    }).instance;

    userData.addCommands(
      'yum install -y tmux jq java-11-amazon-corretto-headless',
      `aws s3 cp --recursive --no-progress --exclude '*' --include 'amazon-kinesis-replay-*.jar' 's3://${bucket.bucketName}/target/' /tmp`,
      `aws --region ${cdk.Aws.REGION} kinesisanalyticsv2 start-application --application-name ${kdaApp.ref} --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'`,
      `/opt/aws/bin/cfn-signal -e $? --stack ${cdk.Aws.STACK_NAME} --resource ${instance.logicalId} --region ${cdk.Aws.REGION}`
    );
    
    instance.addDependsOn(kdaApp);

    new cdk.CfnOutput(this, 'ReplayCommand', { value: `java -jar /tmp/amazon-kinesis-replay-*.jar -streamName ${stream.streamName} -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600` });
    new cdk.CfnOutput(this, 'ConnectToInstance', { value: `https://console.aws.amazon.com/systems-manager/session-manager/${instance.ref}`});


    const dashboard = new cloudwatch.Dashboard(this, 'Dashboard', {
      dashboardName: cdk.Aws.STACK_NAME
    });

    const incomingRecords = new Metric({
      namespace: 'AWS/Kinesis',
      metricName: 'IncomingRecords',
      dimensions: {
        StreamName: stream.streamName
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });

    const incomingBytes = new Metric({
      namespace: 'AWS/Kinesis',
      metricName: 'IncomingBytes',
      dimensions: {
        StreamName: stream.streamName
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });

    const outgoingRecords = new Metric({
      namespace: 'AWS/Kinesis',
      metricName: 'GetRecords.Records',
      dimensions: {
        StreamName: stream.streamName
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });

    const outgoingBytes = new Metric({
      namespace: 'AWS/Kinesis',
      metricName: 'GetRecords.Bytes',
      dimensions: {
        StreamName: stream.streamName
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });

    const millisBehindLatest = new Metric({
      namespace: 'AWS/KinesisAnalytics',
      metricName: 'millisBehindLatest',
      dimensions: {
        Id: cdk.Fn.join('_', cdk.Fn.split('-', stream.streamName)),
        Application: kdaApp.ref,
        Flow: 'Input'
      },
      period: Duration.minutes(1),
      statistic: 'max',
    });

    const bytesUploaded = new Metric({
      namespace: 'AWS/S3',
      metricName: 'BytesUploaded',
      dimensions: {
        BucketName: bucket.bucketName,
        FilterId: 'EntireBucket'
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });

    const putRequests = new Metric({
      namespace: 'AWS/S3',
      metricName: 'PutRequests',
      dimensions: {
        BucketName: bucket.bucketName,
        FilterId: 'EntireBucket'
      },
      period: Duration.minutes(1),
      statistic: 'sum'
    });
    

    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        left: [incomingRecords],
        right: [incomingBytes],
        width: 24,
        title: 'Kinesis data stream (incoming)',
        leftYAxis: {
          min: 0
        },
        rightYAxis: {
          min: 0
        }
      })
    );

    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        left: [outgoingRecords],
        right: [outgoingBytes],
        width: 24,
        title: 'Kinesis data stream (outgoing)',
        leftYAxis: {
          min: 0
        },
        rightYAxis: {
          min: 0
        }
      })
    );

    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        left: [
          millisBehindLatest,
          millisBehindLatest.with({
            statistic: "avg"
          })
        ],
        width: 24,
        title: 'Flink consumer lag',
        leftYAxis: {
          label: 'milliseconds',
          showUnits: false,
          min: 0
        }
      })
    );

    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        left: [putRequests],
        right: [bytesUploaded],
        width: 24,
        title: 'Amazon S3 (incoming)',
        leftYAxis: {
          min: 0
        },
        rightYAxis: {
          min: 0
        }
      })
    );

    new cdk.CfnOutput(this, 'CloudwatchDashboard', { value: `https://console.aws.amazon.com/cloudwatch/home#dashboards:name=${cdk.Aws.STACK_NAME}` });
    new cdk.CfnOutput(this, 'CloudwatchLogsInsights', { value: `https://console.aws.amazon.com/cloudwatch/home#logs-insights:queryDetail=~(end~0~source~'${logGroup.logGroupName}~start~-3600~timeType~'RELATIVE~unit~'seconds)` });
  }