packages/aws-cdk-lib/aws-kinesis/lib/stream.ts (323 lines of code) (raw):

import { Construct } from 'constructs'; import { KinesisMetrics } from './kinesis-fixed-canned-metrics'; import { CfnStream } from './kinesis.generated'; import { ResourcePolicy } from './resource-policy'; import * as cloudwatch from '../../aws-cloudwatch'; import * as iam from '../../aws-iam'; import * as kms from '../../aws-kms'; import { ArnFormat, Aws, CfnCondition, Duration, Fn, IResolvable, IResource, RemovalPolicy, Resource, ResourceProps, Stack, Token, ValidationError } from '../../core'; import { addConstructMetadata } from '../../core/lib/metadata-resource'; const READ_OPERATIONS = [ 'kinesis:DescribeStreamSummary', 'kinesis:GetRecords', 'kinesis:GetShardIterator', 'kinesis:ListShards', 'kinesis:SubscribeToShard', 'kinesis:DescribeStream', 'kinesis:ListStreams', 'kinesis:DescribeStreamConsumer', ]; const UNSUPPORTED_RESOURCE_POLICY_READ_OPERATIONS = new Set<string>([ 'kinesis:DescribeStreamConsumer', 'kinesis:ListStreams', 'kinesis:SubscribeToShard', ]); const WRITE_OPERATIONS = [ 'kinesis:ListShards', 'kinesis:PutRecord', 'kinesis:PutRecords', ]; /** * A Kinesis Stream */ export interface IStream extends IResource { /** * The ARN of the stream. * * @attribute */ readonly streamArn: string; /** * The name of the stream * * @attribute */ readonly streamName: string; /** * Optional KMS encryption key associated with this stream. */ readonly encryptionKey?: kms.IKey; /** * Adds a statement to the IAM resource policy associated with this stream. * * If this stream was created in this stack (`new Stream`), a resource policy * will be automatically created upon the first call to `addToResourcePolicy`. If * the stream is imported (`Stream.import`), then this is a no-op. */ addToResourcePolicy(statement: iam.PolicyStatement): iam.AddToResourcePolicyResult; /** * Grant read permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to ues the key to decrypt the * contents of the stream will also be granted. */ grantRead(grantee: iam.IGrantable): iam.Grant; /** * Grant write permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to ues the key to encrypt the * contents of the stream will also be granted. */ grantWrite(grantee: iam.IGrantable): iam.Grant; /** * Grants read/write permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to use the key for * encrypt/decrypt will also be granted. */ grantReadWrite(grantee: iam.IGrantable): iam.Grant; /** * Grant the indicated permissions on this stream to the provided IAM principal. */ grant(grantee: iam.IGrantable, ...actions: string[]): iam.Grant; /** * Return stream metric based from its metric name * * @param metricName name of the stream metric * @param props properties of the metric */ metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of bytes retrieved from the Kinesis stream, measured over the specified time period. Minimum, Maximum, * and Average statistics represent the bytes in a single GetRecords operation for the stream in the specified time * period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricGetRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The age of the last record in all GetRecords calls made against a Kinesis stream, measured over the specified time * period. Age is the difference between the current time and when the last record of the GetRecords call was written * to the stream. The Minimum and Maximum statistics can be used to track the progress of Kinesis consumer * applications. A value of zero indicates that the records being read are completely caught up with the stream. * * The metric defaults to maximum over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricGetRecordsIteratorAgeMilliseconds(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The time taken per GetRecords operation, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricGetRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of records retrieved from the shard, measured over the specified time period. Minimum, Maximum, and * Average statistics represent the records in a single GetRecords operation for the stream in the specified time * period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricGetRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of successful GetRecords operations per stream, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricGetRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of bytes successfully put to the Kinesis stream over the specified time period. This metric includes * bytes from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the bytes in a * single put operation for the stream in the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of records successfully put to the Kinesis stream over the specified time period. This metric includes * record counts from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the * records in a single put operation for the stream in the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of bytes put to the Kinesis stream using the PutRecord operation over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The time taken per PutRecord operation, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of successful PutRecord operations per Kinesis stream, measured over the specified time period. Average * reflects the percentage of successful writes to a stream. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of bytes put to the Kinesis stream using the PutRecords operation over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The time taken per PutRecords operation, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsLatency(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of PutRecords operations where at least one record succeeded, per Kinesis stream, measured over the * specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The total number of records sent in a PutRecords operation per Kinesis data stream, measured over the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsTotalRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of successful records in a PutRecords operation per Kinesis data stream, measured over the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsSuccessfulRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of records rejected due to internal failures in a PutRecords operation per Kinesis data stream, * measured over the specified time period. Occasional internal failures are to be expected and should be retried. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsFailedRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of records rejected due to throttling in a PutRecords operation per Kinesis data stream, measured over * the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordsThrottledRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of GetRecords calls throttled for the stream over the specified time period. The most commonly used * statistic for this metric is Average. * * When the Minimum statistic has a value of 1, all records were throttled for the stream during the specified time * period. * * When the Maximum statistic has a value of 0 (zero), no records were throttled for the stream during the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties * * @param props properties of the metric * */ metricReadProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric; /** * The number of records rejected due to throttling for the stream over the specified time period. This metric * includes throttling from PutRecord and PutRecords operations. * * When the Minimum statistic has a non-zero value, records were being throttled for the stream during the specified * time period. * * When the Maximum statistic has a value of 0 (zero), no records were being throttled for the stream during the * specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricWriteProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric; } /** * A reference to a stream. The easiest way to instantiate is to call * `stream.export()`. Then, the consumer can use `Stream.import(this, ref)` and * get a `Stream`. */ export interface StreamAttributes { /** * The ARN of the stream. */ readonly streamArn: string; /** * The KMS key securing the contents of the stream if encryption is enabled. * * @default - No encryption */ readonly encryptionKey?: kms.IKey; } /** * Represents a Kinesis Stream. */ abstract class StreamBase extends Resource implements IStream { /** * The ARN of the stream. */ public abstract readonly streamArn: string; /** * The name of the stream */ public abstract readonly streamName: string; /** * Optional KMS encryption key associated with this stream. */ public abstract readonly encryptionKey?: kms.IKey; /** * Indicates if a stream resource policy should automatically be created upon * the first call to `addToResourcePolicy`. * * Set by subclasses. */ protected abstract readonly autoCreatePolicy: boolean; private resourcePolicy?: ResourcePolicy; constructor(scope: Construct, id: string, props: ResourceProps = {}) { super(scope, id, props); this.node.addValidation({ validate: () => this.resourcePolicy?.document.validateForResourcePolicy() ?? [] }); } /** * Adds a statement to the IAM resource policy associated with this stream. * * If this stream was created in this stack (`new Stream`), a resource policy * will be automatically created upon the first call to `addToResourcePolicy`. If * the stream is imported (`Stream.import`), then this is a no-op. */ public addToResourcePolicy(statement: iam.PolicyStatement): iam.AddToResourcePolicyResult { if (!this.resourcePolicy && this.autoCreatePolicy) { this.resourcePolicy = new ResourcePolicy(this, 'Policy', { stream: this }); } if (this.resourcePolicy) { this.resourcePolicy.document.addStatements(statement); return { statementAdded: true, policyDependable: this.resourcePolicy }; } return { statementAdded: false }; } /** * Grant read permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to ues the key to decrypt the * contents of the stream will also be granted. */ public grantRead(grantee: iam.IGrantable) { const ret = this.grant(grantee, ...READ_OPERATIONS); if (this.encryptionKey) { this.encryptionKey.grantDecrypt(grantee); } return ret; } /** * Grant write permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to ues the key to encrypt the * contents of the stream will also be granted. */ public grantWrite(grantee: iam.IGrantable) { const ret = this.grant(grantee, ...WRITE_OPERATIONS); this.encryptionKey?.grantEncrypt(grantee); return ret; } /** * Grants read/write permissions for this stream and its contents to an IAM * principal (Role/Group/User). * * If an encryption key is used, permission to use the key for * encrypt/decrypt will also be granted. */ public grantReadWrite(grantee: iam.IGrantable) { const ret = this.grant(grantee, ...Array.from(new Set([...READ_OPERATIONS, ...WRITE_OPERATIONS]))); this.encryptionKey?.grantEncryptDecrypt(grantee); return ret; } /** * Grant the indicated permissions on this stream to the given IAM principal (Role/Group/User). */ public grant(grantee: iam.IGrantable, ...actions: string[]) { return iam.Grant.addToPrincipalOrResource({ grantee, actions, resourceArns: [this.streamArn], resource: { addToResourcePolicy: (statement) => { // filter out actions not supported by stream resource policy (defined in {@link READ_OPERATIONS} and {@link WRITE_OPERATIONS}) const filteredActions = statement.actions.filter(action => !UNSUPPORTED_RESOURCE_POLICY_READ_OPERATIONS.has(action)); if (filteredActions.length > 0) { const filteredActionsStatement = statement.copy({ actions: filteredActions, }); return this.addToResourcePolicy(filteredActionsStatement); } return { statementAdded: false }; }, node: this.node, stack: this.stack, env: this.env, applyRemovalPolicy: x => this.applyRemovalPolicy(x), }, }); } /** * Return stream metric based from its metric name * * @param metricName name of the stream metric * @param props properties of the metric */ public metric(metricName: string, props?: cloudwatch.MetricOptions) { return new cloudwatch.Metric({ namespace: 'AWS/Kinesis', metricName, dimensionsMap: { StreamName: this.streamName, }, ...props, }).attachTo(this); } /** * The number of bytes retrieved from the Kinesis stream, measured over the specified time period. Minimum, Maximum, * and Average statistics represent the bytes in a single GetRecords operation for the stream in the specified time * period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricGetRecordsBytes(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.getRecordsBytesAverage, props); } /** * The age of the last record in all GetRecords calls made against a Kinesis stream, measured over the specified time * period. Age is the difference between the current time and when the last record of the GetRecords call was written * to the stream. The Minimum and Maximum statistics can be used to track the progress of Kinesis consumer * applications. A value of zero indicates that the records being read are completely caught up with the stream. * * The metric defaults to maximum over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricGetRecordsIteratorAgeMilliseconds(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.getRecordsIteratorAgeMillisecondsMaximum, props); } /** * The number of successful GetRecords operations per stream, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricGetRecordsSuccess(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.getRecordsSuccessAverage, props); } /** * The number of records retrieved from the shard, measured over the specified time period. Minimum, Maximum, and * Average statistics represent the records in a single GetRecords operation for the stream in the specified time * period. * * average * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricGetRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.getRecordsRecordsAverage, props); } /** * The number of successful GetRecords operations per stream, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricGetRecordsLatency(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.getRecordsLatencyAverage, props); } /** * The number of bytes put to the Kinesis stream using the PutRecord operation over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordBytes(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordBytesAverage, props); } /** * The time taken per PutRecord operation, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ metricPutRecordLatency(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordLatencyAverage, props); } /** * The number of successful PutRecord operations per Kinesis stream, measured over the specified time period. Average * reflects the percentage of successful writes to a stream. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordSuccess(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordSuccessAverage, props); } /** * The number of bytes put to the Kinesis stream using the PutRecords operation over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsBytes(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsBytesAverage, props); } /** * The time taken per PutRecords operation, measured over the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsLatency(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsLatencyAverage, props); } /** * The number of PutRecords operations where at least one record succeeded, per Kinesis stream, measured over the * specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsSuccess(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsSuccessAverage, props); } /** * The total number of records sent in a PutRecords operation per Kinesis data stream, measured over the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsTotalRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsTotalRecordsAverage, props); } /** * The number of successful records in a PutRecords operation per Kinesis data stream, measured over the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsSuccessfulRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsSuccessfulRecordsAverage, props); } /** * The number of records rejected due to internal failures in a PutRecords operation per Kinesis data stream, * measured over the specified time period. Occasional internal failures are to be expected and should be retried. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsFailedRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsFailedRecordsAverage, props); } /** * The number of records rejected due to throttling in a PutRecords operation per Kinesis data stream, measured over * the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricPutRecordsThrottledRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.putRecordsThrottledRecordsAverage, props); } /** * The number of bytes successfully put to the Kinesis stream over the specified time period. This metric includes * bytes from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the bytes in a * single put operation for the stream in the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricIncomingBytes(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.incomingBytesAverage, props); } /** * The number of records successfully put to the Kinesis stream over the specified time period. This metric includes * record counts from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the * records in a single put operation for the stream in the specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricIncomingRecords(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.incomingRecordsAverage, props); } /** * The number of GetRecords calls throttled for the stream over the specified time period. The most commonly used * statistic for this metric is Average. * * When the Minimum statistic has a value of 1, all records were throttled for the stream during the specified time * period. * * When the Maximum statistic has a value of 0 (zero), no records were throttled for the stream during the specified * time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties * * @param props properties of the metric * */ public metricReadProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.readProvisionedThroughputExceededAverage, props); } /** * The number of records rejected due to throttling for the stream over the specified time period. This metric * includes throttling from PutRecord and PutRecords operations. * * When the Minimum statistic has a non-zero value, records were being throttled for the stream during the specified * time period. * * When the Maximum statistic has a value of 0 (zero), no records were being throttled for the stream during the * specified time period. * * The metric defaults to average over 5 minutes, it can be changed by passing `statistic` and `period` properties. * * @param props properties of the metric */ public metricWriteProvisionedThroughputExceeded(props?: cloudwatch.MetricOptions) { return this.metricFromCannedFunction(KinesisMetrics.writeProvisionedThroughputExceededAverage, props); } // create metrics based on generated KinesisMetrics static methods private metricFromCannedFunction( createCannedProps: (dimensions: { StreamName: string }) => cloudwatch.MetricProps, props?: cloudwatch.MetricOptions): cloudwatch.Metric { return new cloudwatch.Metric({ ...createCannedProps({ StreamName: this.streamName }), ...props, }).attachTo(this); } } /** * Properties for a Kinesis Stream */ export interface StreamProps { /** * Enforces a particular physical stream name. * @default <generated> */ readonly streamName?: string; /** * The number of hours for the data records that are stored in shards to remain accessible. * @default Duration.hours(24) */ readonly retentionPeriod?: Duration; /** * The number of shards for the stream. * * Can only be provided if streamMode is Provisioned. * * @default 1 */ readonly shardCount?: number; /** * The kind of server-side encryption to apply to this stream. * * If you choose KMS, you can specify a KMS key via `encryptionKey`. If * encryption key is not specified, a key will automatically be created. * * @default - StreamEncryption.KMS if encrypted Streams are supported in the region * or StreamEncryption.UNENCRYPTED otherwise. * StreamEncryption.KMS if an encryption key is supplied through the encryptionKey property */ readonly encryption?: StreamEncryption; /** * External KMS key to use for stream encryption. * * The 'encryption' property must be set to "Kms". * * @default - Kinesis Data Streams master key ('/alias/aws/kinesis'). * If encryption is set to StreamEncryption.KMS and this property is undefined, a new KMS key * will be created and associated with this stream. */ readonly encryptionKey?: kms.IKey; /** * The capacity mode of this stream. * * @default StreamMode.PROVISIONED */ readonly streamMode?: StreamMode; /** * Policy to apply when the stream is removed from the stack. * * @default RemovalPolicy.RETAIN */ readonly removalPolicy?: RemovalPolicy; } /** * A Kinesis stream. Can be encrypted with a KMS key. */ export class Stream extends StreamBase { /** * Import an existing Kinesis Stream provided an ARN * * @param scope The parent creating construct (usually `this`). * @param id The construct's name * @param streamArn Stream ARN (i.e. arn:aws:kinesis:<region>:<account-id>:stream/Foo) */ public static fromStreamArn(scope: Construct, id: string, streamArn: string): IStream { return Stream.fromStreamAttributes(scope, id, { streamArn }); } /** * Creates a Stream construct that represents an external stream. * * @param scope The parent creating construct (usually `this`). * @param id The construct's name. * @param attrs Stream import properties */ public static fromStreamAttributes(scope: Construct, id: string, attrs: StreamAttributes): IStream { class Import extends StreamBase { public readonly streamArn = attrs.streamArn; public readonly streamName = Stack.of(scope).splitArn(attrs.streamArn, ArnFormat.SLASH_RESOURCE_NAME).resourceName!; public readonly encryptionKey = attrs.encryptionKey; protected readonly autoCreatePolicy = false; } return new Import(scope, id, { environmentFromArn: attrs.streamArn, }); } public readonly streamArn: string; public readonly streamName: string; public readonly encryptionKey?: kms.IKey; private readonly stream: CfnStream; protected readonly autoCreatePolicy = true; constructor(scope: Construct, id: string, props: StreamProps = {}) { super(scope, id, { physicalName: props.streamName, }); // Enhanced CDK Analytics Telemetry addConstructMetadata(this, props); let shardCount = props.shardCount; const streamMode = props.streamMode; if (streamMode === StreamMode.ON_DEMAND && shardCount !== undefined) { throw new ValidationError(`streamMode must be set to ${StreamMode.PROVISIONED} (default) when specifying shardCount`, this); } if ((streamMode === StreamMode.PROVISIONED || streamMode === undefined) && shardCount === undefined) { shardCount = 1; } const retentionPeriodHours = props.retentionPeriod?.toHours() ?? 24; if (!Token.isUnresolved(retentionPeriodHours)) { if (retentionPeriodHours < 24 || retentionPeriodHours > 8760) { throw new ValidationError(`retentionPeriod must be between 24 and 8760 hours. Received ${retentionPeriodHours}`, this); } } const { streamEncryption, encryptionKey } = this.parseEncryption(props); this.stream = new CfnStream(this, 'Resource', { name: this.physicalName, retentionPeriodHours, shardCount, streamEncryption, ...(props.streamMode !== undefined ? { streamModeDetails: { streamMode: props.streamMode }, } : undefined), }); this.stream.applyRemovalPolicy(props.removalPolicy); this.streamArn = this.getResourceArnAttribute(this.stream.attrArn, { service: 'kinesis', resource: 'stream', resourceName: this.physicalName, }); this.streamName = this.getResourceNameAttribute(this.stream.ref); this.encryptionKey = encryptionKey; } /** * Set up key properties and return the Stream encryption property from the * user's configuration. */ private parseEncryption(props: StreamProps): { streamEncryption?: CfnStream.StreamEncryptionProperty | IResolvable; encryptionKey?: kms.IKey; } { // if encryption properties are not set, default to KMS in regions where KMS is available if (!props.encryption && !props.encryptionKey) { const conditionName = 'AwsCdkKinesisEncryptedStreamsUnsupportedRegions'; const existing = Stack.of(this).node.tryFindChild(conditionName); // create a single condition for the Stack if (!existing) { new CfnCondition(Stack.of(this), conditionName, { expression: Fn.conditionOr( Fn.conditionEquals(Aws.REGION, 'cn-north-1'), Fn.conditionEquals(Aws.REGION, 'cn-northwest-1'), ), }); } return { streamEncryption: Fn.conditionIf(conditionName, Aws.NO_VALUE, { EncryptionType: 'KMS', KeyId: 'alias/aws/kinesis' }), }; } // default based on whether encryption key is specified const encryptionType = props.encryption ?? (props.encryptionKey ? StreamEncryption.KMS : StreamEncryption.UNENCRYPTED); // if encryption key is set, encryption must be set to KMS. if (encryptionType !== StreamEncryption.KMS && props.encryptionKey) { throw new ValidationError(`encryptionKey is specified, so 'encryption' must be set to KMS (value: ${encryptionType})`, this); } if (encryptionType === StreamEncryption.UNENCRYPTED) { return {}; } if (encryptionType === StreamEncryption.MANAGED) { const encryption = { encryptionType: 'KMS', keyId: 'alias/aws/kinesis' }; return { streamEncryption: encryption }; } if (encryptionType === StreamEncryption.KMS) { const encryptionKey = props.encryptionKey || new kms.Key(this, 'Key', { description: `Created by ${this.node.path}`, }); const streamEncryption: CfnStream.StreamEncryptionProperty = { encryptionType: 'KMS', keyId: encryptionKey.keyArn, }; return { encryptionKey, streamEncryption }; } throw new ValidationError(`Unexpected 'encryptionType': ${encryptionType}`, this); } } /** * What kind of server-side encryption to apply to this stream */ export enum StreamEncryption { /** * Records in the stream are not encrypted. */ UNENCRYPTED = 'NONE', /** * Server-side encryption with a KMS key managed by the user. * If `encryptionKey` is specified, this key will be used, otherwise, one will be defined. */ KMS = 'KMS', /** * Server-side encryption with a master key managed by Amazon Kinesis */ MANAGED = 'MANAGED', } /** * Specifies the capacity mode to apply to this stream. */ export enum StreamMode { /** * Specify the provisioned capacity mode. The stream will have `shardCount` shards unless * modified and will be billed according to the provisioned capacity. */ PROVISIONED = 'PROVISIONED', /** * Specify the on-demand capacity mode. The stream will autoscale and be billed according to the * volume of data ingested and retrieved. */ ON_DEMAND = 'ON_DEMAND', }