constructor()

in packages/@aws-cdk/aws-msk/lib/cluster.ts [401:625]


  /**
   * Validates the supplied props and defines the underlying `CfnCluster`.
   *
   * @param scope the parent construct
   * @param id the construct id of this cluster within `scope`
   * @param props cluster configuration
   *
   * @throws Error when fewer than 2 subnets are selected
   * @throws Error when a resolved `clusterName` is longer than 64 characters
   * @throws Error when both SASL/IAM and SASL/SCRAM authentication are enabled
   * @throws Error when client authentication is combined with a client-broker
   *   encryption mode that allows plaintext traffic (see the checks below)
   * @throws Error when the EBS volume size is outside the range 1-16384
   */
  constructor(scope: constructs.Construct, id: string, props: ClusterProps) {
    super(scope, id, {
      physicalName: props.clusterName,
    });

    const subnetSelection = props.vpc.selectSubnets(props.vpcSubnets);

    // Use the caller's security groups when given, otherwise create a
    // dedicated one in the cluster's VPC.
    this._connections = new ec2.Connections({
      securityGroups: props.securityGroups ?? [
        new ec2.SecurityGroup(this, 'SecurityGroup', {
          description: 'MSK security group',
          vpc: props.vpc,
        }),
      ],
    });

    if (subnetSelection.subnets.length < 2) {
      throw Error(
        `Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`,
      );
    }

    // The length limit is enforced on its own so that an over-long name is
    // rejected at synthesis time regardless of which characters it contains.
    // Unresolved tokens are skipped because their final value is not known yet.
    // NOTE(review): the character set is deliberately not validated here.
    // Names containing e.g. hyphens have always been accepted by this
    // constructor, so adding a pattern check could break existing stacks;
    // confirm the characters the service allows before adding one.
    if (
      !core.Token.isUnresolved(props.clusterName) &&
      props.clusterName.length > 64
    ) {
      throw Error(
        'The cluster name must have a maximum length of 64 characters. ' +
          `got: '${props.clusterName}'. length: ${props.clusterName.length}`,
      );
    }

    // SASL/IAM and SASL/SCRAM are mutually exclusive.
    if (
      props.clientAuthentication?.saslProps?.iam &&
      props.clientAuthentication?.saslProps?.scram
    ) {
      throw Error('Only one client authentication method can be enabled.');
    }

    // Any client authentication is rejected with PLAINTEXT client-broker
    // traffic; SCRAM and IAM are additionally rejected with TLS_PLAINTEXT.
    if (
      props.encryptionInTransit?.clientBroker ===
        ClientBrokerEncryption.PLAINTEXT &&
      props.clientAuthentication
    ) {
      throw Error(
        'To enable client authentication, you must enabled TLS-encrypted traffic between clients and brokers.',
      );
    } else if (
      props.encryptionInTransit?.clientBroker ===
        ClientBrokerEncryption.TLS_PLAINTEXT &&
      (props.clientAuthentication?.saslProps?.scram ||
        props.clientAuthentication?.saslProps?.iam)
    ) {
      throw Error(
        'To enable SASL/SCRAM or IAM authentication, you must only allow TLS-encrypted traffic between clients and brokers.',
      );
    }

    // Defaults to 1000 when no volume size is given.
    const volumeSize =
      props.ebsStorageInfo?.volumeSize ?? 1000;
    // Minimum: 1 GiB, maximum: 16384 GiB
    if (volumeSize < 1 || volumeSize > 16384) {
      throw Error(
        'EBS volume size should be in the range 1-16384',
      );
    }

    // Defaults to M5.LARGE; either way the EC2 instance type is passed through
    // mskInstanceType() before being handed to the CfnCluster.
    const instanceType = props.instanceType
      ? this.mskInstanceType(props.instanceType)
      : this.mskInstanceType(
        ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
      );

    const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
      ? {
        dataVolumeKmsKeyId:
            props.ebsStorageInfo.encryptionKey.keyId,
      }
      : undefined; // MSK will create the managed key

    // Defaults: TLS between clients and brokers, in-cluster encryption on.
    const encryptionInTransit = {
      clientBroker:
        props.encryptionInTransit?.clientBroker ??
        ClientBrokerEncryption.TLS,
      inCluster: props.encryptionInTransit?.enableInCluster ?? true,
    };

    // Open monitoring is only configured when at least one Prometheus
    // exporter is requested; each exporter entry is omitted unless enabled.
    const openMonitoring =
      props.monitoring?.enablePrometheusJmxExporter ||
      props.monitoring?.enablePrometheusNodeExporter
        ? {
          prometheus: {
            jmxExporter: props.monitoring?.enablePrometheusJmxExporter
              ? { enabledInBroker: true }
              : undefined,
            nodeExporter: props.monitoring
              ?.enablePrometheusNodeExporter
              ? { enabledInBroker: true }
              : undefined,
          },
        }
        : undefined;

    // Each broker log destination is enabled exactly when its target
    // (log group, delivery stream name, bucket) was supplied.
    const loggingInfo = {
      brokerLogs: {
        cloudWatchLogs: {
          enabled:
            props.logging?.cloudwatchLogGroup !== undefined,
          logGroup:
            props.logging?.cloudwatchLogGroup?.logGroupName,
        },
        firehose: {
          enabled:
            props.logging?.firehoseDeliveryStreamName !==
            undefined,
          deliveryStream:
            props.logging?.firehoseDeliveryStreamName,
        },
        s3: {
          enabled: props.logging?.s3?.bucket !== undefined,
          bucket: props.logging?.s3?.bucket.bucketName,
          prefix: props.logging?.s3?.prefix,
        },
      },
    };

    // When SASL/SCRAM is enabled and the caller did not supply a key, create
    // a KMS key for it and grant use of the key through Secrets Manager.
    if (
      props.clientAuthentication?.saslProps?.scram &&
      props.clientAuthentication?.saslProps?.key === undefined
    ) {
      this.saslScramAuthenticationKey = new kms.Key(this, 'SASLKey', {
        description:
          'Used for encrypting MSK secrets for SASL/SCRAM authentication.',
        alias: `msk/${props.clusterName}/sasl/scram`,
      });

      // https://docs.aws.amazon.com/kms/latest/developerguide/services-secrets-manager.html#asm-policies
      this.saslScramAuthenticationKey.addToResourcePolicy(
        new iam.PolicyStatement({
          sid:
            'Allow access through AWS Secrets Manager for all principals in the account that are authorized to use AWS Secrets Manager',
          principals: [new iam.AnyPrincipal()],
          actions: [
            'kms:Encrypt',
            'kms:Decrypt',
            'kms:ReEncrypt*',
            'kms:GenerateDataKey*',
            'kms:CreateGrant',
            'kms:DescribeKey',
          ],
          resources: ['*'],
          // AnyPrincipal is narrowed by these conditions: calls must come via
          // Secrets Manager in this stack's region, from this stack's account.
          conditions: {
            StringEquals: {
              'kms:ViaService': `secretsmanager.${core.Stack.of(this).region}.amazonaws.com`,
              'kms:CallerAccount': core.Stack.of(this).account,
            },
          },
        }),
      );
    }

    // Exactly one authentication block is emitted, checked in the order
    // IAM, SCRAM, TLS; it stays undefined when none is configured.
    let clientAuthentication;
    if (props.clientAuthentication?.saslProps?.iam) {
      clientAuthentication = {
        sasl: { iam: { enabled: props.clientAuthentication.saslProps.iam } },
      };
    } else if (props.clientAuthentication?.saslProps?.scram) {
      clientAuthentication = {
        sasl: {
          scram: {
            enabled: props.clientAuthentication.saslProps.scram,
          },
        },
      };
    } else if (
      props.clientAuthentication?.tlsProps?.certificateAuthorities !== undefined
    ) {
      clientAuthentication = {
        tls: {
          certificateAuthorityArnList: props.clientAuthentication?.tlsProps?.certificateAuthorities.map(
            (ca) => ca.certificateAuthorityArn,
          ),
        },
      };
    }

    const resource = new CfnCluster(this, 'Resource', {
      clusterName: props.clusterName,
      kafkaVersion: props.kafkaVersion.version,
      // props.numberOfBrokerNodes is multiplied by the number of selected
      // availability zones; when omitted, the AZ count itself is used.
      numberOfBrokerNodes:
        props.numberOfBrokerNodes !== undefined ?
          subnetSelection.availabilityZones.length * props.numberOfBrokerNodes : subnetSelection.availabilityZones.length,
      brokerNodeGroupInfo: {
        instanceType,
        clientSubnets: subnetSelection.subnetIds,
        securityGroups: this.connections.securityGroups.map(
          (group) => group.securityGroupId,
        ),
        storageInfo: {
          ebsStorageInfo: {
            volumeSize: volumeSize,
          },
        },
      },
      encryptionInfo: {
        encryptionAtRest,
        encryptionInTransit,
      },
      configurationInfo: props.configurationInfo,
      enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
      openMonitoring: openMonitoring,
      loggingInfo: loggingInfo,
      clientAuthentication: clientAuthentication,
    });

    // The resource's ref is used as the cluster ARN; the cluster name is
    // taken from the second '/'-separated segment of that ref.
    this.clusterName = this.getResourceNameAttribute(
      core.Fn.select(1, core.Fn.split('/', resource.ref)),
    );
    this.clusterArn = resource.ref;

    // The cluster is retained unless the caller sets a removal policy.
    resource.applyRemovalPolicy(props.removalPolicy, {
      default: core.RemovalPolicy.RETAIN,
    });
  }