packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts (220 lines of code) (raw):

import { Construct } from 'constructs'; import { IEventSourceDlq } from './dlq'; import { IFunction } from './function-base'; import { CfnEventSourceMapping } from './lambda.generated'; import * as iam from '../../aws-iam'; import { IKey } from '../../aws-kms'; import * as cdk from '../../core'; import { ValidationError } from '../../core/lib/errors'; import { addConstructMetadata } from '../../core/lib/metadata-resource'; /** * The type of authentication protocol or the VPC components for your event source's SourceAccessConfiguration * @see https://docs.aws.amazon.com/lambda/latest/dg/API_SourceAccessConfiguration.html#SSS-Type-SourceAccessConfiguration-Type */ export class SourceAccessConfigurationType { /** * (MQ) The Secrets Manager secret that stores your broker credentials. */ public static readonly BASIC_AUTH = new SourceAccessConfigurationType('BASIC_AUTH'); /** * The subnets associated with your VPC. Lambda connects to these subnets to fetch data from your Self-Managed Apache Kafka cluster. */ public static readonly VPC_SUBNET = new SourceAccessConfigurationType('VPC_SUBNET'); /** * The VPC security group used to manage access to your Self-Managed Apache Kafka brokers. */ public static readonly VPC_SECURITY_GROUP = new SourceAccessConfigurationType('VPC_SECURITY_GROUP'); /** * The Secrets Manager ARN of your secret key used for SASL SCRAM-256 authentication of your Self-Managed Apache Kafka brokers. */ public static readonly SASL_SCRAM_256_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_256_AUTH'); /** * The Secrets Manager ARN of your secret key used for SASL SCRAM-512 authentication of your Self-Managed Apache Kafka brokers. */ public static readonly SASL_SCRAM_512_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_512_AUTH'); /** * The Secrets Manager ARN of your secret key containing the certificate chain (X.509 PEM), private key (PKCS#8 PEM), * and private key password (optional) used for mutual TLS authentication of your MSK/Apache Kafka brokers. */ public static readonly CLIENT_CERTIFICATE_TLS_AUTH = new SourceAccessConfigurationType('CLIENT_CERTIFICATE_TLS_AUTH'); /** * The Secrets Manager ARN of your secret key containing the root CA certificate (X.509 PEM) used for TLS encryption of your Apache Kafka brokers. */ public static readonly SERVER_ROOT_CA_CERTIFICATE = new SourceAccessConfigurationType('SERVER_ROOT_CA_CERTIFICATE'); /** * The name of the virtual host in your RabbitMQ broker. Lambda uses this RabbitMQ host as the event source. */ public static readonly VIRTUAL_HOST = new SourceAccessConfigurationType('VIRTUAL_HOST'); /** A custom source access configuration property */ public static of(name: string): SourceAccessConfigurationType { return new SourceAccessConfigurationType(name); } /** * The key to use in `SourceAccessConfigurationProperty.Type` property in CloudFormation * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html#cfn-lambda-eventsourcemapping-sourceaccessconfiguration-type */ public readonly type: string; private constructor(type: string) { this.type = type; } } /** * Specific settings like the authentication protocol or the VPC components to secure access to your event source. */ export interface SourceAccessConfiguration { /** * The type of authentication protocol or the VPC components for your event source. For example: "SASL_SCRAM_512_AUTH". */ readonly type: SourceAccessConfigurationType; /** * The value for your chosen configuration in type. * For example: "URI": "arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName". * The exact string depends on the type. * @see SourceAccessConfigurationType */ readonly uri: string; } /** * (Amazon MSK and self-managed Apache Kafka only) The provisioned mode configuration for the event source. */ export interface ProvisionedPollerConfig { /** * The minimum number of pollers that should be provisioned. * * @default - 1 */ readonly minimumPollers?: number; /** * The maximum number of pollers that can be provisioned. * * @default - 200 */ readonly maximumPollers?: number; } export interface EventSourceMappingOptions { /** * The Amazon Resource Name (ARN) of the event source. Any record added to * this stream can invoke the Lambda function. * * @default - not set if using a self managed Kafka cluster, throws an error otherwise */ readonly eventSourceArn?: string; /** * The largest number of records that AWS Lambda will retrieve from your event * source at the time of invoking your function. Your function receives an * event with all the retrieved records. * * Valid Range: Minimum value of 1. Maximum value of 10000. * * @default - Amazon Kinesis, Amazon DynamoDB, and Amazon MSK is 100 records. * The default for Amazon SQS is 10 messages. For standard SQS queues, the maximum is 10,000. For FIFO SQS queues, the maximum is 10. */ readonly batchSize?: number; /** * If the function returns an error, split the batch in two and retry. * * @default false */ readonly bisectBatchOnError?: boolean; /** * An Amazon S3, Amazon SQS queue or Amazon SNS topic destination for discarded records. * * @default discarded records are ignored */ readonly onFailure?: IEventSourceDlq; /** * Set to false to disable the event source upon creation. * * @default true */ readonly enabled?: boolean; /** * The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should * start reading. * * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType * * @default - no starting position */ readonly startingPosition?: StartingPosition; /** * The time from which to start reading, in Unix time seconds. * * @default - no timestamp */ readonly startingPositionTimestamp?: number; /** * Allow functions to return partially successful responses for a batch of records. * * @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting * * @default false */ readonly reportBatchItemFailures?: boolean; /** * The maximum amount of time to gather records before invoking the function. * Maximum of Duration.minutes(5) * * @default Duration.seconds(0) */ readonly maxBatchingWindow?: cdk.Duration; /** * The maximum concurrency setting limits the number of concurrent instances of the function that an Amazon SQS event source can invoke. * * @see https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency * * Valid Range: Minimum value of 2. Maximum value of 1000. * * @default - No specific limit. */ readonly maxConcurrency?: number; /** * The maximum age of a record that Lambda sends to a function for processing. * Valid Range: * * Minimum value of 60 seconds * * Maximum value of 7 days * * @default - infinite or until the record expires. */ readonly maxRecordAge?: cdk.Duration; /** * The maximum number of times to retry when the function returns an error. * Set to `undefined` if you want lambda to keep retrying infinitely or until * the record expires. * * Valid Range: * * Minimum value of 0 * * Maximum value of 10000 * * @default - infinite or until the record expires. */ readonly retryAttempts?: number; /** * The number of batches to process from each shard concurrently. * Valid Range: * * Minimum value of 1 * * Maximum value of 10 * * @default 1 */ readonly parallelizationFactor?: number; /** * The name of the Kafka topic. * * @default - no topic */ readonly kafkaTopic?: string; /** * The size of the tumbling windows to group records sent to DynamoDB or Kinesis * * @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows * * Valid Range: 0 - 15 minutes * * @default - None */ readonly tumblingWindow?: cdk.Duration; /** * A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster * that a Kafka client connects to initially to bootstrap itself. * They are in the format `abc.example.com:9096`. * * @default - none */ readonly kafkaBootstrapServers?: string[]; /** * The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a length between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id). * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html * * @default - none */ readonly kafkaConsumerGroupId?: string; /** * Specific settings like the authentication protocol or the VPC components to secure access to your event source. * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html * * @default - none */ readonly sourceAccessConfigurations?: SourceAccessConfiguration[]; /** * Add filter criteria to Event Source * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html * * @default - none */ readonly filters?: Array<{[key: string]: any}>; /** * Add Customer managed KMS key to encrypt Filter Criteria. * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html * By default, Lambda will encrypt Filter Criteria using AWS managed keys * @see https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#aws-managed-cmk * * @default - none */ readonly filterEncryption?: IKey; /** * Check if support S3 onfailure destination(OFD). Kinesis, DynamoDB, MSK and self managed kafka event support S3 OFD * @default false */ readonly supportS3OnFailureDestination?: boolean; /** * Configuration for provisioned pollers that read from the event source. * When specified, allows control over the minimum and maximum number of pollers * that can be provisioned to process events from the source. * @default - no provisioned pollers */ readonly provisionedPollerConfig?: ProvisionedPollerConfig; /** * Configuration for enhanced monitoring metrics collection * When specified, enables collection of additional metrics for the stream event source * * @default - Enhanced monitoring is disabled */ readonly metricsConfig?: MetricsConfig; } export enum MetricType { /** * Event Count metrics provide insights into the processing behavior of your event source mapping, * including the number of events successfully processed, filtered out, or dropped. * These metrics help you monitor the flow and status of events through your event source mapping. */ EVENT_COUNT = 'EventCount', } /** * Configuration for collecting metrics from the event source */ export interface MetricsConfig { /** * List of metric types to enable for this event source */ readonly metrics: MetricType[]; } /** * Properties for declaring a new event source mapping. */ export interface EventSourceMappingProps extends EventSourceMappingOptions { /** * The target AWS Lambda function. */ readonly target: IFunction; } /** * Represents an event source mapping for a lambda function. * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html */ export interface IEventSourceMapping extends cdk.IResource { /** * The identifier for this EventSourceMapping * @attribute */ readonly eventSourceMappingId: string; /** * The ARN of the event source mapping (i.e. arn:aws:lambda:region:account-id:event-source-mapping/event-source-mapping-id) */ readonly eventSourceMappingArn: string; } /** * Defines a Lambda EventSourceMapping resource. * * Usually, you won't need to define the mapping yourself. This will usually be done by * event sources. For example, to add an SQS event source to a function: * * ```ts * import * as sqs from 'aws-cdk-lib/aws-sqs'; * import * as eventsources from 'aws-cdk-lib/aws-lambda-event-sources'; * * declare const handler: lambda.Function; * declare const queue: sqs.Queue; * * handler.addEventSource(new eventsources.SqsEventSource(queue)); * ``` * * The `SqsEventSource` class will automatically create the mapping, and will also * modify the Lambda's execution role so it can consume messages from the queue. */ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapping { /** * Import an event source into this stack from its event source id. */ public static fromEventSourceMappingId(scope: Construct, id: string, eventSourceMappingId: string): IEventSourceMapping { const eventSourceMappingArn = EventSourceMapping.formatArn(scope, eventSourceMappingId, ); class Import extends cdk.Resource implements IEventSourceMapping { public readonly eventSourceMappingId = eventSourceMappingId; public readonly eventSourceMappingArn = eventSourceMappingArn; } return new Import(scope, id); } private static formatArn(scope: Construct, eventSourceMappingId: string): string { return cdk.Stack.of(scope).formatArn({ service: 'lambda', resource: 'event-source-mapping', resourceName: eventSourceMappingId, arnFormat: cdk.ArnFormat.COLON_RESOURCE_NAME, }); } public readonly eventSourceMappingId: string; public readonly eventSourceMappingArn: string; constructor(scope: Construct, id: string, props: EventSourceMappingProps) { super(scope, id); // Enhanced CDK Analytics Telemetry addConstructMetadata(this, props); if (props.eventSourceArn == undefined && props.kafkaBootstrapServers == undefined) { throw new ValidationError('Either eventSourceArn or kafkaBootstrapServers must be set', this); } if (props.eventSourceArn !== undefined && props.kafkaBootstrapServers !== undefined) { throw new ValidationError('eventSourceArn and kafkaBootstrapServers are mutually exclusive', this); } if (props.provisionedPollerConfig) { const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; if (minimumPollers != undefined) { if (minimumPollers < 1 || minimumPollers > 200) { throw new ValidationError('Minimum provisioned pollers must be between 1 and 200 inclusive', this); } } if (maximumPollers != undefined) { if (maximumPollers < 1 || maximumPollers > 2000) { throw new ValidationError('Maximum provisioned pollers must be between 1 and 2000 inclusive', this); } } if (minimumPollers != undefined && maximumPollers != undefined) { if (minimumPollers > maximumPollers) { throw new ValidationError('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers', this); } } } if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) { throw new ValidationError('kafkaBootStrapServers must not be empty if set', this); } if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) { throw new ValidationError(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`, this); } if (props.maxConcurrency && !cdk.Token.isUnresolved(props.maxConcurrency) && (props.maxConcurrency < 2 || props.maxConcurrency > 1000)) { throw new ValidationError('maxConcurrency must be between 2 and 1000 concurrent instances', this); } if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)) { throw new ValidationError('maxRecordAge must be between 60 seconds and 7 days inclusive', this); } props.retryAttempts !== undefined && cdk.withResolved(props.retryAttempts, (attempts) => { // Allow -1 for infinite retries, otherwise validate the 0-10000 range if (!(attempts === -1 || (attempts >= 0 && attempts <= 10000))) { throw new ValidationError(`retryAttempts must be -1 (for infinite) or between 0 and 10000 inclusive, got ${attempts}`, this); } }); props.parallelizationFactor !== undefined && cdk.withResolved(props.parallelizationFactor, (factor) => { if (factor < 1 || factor > 10) { throw new ValidationError(`parallelizationFactor must be between 1 and 10 inclusive, got ${factor}`, this); } }); if (props.tumblingWindow && !cdk.Token.isUnresolved(props.tumblingWindow) && props.tumblingWindow.toSeconds() > 900) { throw new ValidationError(`tumblingWindow cannot be over 900 seconds, got ${props.tumblingWindow.toSeconds()}`, this); } if (props.startingPosition === StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) { throw new ValidationError('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP', this); } if (props.startingPosition !== StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) { throw new ValidationError('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP', this); } if (props.kafkaConsumerGroupId) { this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId); } if (props.filterEncryption !== undefined && props.filters == undefined) { throw new ValidationError('filter criteria must be provided to enable setting filter criteria encryption', this); } /** * Grants the Lambda function permission to decrypt data using the specified KMS key. * This step is necessary for setting up encrypted filter criteria. * * If the KMS key was created within this CloudFormation stack (via `new Key`), a Key policy * will be attached to the key to allow the Lambda function to access it. However, if the * Key is imported from an existing ARN (`Key.fromKeyArn`), no action will be taken. */ if (props.filterEncryption !== undefined) { const lambdaPrincipal = new iam.ServicePrincipal('lambda.amazonaws.com'); props.filterEncryption.grantDecrypt(lambdaPrincipal); } let destinationConfig; if (props.onFailure) { destinationConfig = { onFailure: props.onFailure.bind(this, props.target), }; } let selfManagedEventSource; if (props.kafkaBootstrapServers) { selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } }; } let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined; const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, bisectBatchOnFunctionError: props.bisectBatchOnError, destinationConfig, enabled: props.enabled, eventSourceArn: props.eventSourceArn, functionName: props.target.functionName, startingPosition: props.startingPosition, startingPositionTimestamp: props.startingPositionTimestamp, functionResponseTypes: props.reportBatchItemFailures ? ['ReportBatchItemFailures'] : undefined, maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(), maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(), maximumRetryAttempts: props.retryAttempts, parallelizationFactor: props.parallelizationFactor, topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined, tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(), scalingConfig: props.maxConcurrency ? { maximumConcurrency: props.maxConcurrency } : undefined, sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}), selfManagedEventSource, filterCriteria: props.filters ? { filters: props.filters }: undefined, kmsKeyArn: props.filterEncryption?.keyArn, selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined, amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined, provisionedPollerConfig: props.provisionedPollerConfig, metricsConfig: props.metricsConfig, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; this.eventSourceMappingArn = EventSourceMapping.formatArn(this, this.eventSourceMappingId); } private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) { if (cdk.Token.isUnresolved(kafkaConsumerGroupId)) { return; } if (kafkaConsumerGroupId.length > 200 || kafkaConsumerGroupId.length < 1) { throw new ValidationError('kafkaConsumerGroupId must be a valid string between 1 and 200 characters', this); } const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/); const patternMatch = regex.exec(kafkaConsumerGroupId); if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) { throw new ValidationError('kafkaConsumerGroupId contains invalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"', this); } } } /** * The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should start * reading. */ export enum StartingPosition { /** * Start reading at the last untrimmed record in the shard in the system, * which is the oldest data record in the shard. */ TRIM_HORIZON = 'TRIM_HORIZON', /** * Start reading just after the most recent record in the shard, so that you * always read the most recent data in the shard */ LATEST = 'LATEST', /** * Start reading from a position defined by a time stamp. * Only supported for Amazon Kinesis streams, otherwise an error will occur. * If supplied, `startingPositionTimestamp` must also be set. */ AT_TIMESTAMP = 'AT_TIMESTAMP', }