in cdk/lib/kinesis-analytics-infrastructure.ts [31:169]
constructor(scope: cdk.Construct, id: string, props: KinesisAnalyticsProps) {
super(scope, id);
const logGroup = new logs.LogGroup(this, 'FlinkLogGroup', {
retention: RetentionDays.ONE_WEEK
});
new cdk.CfnOutput(this, 'FlinkLogGroupName', { value: logGroup.logGroupName });
const role = props.role
props.bucket.grantRead(role);
props.inputStream.grantRead(role);
props.inputStream.grant(role, 'kinesis:DescribeStream');
cloudwatch.Metric.grantPutMetricData(role);
logGroup.grantWrite(role);
logGroup.grant(role, "logs:DescribeLogStreams");
role.addToPolicy(
new iam.PolicyStatement({
actions: ['logs:DescribeLogGroups'],
resources: [`arn:aws:logs:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:log-group:*`]
})
);
const bucket = Bucket.fromBucketName(this, 'TclBucket', 'nyc-tlc');
bucket.grantRead(role);
const logStream = new logs.LogStream(this, 'FlinkLogStream', {
logGroup: logGroup
});
const flinkApp = new kda.CfnApplicationV2(this, 'FlinkApplication', {
runtimeEnvironment: 'FLINK-1_11',
serviceExecutionRole: role.roleArn,
applicationName: `${cdk.Aws.STACK_NAME}`,
applicationConfiguration: {
applicationCodeConfiguration: {
codeContent: {
s3ContentLocation: {
bucketArn: props.bucket.bucketArn,
fileKey: `target/${props.consumerApplicationJarObject}`
}
},
codeContentType: 'ZIPFILE'
},
environmentProperties: {
propertyGroups: [
{
propertyGroupId: 'BeamApplicationProperties',
propertyMap: {
AwsRegion: cdk.Aws.REGION,
InputStreamName: props.inputStream.streamName,
OutputBoroughs: 'false',
InputS3Pattern: `s3://${props.bucket.bucketName}/kinesis-stream-data/*/*/*/*/*`,
Source: 'kinesis',
CodeGuruProfilingGroupName: 'flink-beam-app'
}
}
]
},
flinkApplicationConfiguration: {
monitoringConfiguration: {
logLevel: 'INFO',
metricsLevel: 'TASK',
configurationType: 'CUSTOM'
},
parallelismConfiguration: {
autoScalingEnabled: false,
parallelism: 4,
parallelismPerKpu: 1,
configurationType: 'CUSTOM'
}
},
applicationSnapshotConfiguration: {
snapshotsEnabled: false
}
}
});
new kda.CfnApplicationCloudWatchLoggingOptionV2(this, 'FlinkLogging', {
applicationName: flinkApp.ref.toString(),
cloudWatchLoggingOption: {
logStreamArn: `arn:aws:logs:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:log-group:${logGroup.logGroupName}:log-stream:${logStream.logStreamName}`
}
});
flinkApp.node.addDependency(props.buildSuccessWaitCondition);
const lambdaSource = fs.readFileSync('lambda/stop-kda-app.py').toString();
const terminateAppLambda = new lambda.Function(this, 'TerminateAppLambda', {
runtime: lambda.Runtime.PYTHON_3_7,
timeout: Duration.minutes(15),
code: lambda.Code.fromInline(lambdaSource),
handler: 'index.empty_bucket',
memorySize: 512,
environment: {
application_name: flinkApp.ref,
}
});
terminateAppLambda.addToRolePolicy(
new iam.PolicyStatement({
actions: ['kinesisanalytics:StopApplication'],
resources: [`arn:${cdk.Aws.PARTITION}:kinesisanalytics:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:application/${flinkApp.ref}`]
})
);
new logs.MetricFilter(this, 'AppTerminatedFilter', {
filterPattern: logs.FilterPattern.literal('Job reached globally terminal state FINISHED'),
metricNamespace: 'Beam',
metricName: 'BeamApplicationFinished',
metricValue: '1',
defaultValue: 0,
logGroup: logGroup
});
const metric = new cloudwatch.Metric({
namespace: 'Beam',
metricName: 'BeamApplicationFinished',
period: cdk.Duration.minutes(1)
});
const alarm = metric.createAlarm(this, 'AppTerminatedAlarm', {
threshold: 0,
actionsEnabled: true,
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
treatMissingData: TreatMissingData.NOT_BREACHING,
evaluationPeriods: 1,
statistic: "sum"
});
const topic = new sns.Topic(this, 'AppTerminatedTopic');
topic.addSubscription(new subs.LambdaSubscription(terminateAppLambda));
alarm.addAlarmAction(new cloudwatch_actions.SnsAction(topic));
}