constructor()

in packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts [463:702]


  constructor(scope: constructs.Construct, id: string, props: ClusterProps) {
    super(scope, id, { physicalName: props.clusterName });
    // Enhanced CDK Analytics Telemetry
    addConstructMetadata(this, props);

    const subnetSelection = props.vpc.selectSubnets(props.vpcSubnets);

    this._connections = new ec2.Connections({
      securityGroups: props.securityGroups ?? [
        new ec2.SecurityGroup(this, 'SecurityGroup', {
          description: 'MSK security group',
          vpc: props.vpc,
        }),
      ],
    });

    if (subnetSelection.subnets.length < 2) {
      throw new core.ValidationError(`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`, this);
    }

    if (props.encryptionInTransit?.clientBroker === ClientBrokerEncryption.PLAINTEXT && props.clientAuthentication) {
      throw new core.ValidationError('To enable client authentication, you must enabled TLS-encrypted traffic between clients and brokers.', this);
    } else if (
      props.encryptionInTransit?.clientBroker ===
        ClientBrokerEncryption.TLS_PLAINTEXT &&
      (props.clientAuthentication?.saslProps?.scram ||
        props.clientAuthentication?.saslProps?.iam)
    ) {
      throw new core.ValidationError('To enable SASL/SCRAM or IAM authentication, you must only allow TLS-encrypted traffic between clients and brokers.', this);
    }

    const volumeSize = props.ebsStorageInfo?.volumeSize ?? 1000;
    // Minimum: 1 GiB, maximum: 16384 GiB
    if (volumeSize < 1 || volumeSize > 16384) {
      throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
    }

    const instanceType = props.instanceType
      ? this.mskInstanceType(props.instanceType)
      : this.mskInstanceType(
        ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
      );

    if (props.storageMode && props.storageMode === StorageMode.TIERED) {
      if (!props.kafkaVersion.isTieredStorageCompatible()) {
        throw new core.ValidationError(`To deploy a tiered cluster you must select a compatible Kafka version, got ${props.kafkaVersion.version}`, this);
      }
      if (instanceType === this.mskInstanceType(
        ec2.InstanceType.of(ec2.InstanceClass.T3, ec2.InstanceSize.SMALL),
      )) {
        throw new core.ValidationError('Tiered storage doesn\'t support broker type t3.small', this);
      }
    }

    const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
      ? {
        dataVolumeKmsKeyId:
            props.ebsStorageInfo.encryptionKey.keyId,
      }
      : undefined; // MSK will create the managed key

    const encryptionInTransit = {
      clientBroker:
        props.encryptionInTransit?.clientBroker ??
        ClientBrokerEncryption.TLS,
      inCluster: props.encryptionInTransit?.enableInCluster ?? true,
    };

    const openMonitoring =
      props.monitoring?.enablePrometheusJmxExporter ||
      props.monitoring?.enablePrometheusNodeExporter
        ? {
          prometheus: {
            jmxExporter: props.monitoring?.enablePrometheusJmxExporter
              ? { enabledInBroker: true }
              : undefined,
            nodeExporter: props.monitoring
              ?.enablePrometheusNodeExporter
              ? { enabledInBroker: true }
              : undefined,
          },
        }
        : undefined;

    const loggingBucket = props.logging?.s3?.bucket;
    if (loggingBucket && FeatureFlags.of(this).isEnabled(S3_CREATE_DEFAULT_LOGGING_POLICY)) {
      const stack = core.Stack.of(this);
      loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        principals: [
          new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
        ],
        resources: [
          loggingBucket.arnForObjects(`AWSLogs/${stack.account}/*`),
        ],
        actions: ['s3:PutObject'],
        conditions: {
          StringEquals: {
            's3:x-amz-acl': 'bucket-owner-full-control',
            'aws:SourceAccount': stack.account,
          },
          ArnLike: {
            'aws:SourceArn': stack.formatArn({
              service: 'logs',
              resource: '*',
            }),
          },
        },
      }));

      loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        principals: [
          new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
        ],
        resources: [loggingBucket.bucketArn],
        actions: [
          's3:GetBucketAcl',
          's3:ListBucket',
        ],
        conditions: {
          StringEquals: {
            'aws:SourceAccount': stack.account,
          },
          ArnLike: {
            'aws:SourceArn': stack.formatArn({
              service: 'logs',
              resource: '*',
            }),
          },
        },
      }));
    }
    const loggingInfo = {
      brokerLogs: {
        cloudWatchLogs: {
          enabled:
            props.logging?.cloudwatchLogGroup !== undefined,
          logGroup:
            props.logging?.cloudwatchLogGroup?.logGroupName,
        },
        firehose: {
          enabled:
            props.logging?.firehoseDeliveryStreamName !==
            undefined,
          deliveryStream:
            props.logging?.firehoseDeliveryStreamName,
        },
        s3: {
          enabled: loggingBucket !== undefined,
          bucket: loggingBucket?.bucketName,
          prefix: props.logging?.s3?.prefix,
        },
      },
    };

    if (props.clientAuthentication?.saslProps?.scram && props.clientAuthentication?.saslProps?.key === undefined) {
      this.saslScramAuthenticationKey = new kms.Key(this, 'SASLKey', {
        description: 'Used for encrypting MSK secrets for SASL/SCRAM authentication.',
        alias: `msk/${props.clusterName}/sasl/scram`,
      });

      // https://docs.aws.amazon.com/kms/latest/developerguide/services-secrets-manager.html#asm-policies
      this.saslScramAuthenticationKey.addToResourcePolicy(
        new iam.PolicyStatement({
          sid:
            'Allow access through AWS Secrets Manager for all principals in the account that are authorized to use AWS Secrets Manager',
          principals: [new iam.AnyPrincipal()],
          actions: [
            'kms:Encrypt',
            'kms:Decrypt',
            'kms:ReEncrypt*',
            'kms:GenerateDataKey*',
            'kms:CreateGrant',
            'kms:DescribeKey',
          ],
          resources: ['*'],
          conditions: {
            StringEquals: {
              'kms:ViaService': `secretsmanager.${core.Stack.of(this).region}.amazonaws.com`,
              'kms:CallerAccount': core.Stack.of(this).account,
            },
          },
        }),
      );
    }

    let clientAuthentication: CfnCluster.ClientAuthenticationProperty | undefined;
    if (props.clientAuthentication) {
      const { saslProps, tlsProps } = props.clientAuthentication;
      clientAuthentication = {
        sasl: saslProps ? {
          iam: saslProps.iam ? { enabled: true }: undefined,
          scram: saslProps.scram ? { enabled: true }: undefined,
        } : undefined,
        tls: tlsProps?.certificateAuthorities ? {
          certificateAuthorityArnList: tlsProps.certificateAuthorities?.map((ca) => ca.certificateAuthorityArn),
        } : undefined,
      };
    }

    const resource = new CfnCluster(this, 'Resource', {
      clusterName: props.clusterName,
      kafkaVersion: props.kafkaVersion.version,
      numberOfBrokerNodes:
        props.numberOfBrokerNodes !== undefined ?
          subnetSelection.availabilityZones.length * props.numberOfBrokerNodes : subnetSelection.availabilityZones.length,
      brokerNodeGroupInfo: {
        instanceType,
        clientSubnets: subnetSelection.subnetIds,
        securityGroups: this.connections.securityGroups.map(
          (group) => group.securityGroupId,
        ),
        storageInfo: {
          ebsStorageInfo: {
            volumeSize: volumeSize,
          },
        },
      },
      encryptionInfo: {
        encryptionAtRest,
        encryptionInTransit,
      },
      configurationInfo: props.configurationInfo,
      enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
      openMonitoring: openMonitoring,
      storageMode: props.storageMode,
      loggingInfo: loggingInfo,
      clientAuthentication,
    });

    this.clusterName = this.getResourceNameAttribute(
      core.Fn.select(1, core.Fn.split('/', resource.ref)),
    );
    this.clusterArn = resource.ref;

    resource.applyRemovalPolicy(props.removalPolicy, {
      default: core.RemovalPolicy.RETAIN,
    });
  }