in cdk/lib/cdk-stack.ts [28:135]
/**
 * Assembles the stack in one of three tiers, selected by flags on `props`.
 *
 * - Always: a versioned S3 bucket (wrapped by `EmptyBucketOnDelete`), a
 *   `S3Bucket` output, and a GitHub build pipeline that fetches the consumer
 *   application archive at `props.consumerApplicationVersion`.
 * - `demoInfrastructure` or `completeInfrastructure`: additionally a VPC with
 *   public subnets only, an EC2 key-pair CloudFormation parameter (`KeyName`),
 *   the enrichment Lambda, the Kinesis replay host and the EMR infrastructure.
 * - Demo only (`completeInfrastructure` falsy): no Kinesis stream is created
 *   here; the emitted helper commands target a stream named `beam-summit`.
 * - `completeInfrastructure`: additionally a 4-shard Kinesis stream, a role
 *   assumed by Kinesis Data Analytics, a dashboard, the Firehose pipeline, the
 *   Kinesis Data Analytics Java application and a CodeGuru profiling group;
 *   the helper commands then target the generated stream.
 *
 * @param scope - Parent construct.
 * @param id - Construct id of this stack.
 * @param props - Stack properties, including the infrastructure-tier flags and
 *   the consumer application version / jar object used in the outputs.
 */
constructor(scope: cdk.Construct, id: string, props: StackProps) {
  super(scope, id, props);
  this.templateOptions.description = 'Creates sample Apache Beam pipeline that can be deployed to Kinesis Data Analytics for Java Applications and Amazon EMR (amazon-kinesis-analytics-beam-taxi-consumer)';

  // Artifact bucket shared by the build pipeline and every downstream construct.
  const bucket = new s3.Bucket(this, 'Bucket', {
    versioned: true,
  });
  const emptyBucket = new EmptyBucketOnDelete(this, 'EmptyBucket', {
    bucket: bucket,
  });
  new cdk.CfnOutput(this, 'S3Bucket', { value: bucket.bucketName });

  const consumerBuild = new GithubBuildPipeline(this, 'BeamTaxiConsumerBuildPipeline', {
    bucket: bucket,
    url: `https://github.com/aws-samples/amazon-kinesis-analytics-beam-taxi-consumer/archive/${props.consumerApplicationVersion}.zip`,
    extract: true,
  });

  // Without either infrastructure flag, only the bucket and build pipeline are deployed.
  if (!(props.demoInfrastructure || props.completeInfrastructure)) {
    return;
  }

  // Name of an EC2 key pair, supplied at deploy time as a CloudFormation parameter.
  const keyName = new cdk.CfnParameter(this, 'KeyName', {
    type: 'AWS::EC2::KeyPair::KeyName',
  }).valueAsString;

  const vpc = new ec2.Vpc(this, 'VPC', {
    cidr: '10.0.0.0/16',
    subnetConfiguration: [
      {
        cidrMask: 24,
        name: 'public',
        subnetType: ec2.SubnetType.PUBLIC,
      },
    ],
  });

  // NOTE(review): this path is resolved against the process working directory,
  // so synth presumably has to run from the CDK project root — confirm.
  const lambdaSource = fs.readFileSync('lambda/add-approximate-arrival-time.js', 'utf8');
  // NOTE(review): within this constructor the function is only wired up (to
  // FirehoseInfrastructure) in complete mode; in demo-only mode it is deployed
  // but not referenced here. Node.js 12 is past its Lambda support window —
  // consider a newer runtime if the pinned CDK version provides one.
  const enrichEvents = new lambda.Function(this, 'EnrichEventsLambda', {
    runtime: lambda.Runtime.NODEJS_12_X,
    // `Code.inline` is deprecated; `Code.fromInline` is its drop-in replacement.
    code: lambda.Code.fromInline(lambdaSource),
    timeout: Duration.seconds(60),
    handler: 'index.handler',
  });

  const replay = new KinesisReplay(this, 'KinesisReplayInfrastructure', {
    ...props,
    bucket: bucket,
    keyName: keyName,
    vpc: vpc,
  });
  const emr = new EmrInfrastructure(this, 'EmrInfrastructure', {
    bucket: bucket,
    keyName: keyName,
    vpc: vpc,
  });

  // Both tiers emit the same two helper commands and differ only in the target
  // stream, so build them in one place to keep the variants from drifting apart.
  // The outputs stay scoped to `replay` / `emr` so their logical ids are unchanged.
  const replayCommand = (streamName: string): string =>
    `java -jar amazon-kinesis-replay-*.jar -streamRegion ${cdk.Aws.REGION} -streamName ${streamName} -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 720`;
  const flinkRunCommand = (streamName: string): string =>
    `flink run -p 8 ${props.consumerApplicationJarObject} --runner=FlinkRunner --inputS3Pattern=s3://${bucket.bucketName}/kinesis-stream-data/*/*/*/*/* --awsRegion=${cdk.Aws.REGION} --inputStreamName=${streamName} --source=s3 --outputBoroughs=true`;
  const addCommandOutputs = (streamName: string): void => {
    new cdk.CfnOutput(replay, 'KinesisReplayCommand', { value: replayCommand(streamName) });
    new cdk.CfnOutput(emr, 'StartFlinkApplication', { value: flinkRunCommand(streamName) });
  };

  if (!props.completeInfrastructure) {
    // Demo-only: this stack creates no Kinesis stream, so the commands use the
    // fixed name `beam-summit` (presumably provisioned outside this stack).
    addCommandOutputs('beam-summit');
    return;
  }

  const stream = new kds.Stream(this, 'InputStream', {
    shardCount: 4,
  });
  // Role assumed by Kinesis Data Analytics; also handed to the CodeGuru profiler construct.
  const role = new iam.Role(this, 'KinesisAnalyticsRole', {
    assumedBy: new iam.ServicePrincipal('kinesisanalytics.amazonaws.com'),
  });
  const dashboard = new BeamDashboard(this, 'Dashboard', {
    inputStream: stream,
    dashboardName: cdk.Aws.STACK_NAME,
  });
  new FirehoseInfrastructure(this, 'FirehoseInfrastructure', {
    bucket: bucket,
    emptyBucket: emptyBucket,
    inputStream: stream,
    lambda: enrichEvents,
    buildSuccessWaitCondition: consumerBuild.buildSuccessWaitCondition,
  });
  new KinesisAnalyticsJava(this, 'FlinkInfrastructure', {
    ...props,
    dashboard: dashboard,
    bucket: bucket,
    role: role,
    inputStream: stream,
    buildSuccessWaitCondition: consumerBuild.buildSuccessWaitCondition,
  });
  new CodeGuruProfilerInfrastructure(this, 'CodeGuruProfilerInfrastructure', {
    groupName: 'flink-beam-app',
    role: role,
  });

  addCommandOutputs(stream.streamName);
}