packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts (452 lines of code) (raw):
import * as acmpca from 'aws-cdk-lib/aws-acmpca';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import * as core from 'aws-cdk-lib/core';
import { FeatureFlags } from 'aws-cdk-lib/core';
import * as cr from 'aws-cdk-lib/custom-resources';
import { S3_CREATE_DEFAULT_LOGGING_POLICY } from 'aws-cdk-lib/cx-api';
import * as constructs from 'constructs';
import { addressOf } from 'constructs/lib/private/uniqueid';
import { KafkaVersion } from './';
import { CfnCluster } from 'aws-cdk-lib/aws-msk';
import { addConstructMetadata, MethodMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';
/**
* Represents an MSK Cluster.
*
* This is the contract shared by clusters defined in this app and clusters
* imported through `Cluster.fromClusterArn`.
*/
export interface ICluster extends core.IResource, ec2.IConnectable {
/**
* The ARN of the cluster.
*
* @attribute
*/
readonly clusterArn: string;
/**
* The physical name of the cluster.
*
* @attribute
*/
readonly clusterName: string;
}
/**
 * Common base class for MSK clusters, whether created by this app or imported.
 *
 * Subclasses provide the ARN and name; subclasses that own security groups
 * also populate `_connections`.
 */
export abstract class ClusterBase extends core.Resource implements ICluster {
  public abstract readonly clusterArn: string;
  public abstract readonly clusterName: string;

  /** @internal */
  protected _connections: ec2.Connections | undefined;

  /**
   * Manages connections for the cluster.
   *
   * Throws a `ValidationError` when no connections object has been set on this
   * instance (the case for imported clusters).
   */
  public get connections(): ec2.Connections {
    const managed = this._connections;
    if (managed) {
      return managed;
    }
    throw new core.ValidationError('An imported Cluster cannot manage its security groups', this);
  }
}
/**
* Properties for an MSK Cluster.
*/
export interface ClusterProps {
/**
* The physical name of the cluster.
*/
readonly clusterName: string;
/**
* The version of Apache Kafka.
*
* When `storageMode` is `StorageMode.TIERED`, the version must report
* `isTieredStorageCompatible()`; otherwise construction fails.
*/
readonly kafkaVersion: KafkaVersion;
/**
* Number of Apache Kafka brokers deployed in each Availability Zone.
*
* The total broker count of the cluster is this value multiplied by the
* number of Availability Zones of the selected subnets.
*
* @default 1
*/
readonly numberOfBrokerNodes?: number;
/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*
* Construction fails if the subnet selection (see `vpcSubnets`) yields fewer
* than 2 subnets.
*/
readonly vpc: ec2.IVpc;
/**
* Where to place the nodes within the VPC.
* Amazon MSK distributes the broker nodes evenly across the subnets that you specify.
* The subnets that you specify must be in distinct Availability Zones.
* Client subnets can't be in Availability Zone us-east-1e.
*
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;
/**
* The EC2 instance type that you want Amazon MSK to use when it creates your brokers.
*
* The instance type is passed to MSK with a `kafka.` prefix. `t3.small`
* cannot be combined with `StorageMode.TIERED`.
*
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#broker-instance-types
* @default kafka.m5.large
*/
readonly instanceType?: ec2.InstanceType;
/**
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
* connect to and communicate with the Amazon MSK cluster.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];
/**
* Information about storage volumes attached to MSK broker nodes.
*
* @default - 1000 GiB EBS volume
*/
readonly ebsStorageInfo?: EbsStorageInfo;
/**
* This controls storage mode for supported storage tiers.
*
* `StorageMode.TIERED` requires a tiered-storage compatible `kafkaVersion`
* and a broker type other than `t3.small`.
*
* @default - StorageMode.LOCAL
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html
*/
readonly storageMode?: StorageMode;
/**
* The Amazon MSK configuration to use for the cluster.
*
* @default - none
*/
readonly configurationInfo?: ClusterConfigurationInfo;
/**
* Cluster monitoring configuration.
*
* @default - DEFAULT monitoring level
*/
readonly monitoring?: MonitoringConfiguration;
/**
* Configure your MSK cluster to send broker logs to different destination types.
*
* @default - disabled
*/
readonly logging?: BrokerLogging;
/**
* Config details for encryption in transit.
*
* @default - enabled: TLS between clients and brokers, and encryption between brokers
*/
readonly encryptionInTransit?: EncryptionInTransitConfig;
/**
* Configuration properties for client authentication.
* MSK supports using private TLS certificates or SASL/SCRAM to authenticate the identity of clients.
*
* Client authentication cannot be combined with
* `ClientBrokerEncryption.PLAINTEXT`, and SASL/SCRAM or IAM authentication
* cannot be combined with `ClientBrokerEncryption.TLS_PLAINTEXT`.
*
* @default - disabled
*/
readonly clientAuthentication?: ClientAuthentication;
/**
* What to do when this resource is deleted from a stack.
*
* @default RemovalPolicy.RETAIN
*/
readonly removalPolicy?: core.RemovalPolicy;
}
/**
* EBS volume information.
*/
export interface EbsStorageInfo {
/**
* The size in GiB of the EBS volume for the data drive on each broker node.
*
* Must be in the range 1-16384; the cluster rejects values outside it.
*
* @default 1000
*/
readonly volumeSize?: number;
/**
* The AWS KMS key for encrypting data at rest.
*
* When omitted, no encryption-at-rest settings are passed to the cluster
* resource and the managed key is used.
*
* @default Uses AWS managed CMK (aws/kafka)
*/
readonly encryptionKey?: kms.IKey;
}
/**
* The storage mode for the cluster brokers.
*/
export enum StorageMode {
/**
* Local storage mode utilizes network attached EBS storage.
*/
LOCAL = 'LOCAL',
/**
* Tiered storage mode utilizes EBS storage and Tiered storage.
*
* Requires a Kafka version that is tiered-storage compatible and a broker
* type other than `t3.small`.
*/
TIERED = 'TIERED',
}
/**
* The Amazon MSK configuration to use for the cluster.
*
* The configuration is referenced by ARN and revision; this module does not
* create it.
* Note: There is currently no CloudFormation Resource to create a Configuration
* (NOTE(review): this statement may be outdated - verify against the current
* CloudFormation resource list).
*/
export interface ClusterConfigurationInfo {
/**
* The Amazon Resource Name (ARN) of the MSK configuration to use.
* For example, arn:aws:kafka:us-east-1:123456789012:configuration/example-configuration-name/abcdabcd-1234-abcd-1234-abcd123e8e8e-1.
*/
readonly arn: string;
/**
* The revision of the Amazon MSK configuration to use.
*/
readonly revision: number;
}
/**
* The level of monitoring for the MSK cluster.
*
* The selected value is passed through as the cluster's enhanced monitoring
* setting.
*
* @see https://docs.aws.amazon.com/msk/latest/developerguide/monitoring.html#metrics-details
*/
export enum ClusterMonitoringLevel {
/**
* Default metrics are the essential metrics to monitor.
*/
DEFAULT = 'DEFAULT',
/**
* Per Broker metrics give you metrics at the broker level.
*/
PER_BROKER = 'PER_BROKER',
/**
* Per Topic Per Broker metrics help you understand volume at the topic level.
*/
PER_TOPIC_PER_BROKER = 'PER_TOPIC_PER_BROKER',
/**
* Per Topic Per Partition metrics help you understand consumer group lag at the topic partition level.
*/
PER_TOPIC_PER_PARTITION = 'PER_TOPIC_PER_PARTITION',
}
/**
* Monitoring configuration for the cluster.
*
* Open monitoring (Prometheus) settings are only emitted when at least one of
* the two exporters is enabled.
*/
export interface MonitoringConfiguration {
/**
* Specifies the level of monitoring for the MSK cluster.
*
* @default DEFAULT
*/
readonly clusterMonitoringLevel?: ClusterMonitoringLevel;
/**
* Indicates whether you want to enable or disable the JMX Exporter.
*
* @default false
*/
readonly enablePrometheusJmxExporter?: boolean;
/**
* Indicates whether you want to enable or disable the Prometheus Node Exporter.
*
* You can use the Prometheus Node Exporter to get CPU and disk metrics for the broker nodes.
*
* @default false
*/
readonly enablePrometheusNodeExporter?: boolean;
}
/**
* Configuration details related to broker logs.
*
* Each destination is enabled if and only if its property is provided.
*/
export interface BrokerLogging {
/**
* The name of the Amazon Data Firehose delivery stream that is the destination for broker logs.
*
* @default - disabled
*/
readonly firehoseDeliveryStreamName?: string;
/**
* The CloudWatch Logs group that is the destination for broker logs.
*
* @default - disabled
*/
readonly cloudwatchLogGroup?: logs.ILogGroup;
/**
* Details of the Amazon S3 destination for broker logs.
*
* @default - disabled
*/
readonly s3?: S3LoggingConfiguration;
}
/**
* Details of the Amazon S3 destination for broker logs.
*/
export interface S3LoggingConfiguration {
/**
* The S3 bucket that is the destination for broker logs.
*
* When the `S3_CREATE_DEFAULT_LOGGING_POLICY` feature flag is enabled, the
* cluster adds statements to this bucket's resource policy that allow the
* `delivery.logs.amazonaws.com` service principal to write the logs.
*/
readonly bucket: s3.IBucket;
/**
* The S3 prefix that is the destination for broker logs.
*
* @default - no prefix
*/
readonly prefix?: string;
}
/**
* Indicates the encryption setting for data in transit between clients and brokers.
*/
export enum ClientBrokerEncryption {
/**
* TLS means that client-broker communication is enabled with TLS only.
*/
TLS = 'TLS',
/**
* TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
*
* Cannot be combined with SASL/SCRAM or IAM client authentication.
*/
TLS_PLAINTEXT = 'TLS_PLAINTEXT',
/**
* PLAINTEXT means that client-broker communication is enabled in plaintext only.
*
* Cannot be combined with any client authentication.
*/
PLAINTEXT = 'PLAINTEXT',
}
/**
* The settings for encrypting data in transit.
*
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html#msk-encryption-in-transit
*/
export interface EncryptionInTransitConfig {
/**
* Indicates the encryption setting for data in transit between clients and brokers.
*
* @default - ClientBrokerEncryption.TLS
*/
readonly clientBroker?: ClientBrokerEncryption;
/**
* Indicates whether data communication among the broker nodes of the cluster is encrypted.
*
* @default true
*/
readonly enableInCluster?: boolean;
}
/**
* SASL authentication properties.
*
* Enabling `scram` or `iam` requires client-broker encryption to be TLS only.
*/
export interface SaslAuthProps {
/**
* Enable SASL/SCRAM authentication.
*
* @default false
*/
readonly scram?: boolean;
/**
* Enable IAM access control.
*
* @default false
*/
readonly iam?: boolean;
/**
* KMS Key to encrypt SASL/SCRAM secrets.
*
* You must use a customer master key (CMK) when creating users in secrets manager.
* You cannot use a Secret with Amazon MSK that uses the default Secrets Manager encryption key.
*
* @default - when `scram` is enabled, a CMK will be created with alias msk/{clusterName}/sasl/scram
*/
readonly key?: kms.IKey;
}
/**
* TLS authentication properties.
*/
export interface TlsAuthProps {
/**
* List of ACM Certificate Authorities to enable TLS authentication.
*
* The ARN of each authority is passed to the cluster. When omitted, no TLS
* client-authentication settings are emitted.
*
* @default - none
*/
readonly certificateAuthorities?: acmpca.ICertificateAuthority[];
}
/**
* SASL + TLS authentication properties.
*
* Combines every property of `SaslAuthProps` and `TlsAuthProps`; it declares
* no members of its own.
*/
export interface SaslTlsAuthProps extends SaslAuthProps, TlsAuthProps { }
/**
 * Configuration properties for client authentication.
 *
 * Instances are created through the static factories `sasl`, `tls` and
 * `saslTls`; the constructor is private.
 */
export class ClientAuthentication {
  /**
   * SASL authentication
   */
  public static sasl(props: SaslAuthProps): ClientAuthentication {
    // Only the SASL side is populated; `tlsProps` stays undefined.
    return new ClientAuthentication(props);
  }

  /**
   * TLS authentication
   */
  public static tls(props: TlsAuthProps): ClientAuthentication {
    // Only the TLS side is populated; `saslProps` stays undefined.
    const noSasl = undefined;
    return new ClientAuthentication(noSasl, props);
  }

  /**
   * SASL + TLS authentication
   */
  public static saslTls(saslTlsProps: SaslTlsAuthProps): ClientAuthentication {
    // The same object serves as both the SASL and the TLS properties.
    const shared = saslTlsProps;
    return new ClientAuthentication(shared, shared);
  }

  /** properties for SASL authentication */
  public readonly saslProps?: SaslAuthProps;
  /** properties for TLS authentication */
  public readonly tlsProps?: TlsAuthProps;

  /**
   * @param saslProps - properties for SASL authentication
   * @param tlsProps - properties for TLS authentication
   */
  private constructor(saslProps?: SaslAuthProps, tlsProps?: TlsAuthProps) {
    this.saslProps = saslProps;
    this.tlsProps = tlsProps;
  }
}
/**
 * Create a MSK Cluster.
 *
 * @resource AWS::MSK::Cluster
 */
export class Cluster extends ClusterBase {
  /**
   * Reference an existing cluster, defined outside of the CDK code, by its ARN.
   *
   * The cluster name is taken from the second `/`-separated segment of the ARN.
   * The returned cluster has no connections object, so reading `connections`
   * on it throws.
   *
   * @param scope - construct scope of the imported cluster
   * @param id - construct id of the imported cluster
   * @param clusterArn - ARN of the existing cluster
   */
  public static fromClusterArn(scope: constructs.Construct, id: string, clusterArn: string): ICluster {
    class Import extends ClusterBase {
      public readonly clusterArn = clusterArn;
      public readonly clusterName = core.Fn.select(1, core.Fn.split('/', clusterArn)); // ['arn:partition:kafka:region:account-id', clusterName, clusterId]
    }
    return new Import(scope, id);
  }

  /** The ARN of the cluster (the `Ref` of the underlying `CfnCluster`). */
  public readonly clusterArn: string;
  /** The physical name of the cluster. */
  public readonly clusterName: string;
  /**
   * Key used to encrypt SASL/SCRAM users.
   *
   * Set only when SASL/SCRAM authentication is enabled: it is the key supplied
   * through `SaslAuthProps.key`, or the key created by this construct when none
   * was supplied.
   */
  public readonly saslScramAuthenticationKey?: kms.IKey;
  private _clusterDescription?: cr.AwsCustomResource;
  private _clusterBootstrapBrokers?: cr.AwsCustomResource;

  /**
   * Validates the props, creates the supporting resources (security group,
   * logging bucket policy statements, SASL/SCRAM key) as needed, and defines
   * the underlying `CfnCluster`.
   *
   * Throws a `ValidationError` when:
   * - the subnet selection yields fewer than 2 subnets;
   * - client authentication is combined with plaintext-only traffic, or
   *   SASL/SCRAM / IAM authentication with TLS_PLAINTEXT;
   * - a concrete EBS volume size is outside 1-16384 GiB;
   * - tiered storage is requested with an incompatible Kafka version or with
   *   the t3.small broker type.
   */
  constructor(scope: constructs.Construct, id: string, props: ClusterProps) {
    super(scope, id, { physicalName: props.clusterName });
    // Enhanced CDK Analytics Telemetry
    addConstructMetadata(this, props);

    const subnetSelection = props.vpc.selectSubnets(props.vpcSubnets);

    this._connections = new ec2.Connections({
      securityGroups: props.securityGroups ?? [
        new ec2.SecurityGroup(this, 'SecurityGroup', {
          description: 'MSK security group',
          vpc: props.vpc,
        }),
      ],
    });

    if (subnetSelection.subnets.length < 2) {
      throw new core.ValidationError(`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`, this);
    }

    const saslAuthProps = props.clientAuthentication?.saslProps;
    const clientBroker = props.encryptionInTransit?.clientBroker;

    if (clientBroker === ClientBrokerEncryption.PLAINTEXT && props.clientAuthentication) {
      throw new core.ValidationError('To enable client authentication, you must enabled TLS-encrypted traffic between clients and brokers.', this);
    } else if (
      clientBroker === ClientBrokerEncryption.TLS_PLAINTEXT &&
      (saslAuthProps?.scram || saslAuthProps?.iam)
    ) {
      throw new core.ValidationError('To enable SASL/SCRAM or IAM authentication, you must only allow TLS-encrypted traffic between clients and brokers.', this);
    }

    const volumeSize = props.ebsStorageInfo?.volumeSize ?? 1000;
    // Minimum: 1 GiB, maximum: 16384 GiB.
    // An unresolved token is encoded as an arbitrary number and only gets its
    // real value at deploy time, so the range can only be checked for concrete values.
    if (!core.Token.isUnresolved(volumeSize) && (volumeSize < 1 || volumeSize > 16384)) {
      throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
    }

    const instanceType = this.mskInstanceType(
      props.instanceType ?? ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
    );

    if (props.storageMode === StorageMode.TIERED) {
      if (!props.kafkaVersion.isTieredStorageCompatible()) {
        throw new core.ValidationError(`To deploy a tiered cluster you must select a compatible Kafka version, got ${props.kafkaVersion.version}`, this);
      }
      const unsupportedTieredType = this.mskInstanceType(
        ec2.InstanceType.of(ec2.InstanceClass.T3, ec2.InstanceSize.SMALL),
      );
      if (instanceType === unsupportedTieredType) {
        throw new core.ValidationError('Tiered storage doesn\'t support broker type t3.small', this);
      }
    }

    const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
      ? { dataVolumeKmsKeyId: props.ebsStorageInfo.encryptionKey.keyId }
      : undefined; // MSK will create the managed key

    const encryptionInTransit = {
      clientBroker: clientBroker ?? ClientBrokerEncryption.TLS,
      inCluster: props.encryptionInTransit?.enableInCluster ?? true,
    };

    // Open monitoring is only configured when at least one exporter is enabled.
    const openMonitoring =
      props.monitoring?.enablePrometheusJmxExporter ||
      props.monitoring?.enablePrometheusNodeExporter
        ? {
          prometheus: {
            jmxExporter: props.monitoring?.enablePrometheusJmxExporter
              ? { enabledInBroker: true }
              : undefined,
            nodeExporter: props.monitoring?.enablePrometheusNodeExporter
              ? { enabledInBroker: true }
              : undefined,
          },
        }
        : undefined;

    const loggingBucket = props.logging?.s3?.bucket;
    if (loggingBucket && FeatureFlags.of(this).isEnabled(S3_CREATE_DEFAULT_LOGGING_POLICY)) {
      // Allow the log delivery service to write broker logs into the bucket.
      const stack = core.Stack.of(this);
      loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        principals: [
          new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
        ],
        resources: [
          loggingBucket.arnForObjects(`AWSLogs/${stack.account}/*`),
        ],
        actions: ['s3:PutObject'],
        conditions: {
          StringEquals: {
            's3:x-amz-acl': 'bucket-owner-full-control',
            'aws:SourceAccount': stack.account,
          },
          ArnLike: {
            'aws:SourceArn': stack.formatArn({
              service: 'logs',
              resource: '*',
            }),
          },
        },
      }));

      loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        principals: [
          new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
        ],
        resources: [loggingBucket.bucketArn],
        actions: [
          's3:GetBucketAcl',
          's3:ListBucket',
        ],
        conditions: {
          StringEquals: {
            'aws:SourceAccount': stack.account,
          },
          ArnLike: {
            'aws:SourceArn': stack.formatArn({
              service: 'logs',
              resource: '*',
            }),
          },
        },
      }));
    }

    // Each destination is enabled exactly when its property was provided.
    const loggingInfo = {
      brokerLogs: {
        cloudWatchLogs: {
          enabled: props.logging?.cloudwatchLogGroup !== undefined,
          logGroup: props.logging?.cloudwatchLogGroup?.logGroupName,
        },
        firehose: {
          enabled: props.logging?.firehoseDeliveryStreamName !== undefined,
          deliveryStream: props.logging?.firehoseDeliveryStreamName,
        },
        s3: {
          enabled: loggingBucket !== undefined,
          bucket: loggingBucket?.bucketName,
          prefix: props.logging?.s3?.prefix,
        },
      },
    };

    if (saslAuthProps?.scram) {
      if (saslAuthProps.key !== undefined) {
        // Remember the caller-supplied key so that `addUser` can encrypt the
        // user secrets with it. Its key policy is left untouched.
        this.saslScramAuthenticationKey = saslAuthProps.key;
      } else {
        const generatedKey = new kms.Key(this, 'SASLKey', {
          description: 'Used for encrypting MSK secrets for SASL/SCRAM authentication.',
          alias: `msk/${props.clusterName}/sasl/scram`,
        });

        // https://docs.aws.amazon.com/kms/latest/developerguide/services-secrets-manager.html#asm-policies
        generatedKey.addToResourcePolicy(
          new iam.PolicyStatement({
            sid:
              'Allow access through AWS Secrets Manager for all principals in the account that are authorized to use AWS Secrets Manager',
            principals: [new iam.AnyPrincipal()],
            actions: [
              'kms:Encrypt',
              'kms:Decrypt',
              'kms:ReEncrypt*',
              'kms:GenerateDataKey*',
              'kms:CreateGrant',
              'kms:DescribeKey',
            ],
            resources: ['*'],
            conditions: {
              StringEquals: {
                'kms:ViaService': `secretsmanager.${core.Stack.of(this).region}.amazonaws.com`,
                'kms:CallerAccount': core.Stack.of(this).account,
              },
            },
          }),
        );
        this.saslScramAuthenticationKey = generatedKey;
      }
    }

    let clientAuthentication: CfnCluster.ClientAuthenticationProperty | undefined;
    if (props.clientAuthentication) {
      const { saslProps, tlsProps } = props.clientAuthentication;
      clientAuthentication = {
        sasl: saslProps ? {
          iam: saslProps.iam ? { enabled: true } : undefined,
          scram: saslProps.scram ? { enabled: true } : undefined,
        } : undefined,
        tls: tlsProps?.certificateAuthorities ? {
          certificateAuthorityArnList: tlsProps.certificateAuthorities?.map((ca) => ca.certificateAuthorityArn),
        } : undefined,
      };
    }

    const resource = new CfnCluster(this, 'Resource', {
      clusterName: props.clusterName,
      kafkaVersion: props.kafkaVersion.version,
      // `numberOfBrokerNodes` is per Availability Zone; the resource expects the total.
      numberOfBrokerNodes: subnetSelection.availabilityZones.length * (props.numberOfBrokerNodes ?? 1),
      brokerNodeGroupInfo: {
        instanceType,
        clientSubnets: subnetSelection.subnetIds,
        securityGroups: this.connections.securityGroups.map(
          (group) => group.securityGroupId,
        ),
        storageInfo: {
          ebsStorageInfo: {
            volumeSize: volumeSize,
          },
        },
      },
      encryptionInfo: {
        encryptionAtRest,
        encryptionInTransit,
      },
      configurationInfo: props.configurationInfo,
      enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
      openMonitoring: openMonitoring,
      storageMode: props.storageMode,
      loggingInfo: loggingInfo,
      clientAuthentication,
    });

    this.clusterName = this.getResourceNameAttribute(
      core.Fn.select(1, core.Fn.split('/', resource.ref)),
    );
    this.clusterArn = resource.ref;

    resource.applyRemovalPolicy(props.removalPolicy, {
      default: core.RemovalPolicy.RETAIN,
    });
  }

  /**
   * Format an EC2 instance type as an MSK broker type by prefixing it with `kafka.`.
   */
  private mskInstanceType(instanceType: ec2.InstanceType): string {
    return `kafka.${instanceType.toString()}`;
  }

  /**
   * Get the ZooKeeper Connection string
   *
   * Uses a Custom Resource to make an API call to `describeCluster` using the Javascript SDK.
   * The custom resource is created on first use and shared by later calls.
   *
   * @param responseField Field to return from API call. eg. ZookeeperConnectString, ZookeeperConnectStringTls
   * @returns - The connection string to use to connect to the Apache ZooKeeper cluster.
   */
  private _zookeeperConnectionString(responseField: string): string {
    if (!this._clusterDescription) {
      this._clusterDescription = new cr.AwsCustomResource(this, 'ZookeeperConnect', {
        onUpdate: {
          service: 'Kafka',
          action: 'describeCluster',
          parameters: {
            ClusterArn: this.clusterArn,
          },
          physicalResourceId: cr.PhysicalResourceId.of(
            'ZooKeeperConnectionString',
          ),
          // Limit the output of describeCluster that is otherwise too large
          outputPaths: [
            'ClusterInfo.ZookeeperConnectString',
            'ClusterInfo.ZookeeperConnectStringTls',
          ],
        },
        policy: cr.AwsCustomResourcePolicy.fromSdkCalls({
          resources: [this.clusterArn],
        }),
        installLatestAwsSdk: false,
      });
    }
    return this._clusterDescription.getResponseField(`ClusterInfo.${responseField}`);
  }

  /**
   * Get the ZooKeeper Connection string
   *
   * Uses a Custom Resource to make an API call to `describeCluster` using the Javascript SDK
   *
   * @returns - The connection string to use to connect to the Apache ZooKeeper cluster.
   */
  public get zookeeperConnectionString(): string {
    return this._zookeeperConnectionString('ZookeeperConnectString');
  }

  /**
   * Get the ZooKeeper Connection string for a TLS enabled cluster
   *
   * Uses a Custom Resource to make an API call to `describeCluster` using the Javascript SDK
   *
   * @returns - The connection string to use to connect to zookeeper cluster on TLS port.
   */
  public get zookeeperConnectionStringTls(): string {
    return this._zookeeperConnectionString('ZookeeperConnectStringTls');
  }

  /**
   * Get the list of brokers that a client application can use to bootstrap
   *
   * Uses a Custom Resource to make an API call to `getBootstrapBrokers` using the Javascript SDK.
   * The custom resource is created on first use and shared by later calls, so
   * its construct id carries the response field of the first call only.
   *
   * @param responseField Field to return from API call. eg. BootstrapBrokerStringSaslScram, BootstrapBrokerString
   * @returns - A string containing one or more hostname:port pairs.
   */
  private _bootstrapBrokers(responseField: string): string {
    if (!this._clusterBootstrapBrokers) {
      this._clusterBootstrapBrokers = new cr.AwsCustomResource(this, `BootstrapBrokers${responseField}`, {
        onUpdate: {
          service: 'Kafka',
          action: 'getBootstrapBrokers',
          parameters: {
            ClusterArn: this.clusterArn,
          },
          physicalResourceId: cr.PhysicalResourceId.of('BootstrapBrokers'),
        },
        policy: cr.AwsCustomResourcePolicy.fromSdkCalls({
          resources: [this.clusterArn],
        }),
        // APIs are available in 2.1055.0
        installLatestAwsSdk: false,
      });
    }
    return this._clusterBootstrapBrokers.getResponseField(responseField);
  }

  /**
   * Get the list of brokers that a client application can use to bootstrap
   *
   * Uses a Custom Resource to make an API call to `getBootstrapBrokers` using the Javascript SDK
   *
   * @returns - A string containing one or more hostname:port pairs.
   */
  public get bootstrapBrokers(): string {
    return this._bootstrapBrokers('BootstrapBrokerString');
  }

  /**
   * Get the list of brokers that a TLS authenticated client application can use to bootstrap
   *
   * Uses a Custom Resource to make an API call to `getBootstrapBrokers` using the Javascript SDK
   *
   * @returns - A string containing one or more DNS names (or IP) and TLS port pairs.
   */
  public get bootstrapBrokersTls(): string {
    return this._bootstrapBrokers('BootstrapBrokerStringTls');
  }

  /**
   * Get the list of brokers that a SASL/SCRAM authenticated client application can use to bootstrap
   *
   * Uses a Custom Resource to make an API call to `getBootstrapBrokers` using the Javascript SDK
   *
   * @returns - A string containing one or more DNS names (or IP) and SASL SCRAM port pairs.
   */
  public get bootstrapBrokersSaslScram(): string {
    return this._bootstrapBrokers('BootstrapBrokerStringSaslScram');
  }

  /**
   * Get the list of brokers that a SASL/IAM authenticated client application can use to bootstrap
   *
   * Uses a Custom Resource to make an API call to `getBootstrapBrokers` using the Javascript SDK
   *
   * @returns - A string containing one or more DNS names (or IP) and TLS port pairs.
   */
  public get bootstrapBrokersSaslIam(): string {
    return this._bootstrapBrokers('BootstrapBrokerStringSaslIam');
  }

  /**
   * A list of usernames to register with the cluster. The password will automatically be generated using Secrets
   * Manager and the { username, password } JSON object stored in Secrets Manager as
   * `AmazonMSK_{clusterName}_{username}`.
   *
   * Must be using the SASL/SCRAM authentication mechanism; otherwise no
   * SASL/SCRAM key is available and a `ValidationError` is thrown.
   *
   * @param usernames - username(s) to register with the cluster
   */
  @MethodMetadata()
  public addUser(...usernames: string[]): void {
    const scramKey = this.saslScramAuthenticationKey;
    if (!scramKey) {
      throw new core.ValidationError('Cannot create users if an authentication KMS key has not been created/provided.', this);
    }

    const MSK_SECRET_PREFIX = 'AmazonMSK_'; // Required
    // One secret per user, encrypted with the SASL/SCRAM key.
    const secrets = usernames.map(
      (username) =>
        new secretsmanager.Secret(this, `KafkaUser${username}`, {
          secretName: `${MSK_SECRET_PREFIX}${this.clusterName}_${username}`,
          generateSecretString: {
            secretStringTemplate: JSON.stringify({ username }),
            generateStringKey: 'password',
          },
          encryptionKey: scramKey,
        }),
    );

    // Associate all secrets of this call with the cluster in a single API call.
    new cr.AwsCustomResource(this, `BatchAssociateScramSecrets${addressOf(usernames)}`, {
      onUpdate: {
        service: 'Kafka',
        action: 'batchAssociateScramSecret',
        parameters: {
          ClusterArn: this.clusterArn,
          SecretArnList: secrets.map((secret) => secret.secretArn),
        },
        physicalResourceId: cr.PhysicalResourceId.of('CreateUsers'),
      },
      policy: cr.AwsCustomResourcePolicy.fromStatements([
        new iam.PolicyStatement({
          actions: ['kms:CreateGrant'],
          resources: [scramKey.keyArn],
        }),
        new iam.PolicyStatement({
          actions: ['kafka:BatchAssociateScramSecret'],
          resources: [this.clusterArn],
        }),
      ]),
      installLatestAwsSdk: false,
    });
  }
}