packages/@aws-cdk/aws-kinesisanalytics-flink-alpha/lib/application.ts (327 lines of code) (raw):

import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import { CfnApplicationCloudWatchLoggingOptionV2, CfnApplicationV2 } from 'aws-cdk-lib/aws-kinesisanalytics';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as core from 'aws-cdk-lib/core';
import { Construct } from 'constructs';
import { ApplicationCode } from './application-code';
import { environmentProperties } from './private/environment-properties';
import { flinkApplicationConfiguration } from './private/flink-application-configuration';
import { validateFlinkApplicationProps as validateApplicationProps } from './private/validation';
import { LogLevel, MetricsLevel, Runtime } from './types';
import { addConstructMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';

/**
 * An interface expressing the public properties on both an imported and
 * CDK-created Flink application.
 */
export interface IApplication extends core.IResource, ec2.IConnectable, iam.IGrantable {
  /**
   * The application ARN.
   *
   * @attribute
   */
  readonly applicationArn: string;

  /**
   * The name of the Flink application.
   *
   * @attribute
   */
  readonly applicationName: string;

  /**
   * The application IAM role.
   */
  readonly role?: iam.IRole;

  /**
   * Convenience method for adding a policy statement to the application role.
   *
   * @returns true if the statement was added, false if the application has no
   * role to add it to.
   */
  addToRolePolicy(policyStatement: iam.PolicyStatement): boolean;

  /**
   * Return a CloudWatch metric associated with this Flink application.
   *
   * @param metricName The name of the metric
   * @param props Customization properties
   */
  metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The number of Kinesis Processing Units that are used to run your stream
   * processing application. The average number of KPUs used each hour
   * determines the billing for your application.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricKpus(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time elapsed during an outage for failing/recovering jobs.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricDowntime(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time that the job has been running without interruption.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricUptime(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of times this job has fully restarted since it was
   * submitted. This metric does not measure fine-grained restarts.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricFullRestarts(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The number of times checkpointing has failed.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricNumberOfFailedCheckpoints(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time it took to complete the last checkpoint.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - maximum over 5 minutes
   */
  metricLastCheckpointDuration(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total size of the last checkpoint.
   *
   * Units: Bytes
   *
   * Reporting Level: Application
   *
   * @default - maximum over 5 minutes
   */
  metricLastCheckpointSize(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The overall percentage of CPU utilization across task managers. For
   * example, if there are five task managers, Kinesis Data Analytics publishes
   * five samples of this metric per reporting interval.
   *
   * Units: Percentage
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricCpuUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * Overall heap memory utilization across task managers. For example, if there
   * are five task managers, Kinesis Data Analytics publishes five samples of
   * this metric per reporting interval.
   *
   * Units: Percentage
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricHeapMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total time spent performing old garbage collection operations.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricOldGenerationGCTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of old garbage collection operations that have occurred
   * across all task managers.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricOldGenerationGCCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of live threads used by the application.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricThreadsCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of records this application, operator, or task has
   * received.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsIn(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of records this application, operator or task has
   * received per second.
   *
   * Units: Count/Second
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsInPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of records this application, operator or task has emitted.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total number of records this application, operator or task has emitted
   * per second.
   *
   * Units: Count/Second
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsOutPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The number of records this operator or task has dropped due to arriving late.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - sum over 5 minutes
   */
  metricNumLateRecordsDropped(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The last watermark this application/operator/task/thread has received.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - maximum over 5 minutes
   */
  metricCurrentInputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The last watermark this application/operator/task/thread has emitted.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - maximum over 5 minutes
   */
  metricCurrentOutputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The amount of managed memory currently used.
   *
   * Units: Bytes
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryUsed(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The total amount of managed memory.
   *
   * Units: Bytes
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryTotal(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * Derived from managedMemoryUsed/managedMemoryTotal.
   *
   * Units: Percentage
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time (in milliseconds) this task or operator is idle (has no data to
   * process) per second. Idle time excludes back pressured time, so if the task
   * is back pressured it is not idle.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricIdleTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time (in milliseconds) this task or operator is back pressured per
   * second.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricBackPressuredTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

  /**
   * The time (in milliseconds) this task or operator is busy (neither idle nor
   * back pressured) per second. Can be NaN, if the value could not be
   * calculated.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricBusyTimePerMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
}

/**
 * Implements the functionality shared between CDK created and imported
 * IApplications.
 */
abstract class ApplicationBase extends core.Resource implements IApplication {
  public abstract readonly applicationArn: string;
  public abstract readonly applicationName: string;
  public abstract readonly role?: iam.IRole;

  // Implement iam.IGrantable interface
  public abstract readonly grantPrincipal: iam.IPrincipal;

  /**
   * The underlying connections object for the connections getter.
   *
   * @internal
   */
  protected _connections?: ec2.Connections;

  /**
   * Implement the convenience `IApplication.addToRolePolicy` method.
   *
   * @param policyStatement The statement to attach to the application role
   * @returns true if the application has a role and the statement was handed
   * to it, false if there is no role (nothing is added in that case).
   */
  public addToRolePolicy(policyStatement: iam.PolicyStatement): boolean {
    if (this.role) {
      this.role.addToPrincipalPolicy(policyStatement);
      return true;
    }

    return false;
  }

  /**
   * The network connections of this application.
   *
   * Throws if no connections object has been set, i.e. no VPC / security
   * groups were configured for the application.
   */
  public get connections(): ec2.Connections {
    if (!this._connections) {
      throw new Error('This Application isn\'t associated with a VPC. Provide a "vpc" prop when creating the Application or "securityGroups" when importing it.');
    }
    return this._connections;
  }

  /**
   * Return a CloudWatch metric associated with this Flink application.
   *
   * The metric lives in the `AWS/KinesisAnalytics` namespace and is scoped to
   * this application through the `Application` dimension. Values in `props`
   * take precedence over those defaults because they are spread last.
   *
   * @param metricName The name of the metric
   * @param props Customization properties
   */
  metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return new cloudwatch.Metric({
      namespace: 'AWS/KinesisAnalytics',
      metricName,
      dimensionsMap: { Application: this.applicationName },
      ...props,
    }).attachTo(this);
  }

  /**
   * The number of Kinesis Processing Units that are used to run your stream
   * processing application. The average number of KPUs used each hour
   * determines the billing for your application.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricKpus(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('KPUs', { statistic: 'Average', ...props });
  }

  /**
   * The time elapsed during an outage for failing/recovering jobs.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricDowntime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('downtime', { statistic: 'Average', ...props });
  }

  /**
   * The time that the job has been running without interruption.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricUptime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('uptime', { statistic: 'Average', ...props });
  }

  /**
   * The total number of times this job has fully restarted since it was
   * submitted. This metric does not measure fine-grained restarts.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricFullRestarts(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('fullRestarts', { statistic: 'Sum', ...props });
  }

  /**
   * The number of times checkpointing has failed.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricNumberOfFailedCheckpoints(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numberOfFailedCheckpoints', { statistic: 'Sum', ...props });
  }

  /**
   * The time it took to complete the last checkpoint.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - maximum over 5 minutes
   */
  metricLastCheckpointDuration(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('lastCheckpointDuration', { statistic: 'Maximum', ...props });
  }

  /**
   * The total size of the last checkpoint.
   *
   * Units: Bytes
   *
   * Reporting Level: Application
   *
   * @default - maximum over 5 minutes
   */
  metricLastCheckpointSize(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('lastCheckpointSize', { statistic: 'Maximum', ...props });
  }

  /**
   * The overall percentage of CPU utilization across task managers. For
   * example, if there are five task managers, Kinesis Data Analytics publishes
   * five samples of this metric per reporting interval.
   *
   * Units: Percentage
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricCpuUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('cpuUtilization', { statistic: 'Average', ...props });
  }

  /**
   * Overall heap memory utilization across task managers. For example, if there
   * are five task managers, Kinesis Data Analytics publishes five samples of
   * this metric per reporting interval.
   *
   * Units: Percentage
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricHeapMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('heapMemoryUtilization', { statistic: 'Average', ...props });
  }

  /**
   * The total time spent performing old garbage collection operations.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricOldGenerationGCTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('oldGenerationGCTime', { statistic: 'Sum', ...props });
  }

  /**
   * The total number of old garbage collection operations that have occurred
   * across all task managers.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - sum over 5 minutes
   */
  metricOldGenerationGCCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('oldGenerationGCCount', { statistic: 'Sum', ...props });
  }

  /**
   * The total number of live threads used by the application.
   *
   * Units: Count
   *
   * Reporting Level: Application
   *
   * @default - average over 5 minutes
   */
  metricThreadsCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('threadsCount', { statistic: 'Average', ...props });
  }

  /**
   * The total number of records this application, operator, or task has
   * received.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsIn(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numRecordsIn', { statistic: 'Average', ...props });
  }

  /**
   * The total number of records this application, operator or task has received
   * per second.
   *
   * Units: Count/Second
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsInPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numRecordsInPerSecond', { statistic: 'Average', ...props });
  }

  /**
   * The total number of records this application, operator or task has emitted.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numRecordsOut', { statistic: 'Average', ...props });
  }

  /**
   * The total number of records this application, operator or task has emitted
   * per second.
   *
   * Units: Count/Second
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricNumRecordsOutPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numRecordsOutPerSecond', { statistic: 'Average', ...props });
  }

  /**
   * The number of records this operator or task has dropped due to arriving
   * late.
   *
   * Units: Count
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - sum over 5 minutes
   */
  metricNumLateRecordsDropped(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('numLateRecordsDropped', { statistic: 'Sum', ...props });
  }

  /**
   * The last watermark this application/operator/task/thread has received.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - maximum over 5 minutes
   */
  metricCurrentInputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('currentInputWatermark', { statistic: 'Maximum', ...props });
  }

  /**
   * The last watermark this application/operator/task/thread has emitted.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - maximum over 5 minutes
   */
  metricCurrentOutputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('currentOutputWatermark', { statistic: 'Maximum', ...props });
  }

  /**
   * The amount of managed memory currently used.
   *
   * Units: Bytes
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryUsed(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('managedMemoryUsed', { statistic: 'Average', ...props });
  }

  /**
   * The total amount of managed memory.
   *
   * Units: Bytes
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryTotal(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('managedMemoryTotal', { statistic: 'Average', ...props });
  }

  /**
   * Derived from managedMemoryUsed/managedMemoryTotal.
   *
   * Units: Percentage
   *
   * Reporting Level: Application, Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricManagedMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('managedMemoryUtilization', { statistic: 'Average', ...props });
  }

  /**
   * The time (in milliseconds) this task or operator is idle (has no data to
   * process) per second. Idle time excludes back pressured time, so if the task
   * is back pressured it is not idle.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricIdleTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('idleTimeMsPerSecond', { statistic: 'Average', ...props });
  }

  /**
   * The time (in milliseconds) this task or operator is back pressured per
   * second.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricBackPressuredTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    return this.metric('backPressuredTimeMsPerSecond', { statistic: 'Average', ...props });
  }

  /**
   * The time (in milliseconds) this task or operator is busy (neither idle nor
   * back pressured) per second. Can be NaN, if the value could not be
   * calculated.
   *
   * The method keeps its historical name for API compatibility; the CloudWatch
   * metric it queries is `busyTimeMsPerSecond`.
   *
   * Units: Milliseconds
   *
   * Reporting Level: Operator, Task, Parallelism
   *
   * @default - average over 5 minutes
   */
  metricBusyTimePerMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
    // The service publishes this metric as `busyTimeMsPerSecond` (matching its
    // `idleTimeMsPerSecond` / `backPressuredTimeMsPerSecond` siblings); the
    // previously used `busyTimePerMsPerSecond` name never receives data.
    return this.metric('busyTimeMsPerSecond', { statistic: 'Average', ...props });
  }
}

/**
 * Attributes used for importing an Application with Application.fromApplicationAttributes.
 */
export interface ApplicationAttributes {
  /**
   * The ARN of the Flink application.
   *
   * Format: arn:<partition>:kinesisanalytics:<region>:<account-id>:application/<application-name>
   */
  readonly applicationArn: string;

  /**
   * The security groups for this Flink application if deployed in a VPC.
   *
   * @default - no security groups
   */
  readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
 * Props for creating an Application construct.
 */
export interface ApplicationProps {
  /**
   * A name for your Application that is unique to an AWS account.
   *
   * @default - CloudFormation-generated name
   */
  readonly applicationName?: string;

  /**
   * The Flink version to use for this application.
   */
  readonly runtime: Runtime;

  /**
   * The Flink code asset to run.
   */
  readonly code: ApplicationCode;

  /**
   * Whether checkpointing is enabled while your application runs.
   *
   * @default true
   */
  readonly checkpointingEnabled?: boolean;

  /**
   * The interval between checkpoints.
   *
   * @default - 1 minute
   */
  readonly checkpointInterval?: core.Duration;

  /**
   * The minimum amount of time to wait after a checkpoint finishes to start
   * a new checkpoint.
   *
   * @default - 5 seconds
   */
  readonly minPauseBetweenCheckpoints?: core.Duration;

  /**
   * The level of log verbosity from the Flink application.
   *
   * @default LogLevel.INFO
   */
  readonly logLevel?: LogLevel;

  /**
   * Describes the granularity of the CloudWatch metrics for an application.
   * Use caution with Parallelism level metrics. Parallelism granularity logs
   * metrics for each parallel thread and can quickly become expensive when
   * parallelism is high (e.g. > 64).
   *
   * @default MetricsLevel.APPLICATION
   */
  readonly metricsLevel?: MetricsLevel;

  /**
   * Whether the Kinesis Data Analytics service can increase the parallelism of
   * the application in response to resource usage.
   *
   * @default true
   */
  readonly autoScalingEnabled?: boolean;

  /**
   * The initial parallelism for the application. Kinesis Data Analytics can
   * stop the app, increase the parallelism, and start the app again if
   * autoScalingEnabled is true (the default value).
   *
   * @default 1
   */
  readonly parallelism?: number;

  /**
   * The Flink parallelism allowed per Kinesis Processing Unit (KPU).
   *
   * @default 1
   */
  readonly parallelismPerKpu?: number;

  /**
   * Determines if Flink snapshots are enabled.
   *
   * @default true
   */
  readonly snapshotsEnabled?: boolean;

  /**
   * Configuration PropertyGroups. You can use these property groups to pass
   * arbitrary runtime configuration values to your Flink app.
   *
   * @default - No property group configuration provided to the Flink app
   */
  readonly propertyGroups?: { readonly [propertyId: string]: { [mapKey: string]: string } };

  /**
   * A role to use to grant permissions to your application. Prefer omitting
   * this property and using the default role.
   *
   * @default - a new Role will be created
   */
  readonly role?: iam.IRole;

  /**
   * Provide a RemovalPolicy to override the default.
   *
   * @default RemovalPolicy.DESTROY
   */
  readonly removalPolicy?: core.RemovalPolicy;

  /**
   * The log group to send log entries to.
   *
   * @default - CDK's default LogGroup
   */
  readonly logGroup?: logs.ILogGroup;

  /**
   * Deploy the Flink application in a VPC.
   *
   * @default - no VPC
   */
  readonly vpc?: ec2.IVpc;

  /**
   * Choose which VPC subnets to use.
   *
   * @default - SubnetType.PRIVATE_WITH_EGRESS subnets
   */
  readonly vpcSubnets?: ec2.SubnetSelection;

  /**
   * Security groups to use with a provided VPC.
   *
   * @default - a new security group is created for this application.
   */
  readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
 * An imported Flink application.
 */
class Import extends ApplicationBase {
  public readonly grantPrincipal: iam.IPrincipal;
  public readonly role?: iam.IRole;
  public readonly applicationName: string;
  public readonly applicationArn: string;

  /**
   * @param scope The construct scope; its stack is used to parse the ARN
   * @param id The construct id
   * @param attrs The ARN of the existing application and, optionally, the
   * security groups it runs with
   */
  constructor(scope: Construct, id: string, attrs: { applicationArn: string; securityGroups?: ec2.ISecurityGroup[] }) {
    super(scope, id);
    // Enhanced CDK Analytics Telemetry
    addConstructMetadata(this, attrs);

    // Imported applications have no associated role or grantPrincipal
    this.grantPrincipal = new iam.UnknownPrincipal({ resource: this });
    this.role = undefined;

    this.applicationArn = attrs.applicationArn;

    // The application name is the resource-name part of
    // `...:application/<application-name>`.
    const applicationName = core.Stack.of(scope).splitArn(attrs.applicationArn, core.ArnFormat.SLASH_RESOURCE_NAME).resourceName;
    if (!applicationName) {
      throw new Error(`applicationArn for fromApplicationArn (${attrs.applicationArn}) must include resource name`);
    }
    this.applicationName = applicationName;

    // Only expose `connections` when security groups were supplied; otherwise
    // the getter on the base class throws.
    const securityGroups = attrs.securityGroups ?? [];
    if (securityGroups.length > 0) {
      this._connections = new ec2.Connections({ securityGroups });
    }
  }
}

/**
 * The L2 construct for Flink Kinesis Data Applications.
 *
 * @resource AWS::KinesisAnalyticsV2::Application
 *
 */
export class Application extends ApplicationBase {
  /**
   * Import an existing Flink application defined outside of CDK code by
   * applicationName.
   */
  public static fromApplicationName(scope: Construct, id: string, applicationName: string): IApplication {
    const applicationArn = core.Stack.of(scope).formatArn(applicationArnComponents(applicationName));

    return new Import(scope, id, { applicationArn });
  }

  /**
   * Import an existing application defined outside of CDK code by
   * applicationArn.
   */
  public static fromApplicationArn(scope: Construct, id: string, applicationArn: string): IApplication {
    return new Import(scope, id, { applicationArn });
  }

  /**
   * Import an existing application defined outside of CDK code.
   */
  public static fromApplicationAttributes(scope: Construct, id: string, attrs: ApplicationAttributes): IApplication {
    return new Import(scope, id, {
      applicationArn: attrs.applicationArn,
      securityGroups: attrs.securityGroups,
    });
  }

  public readonly applicationArn: string;
  public readonly applicationName: string;

  // Role must be optional for JSII compatibility
  public readonly role?: iam.IRole;

  public readonly grantPrincipal: iam.IPrincipal;

  /**
   * Create the application together with its execution role (unless one is
   * supplied), optional VPC wiring, and CloudWatch logging configuration.
   *
   * @param scope The parent construct
   * @param id The construct id
   * @param props Application configuration; validated before any resource is
   * created
   */
  constructor(scope: Construct, id: string, props: ApplicationProps) {
    super(scope, id, { physicalName: props.applicationName });
    // Enhanced CDK Analytics Telemetry
    addConstructMetadata(this, props);
    validateApplicationProps(props);

    // Use the caller's role when given, otherwise create one that the
    // Kinesis Analytics service principal can assume.
    this.role = props.role ?? new iam.Role(this, 'Role', {
      assumedBy: new iam.ServicePrincipal('kinesisanalytics.amazonaws.com'),
    });
    this.grantPrincipal = this.role;

    // Permit metric publishing to CloudWatch
    this.role.addToPrincipalPolicy(new iam.PolicyStatement({
      actions: ['cloudwatch:PutMetricData'],
      resources: ['*'],
    }));

    // Bind the code asset to this construct and let the application read it.
    const code = props.code.bind(this);
    code.bucket.grantRead(this);

    // VPC configuration is only emitted when a VPC was supplied; it stays
    // undefined otherwise.
    let vpcConfigurations;
    if (props.vpc) {
      const securityGroups = props.securityGroups ?? [
        new ec2.SecurityGroup(this, 'SecurityGroup', {
          vpc: props.vpc,
        }),
      ];
      this._connections = new ec2.Connections({ securityGroups });
      const subnetSelection = props.vpcSubnets ?? {
        subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
      };
      vpcConfigurations = [{
        securityGroupIds: securityGroups.map(sg => sg.securityGroupId),
        subnetIds: props.vpc.selectSubnets(subnetSelection).subnetIds,
      }];
    }

    const resource = new CfnApplicationV2(this, 'Resource', {
      applicationName: props.applicationName,
      runtimeEnvironment: props.runtime.value,
      serviceExecutionRole: this.role.roleArn,
      applicationConfiguration: {
        ...code.applicationCodeConfigurationProperty,
        environmentProperties: environmentProperties(props.propertyGroups),
        flinkApplicationConfiguration: flinkApplicationConfiguration({
          checkpointingEnabled: props.checkpointingEnabled,
          checkpointInterval: props.checkpointInterval,
          minPauseBetweenCheckpoints: props.minPauseBetweenCheckpoints,
          logLevel: props.logLevel,
          metricsLevel: props.metricsLevel,
          autoScalingEnabled: props.autoScalingEnabled,
          parallelism: props.parallelism,
          parallelismPerKpu: props.parallelismPerKpu,
        }),
        applicationSnapshotConfiguration: {
          snapshotsEnabled: props.snapshotsEnabled ?? true,
        },
        vpcConfigurations,
      },
    });
    // Make the application resource explicitly depend on the role.
    resource.node.addDependency(this.role);

    const logGroup = props.logGroup ?? new logs.LogGroup(this, 'LogGroup');
    const logStream = new logs.LogStream(this, 'LogStream', { logGroup });

    /* Permit logging */

    this.role.addToPrincipalPolicy(new iam.PolicyStatement({
      actions: ['logs:DescribeLogGroups'],
      resources: [
        core.Stack.of(this).formatArn({
          service: 'logs',
          resource: 'log-group',
          arnFormat: core.ArnFormat.COLON_RESOURCE_NAME,
          resourceName: '*',
        }),
      ],
    }));

    this.role.addToPrincipalPolicy(new iam.PolicyStatement({
      actions: ['logs:DescribeLogStreams'],
      resources: [logGroup.logGroupArn],
    }));

    // The same ARN is used for the PutLogEvents permission and for the
    // application's logging option below.
    const logStreamArn = `arn:${core.Aws.PARTITION}:logs:${core.Aws.REGION}:${core.Aws.ACCOUNT_ID}:log-group:${logGroup.logGroupName}:log-stream:${logStream.logStreamName}`;
    this.role.addToPrincipalPolicy(new iam.PolicyStatement({
      actions: ['logs:PutLogEvents'],
      resources: [logStreamArn],
    }));

    new CfnApplicationCloudWatchLoggingOptionV2(this, 'LoggingOption', {
      applicationName: resource.ref,
      cloudWatchLoggingOption: {
        logStreamArn,
      },
    });

    // Permissions required for VPC usage per:
    // https://docs.aws.amazon.com/kinesisanalytics/latest/java/vpc-permissions.html
    if (props.vpc) {
      this.role.addToPrincipalPolicy(new iam.PolicyStatement({
        actions: [
          'ec2:DescribeVpcs',
          'ec2:DescribeSubnets',
          'ec2:DescribeSecurityGroups',
          'ec2:DescribeDhcpOptions',
          'ec2:CreateNetworkInterface',
          'ec2:CreateNetworkInterfacePermission',
          'ec2:DescribeNetworkInterfaces',
          'ec2:DeleteNetworkInterface',
        ],
        resources: ['*'],
      }));
    }

    this.applicationName = this.getResourceNameAttribute(resource.ref);
    this.applicationArn = this.getResourceArnAttribute(
      core.Stack.of(this).formatArn(applicationArnComponents(resource.ref)),
      applicationArnComponents(this.physicalName),
    );

    // Applications are destroyed with the stack unless the caller overrides it.
    resource.applyRemovalPolicy(props.removalPolicy, {
      default: core.RemovalPolicy.DESTROY,
    });
  }
}

/**
 * Build the ARN components (`kinesisanalytics` service, `application`
 * resource) for an application with the given name.
 *
 * @param resourceName The application name to place in the ARN
 */
function applicationArnComponents(resourceName: string): core.ArnComponents {
  return {
    service: 'kinesisanalytics',
    resource: 'application',
    resourceName,
  };
}