constructor()

in packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts [405:542]


  /**
   * Validates the supplied props and creates the underlying `CfnEventSourceMapping` resource.
   *
   * All validation runs before any resource is created, so an invalid configuration
   * fails before the `CfnEventSourceMapping` child is added to the construct.
   *
   * @param scope the construct scope, forwarded to the base constructor
   * @param id the construct id, forwarded to the base constructor
   * @param props the event source mapping configuration
   * @throws ValidationError if any of the property checks in the body fails
   */
  constructor(scope: Construct, id: string, props: EventSourceMappingProps) {
    super(scope, id);
    // Enhanced CDK Analytics Telemetry
    addConstructMetadata(this, props);

    // Exactly one of eventSourceArn / kafkaBootstrapServers must be supplied.
    if (props.eventSourceArn == undefined && props.kafkaBootstrapServers == undefined) {
      throw new ValidationError('Either eventSourceArn or kafkaBootstrapServers must be set', this);
    }

    if (props.eventSourceArn !== undefined && props.kafkaBootstrapServers !== undefined) {
      throw new ValidationError('eventSourceArn and kafkaBootstrapServers are mutually exclusive', this);
    }

    if (props.provisionedPollerConfig) {
      const { minimumPollers, maximumPollers } = props.provisionedPollerConfig;
      // An unresolved token has no concrete value at this point, so range checks cannot be
      // applied to it. This mirrors the token guard used for maxConcurrency below.
      if (minimumPollers != undefined && !cdk.Token.isUnresolved(minimumPollers)) {
        if (minimumPollers < 1 || minimumPollers > 200) {
          throw new ValidationError('Minimum provisioned pollers must be between 1 and 200 inclusive', this);
        }
      }
      if (maximumPollers != undefined && !cdk.Token.isUnresolved(maximumPollers)) {
        if (maximumPollers < 1 || maximumPollers > 2000) {
          throw new ValidationError('Maximum provisioned pollers must be between 1 and 2000 inclusive', this);
        }
      }
      // The ordering check is only meaningful when both bounds are concrete numbers.
      if (
        minimumPollers != undefined && maximumPollers != undefined
        && !cdk.Token.isUnresolved(minimumPollers) && !cdk.Token.isUnresolved(maximumPollers)
      ) {
        if (minimumPollers > maximumPollers) {
          throw new ValidationError('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers', this);
        }
      }
    }

    if (props.kafkaBootstrapServers && props.kafkaBootstrapServers.length < 1) {
      throw new ValidationError('kafkaBootStrapServers must not be empty if set', this);
    }

    if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
      throw new ValidationError(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`, this);
    }

    if (props.maxConcurrency && !cdk.Token.isUnresolved(props.maxConcurrency) && (props.maxConcurrency < 2 || props.maxConcurrency > 1000)) {
      throw new ValidationError('maxConcurrency must be between 2 and 1000 concurrent instances', this);
    }

    if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)) {
      throw new ValidationError('maxRecordAge must be between 60 seconds and 7 days inclusive', this);
    }

    if (props.retryAttempts !== undefined) {
      cdk.withResolved(props.retryAttempts, (attempts) => {
        // Allow -1 for infinite retries, otherwise validate the 0-10000 range
        if (!(attempts === -1 || (attempts >= 0 && attempts <= 10000))) {
          throw new ValidationError(`retryAttempts must be -1 (for infinite) or between 0 and 10000 inclusive, got ${attempts}`, this);
        }
      });
    }

    if (props.parallelizationFactor !== undefined) {
      cdk.withResolved(props.parallelizationFactor, (factor) => {
        if (factor < 1 || factor > 10) {
          throw new ValidationError(`parallelizationFactor must be between 1 and 10 inclusive, got ${factor}`, this);
        }
      });
    }

    if (props.tumblingWindow && !cdk.Token.isUnresolved(props.tumblingWindow) && props.tumblingWindow.toSeconds() > 900) {
      throw new ValidationError(`tumblingWindow cannot be over 900 seconds, got ${props.tumblingWindow.toSeconds()}`, this);
    }

    // startingPositionTimestamp and StartingPosition.AT_TIMESTAMP must be used together.
    if (props.startingPosition === StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
      throw new ValidationError('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP', this);
    }

    if (props.startingPosition !== StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
      throw new ValidationError('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP', this);
    }

    if (props.kafkaConsumerGroupId) {
      this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId);
    }

    if (props.filterEncryption !== undefined && props.filters == undefined) {
      throw new ValidationError('filter criteria must be provided to enable setting filter criteria encryption', this);
    }

    /**
     * Grants the Lambda function permission to decrypt data using the specified KMS key.
     * This step is necessary for setting up encrypted filter criteria.
     *
     * If the KMS key was created within this CloudFormation stack (via `new Key`), a Key policy
     * will be attached to the key to allow the Lambda function to access it. However, if the
     * Key is imported from an existing ARN (`Key.fromKeyArn`), no action will be taken.
     */
    if (props.filterEncryption !== undefined) {
      const lambdaPrincipal = new iam.ServicePrincipal('lambda.amazonaws.com');
      props.filterEncryption.grantDecrypt(lambdaPrincipal);
    }

    // Optional sub-configurations; each stays undefined when its source prop is not set.
    const destinationConfig = props.onFailure
      ? { onFailure: props.onFailure.bind(this, props.target) }
      : undefined;

    const selfManagedEventSource = props.kafkaBootstrapServers
      ? { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } }
      : undefined;

    // The same consumer group config is routed to either the self-managed or the
    // Amazon-managed Kafka property below, depending on which source was specified.
    const consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;

    const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
      batchSize: props.batchSize,
      bisectBatchOnFunctionError: props.bisectBatchOnError,
      destinationConfig,
      enabled: props.enabled,
      eventSourceArn: props.eventSourceArn,
      functionName: props.target.functionName,
      startingPosition: props.startingPosition,
      startingPositionTimestamp: props.startingPositionTimestamp,
      functionResponseTypes: props.reportBatchItemFailures ? ['ReportBatchItemFailures'] : undefined,
      maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(),
      maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(),
      maximumRetryAttempts: props.retryAttempts,
      parallelizationFactor: props.parallelizationFactor,
      topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
      tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
      scalingConfig: props.maxConcurrency ? { maximumConcurrency: props.maxConcurrency } : undefined,
      sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => ({ type: o.type.type, uri: o.uri })),
      selfManagedEventSource,
      filterCriteria: props.filters ? { filters: props.filters } : undefined,
      kmsKeyArn: props.filterEncryption?.keyArn,
      selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
      amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
      provisionedPollerConfig: props.provisionedPollerConfig,
      metricsConfig: props.metricsConfig,
    });
    this.eventSourceMappingId = cfnEventSourceMapping.ref;
    this.eventSourceMappingArn = EventSourceMapping.formatArn(this, this.eventSourceMappingId);
  }