in packages/@aws-cdk/aws-msk/lib/cluster.ts [401:625]
/**
 * Validates the cluster properties and synthesizes the underlying
 * `CfnCluster` resource together with its supporting constructs.
 *
 * Defaults applied when the corresponding property is omitted:
 * - security groups: a new security group in `props.vpc`
 * - EBS volume size: 1000 GiB
 * - instance type: M5 large (passed through `this.mskInstanceType`)
 * - client/broker encryption: TLS; in-cluster encryption: enabled
 * - removal policy: RETAIN
 *
 * @param scope the parent construct
 * @param id the construct id
 * @param props the cluster configuration
 * @throws Error if fewer than 2 subnets are selected, the cluster name is
 *   longer than 64 characters, both IAM and SASL/SCRAM authentication are
 *   enabled, client authentication is combined with an incompatible
 *   client/broker encryption mode, or the EBS volume size is outside 1-16384
 */
constructor(scope: constructs.Construct, id: string, props: ClusterProps) {
  super(scope, id, {
    physicalName: props.clusterName,
  });

  // ---- Validation (performed before any child construct is created) ----

  const subnetSelection = props.vpc.selectSubnets(props.vpcSubnets);
  if (subnetSelection.subnets.length < 2) {
    throw Error(
      `Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`,
    );
  }

  // The length limit is enforced on its own. Previously it was only enforced
  // when the name ALSO contained a non-alphanumeric character, so an
  // over-long alphanumeric name slipped through to deployment.
  // NOTE(review): the message also promises an alphanumeric-only rule, but that
  // rule was never enforced for names of valid length. It is intentionally not
  // tightened here, since doing so would reject names (e.g. hyphenated ones)
  // that synthesize today - confirm the service's naming rules first.
  if (
    !core.Token.isUnresolved(props.clusterName) &&
    props.clusterName.length > 64
  ) {
    throw Error(
      'The cluster name must only contain alphanumeric characters and have a maximum length of 64 characters.' +
        `got: '${props.clusterName}. length: ${props.clusterName.length}'`,
    );
  }

  const saslProps = props.clientAuthentication?.saslProps;
  const clientBroker = props.encryptionInTransit?.clientBroker;

  if (saslProps?.iam && saslProps?.scram) {
    throw Error('Only one client authentication method can be enabled.');
  }

  if (
    clientBroker === ClientBrokerEncryption.PLAINTEXT &&
    props.clientAuthentication
  ) {
    throw Error(
      'To enable client authentication, you must enabled TLS-encrypted traffic between clients and brokers.',
    );
  } else if (
    clientBroker === ClientBrokerEncryption.TLS_PLAINTEXT &&
    (saslProps?.scram || saslProps?.iam)
  ) {
    throw Error(
      'To enable SASL/SCRAM or IAM authentication, you must only allow TLS-encrypted traffic between clients and brokers.',
    );
  }

  const volumeSize = props.ebsStorageInfo?.volumeSize ?? 1000;
  // Minimum: 1 GiB, maximum: 16384 GiB.
  // An unresolved token is only a placeholder for a value supplied later, so
  // its numeric encoding must not be range-checked.
  if (
    !core.Token.isUnresolved(volumeSize) &&
    (volumeSize < 1 || volumeSize > 16384)
  ) {
    throw Error(
      'EBS volume size should be in the range 1-16384',
    );
  }

  // ---- Supporting constructs and derived configuration ----

  this._connections = new ec2.Connections({
    securityGroups: props.securityGroups ?? [
      new ec2.SecurityGroup(this, 'SecurityGroup', {
        description: 'MSK security group',
        vpc: props.vpc,
      }),
    ],
  });

  const instanceType = this.mskInstanceType(
    props.instanceType ??
      ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
  );

  const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
    ? {
      dataVolumeKmsKeyId: props.ebsStorageInfo.encryptionKey.keyId,
    }
    : undefined; // MSK will create the managed key

  const encryptionInTransit = {
    clientBroker: clientBroker ?? ClientBrokerEncryption.TLS,
    inCluster: props.encryptionInTransit?.enableInCluster ?? true,
  };

  // Open monitoring is only emitted when at least one exporter is requested.
  const jmxExporterEnabled = props.monitoring?.enablePrometheusJmxExporter;
  const nodeExporterEnabled = props.monitoring?.enablePrometheusNodeExporter;
  const openMonitoring =
    jmxExporterEnabled || nodeExporterEnabled
      ? {
        prometheus: {
          jmxExporter: jmxExporterEnabled
            ? { enabledInBroker: true }
            : undefined,
          nodeExporter: nodeExporterEnabled
            ? { enabledInBroker: true }
            : undefined,
        },
      }
      : undefined;

  // Every log destination is always rendered; `enabled` reflects whether the
  // corresponding target was supplied.
  const loggingInfo = {
    brokerLogs: {
      cloudWatchLogs: {
        enabled: props.logging?.cloudwatchLogGroup !== undefined,
        logGroup: props.logging?.cloudwatchLogGroup?.logGroupName,
      },
      firehose: {
        enabled: props.logging?.firehoseDeliveryStreamName !== undefined,
        deliveryStream: props.logging?.firehoseDeliveryStreamName,
      },
      s3: {
        enabled: props.logging?.s3?.bucket !== undefined,
        bucket: props.logging?.s3?.bucket.bucketName,
        prefix: props.logging?.s3?.prefix,
      },
    },
  };

  // For SASL/SCRAM without a caller-supplied key, create a dedicated KMS key
  // that principals in this account may use via Secrets Manager.
  if (saslProps?.scram && saslProps?.key === undefined) {
    this.saslScramAuthenticationKey = new kms.Key(this, 'SASLKey', {
      description:
        'Used for encrypting MSK secrets for SASL/SCRAM authentication.',
      alias: `msk/${props.clusterName}/sasl/scram`,
    });

    // https://docs.aws.amazon.com/kms/latest/developerguide/services-secrets-manager.html#asm-policies
    this.saslScramAuthenticationKey.addToResourcePolicy(
      new iam.PolicyStatement({
        sid:
          'Allow access through AWS Secrets Manager for all principals in the account that are authorized to use AWS Secrets Manager',
        principals: [new iam.AnyPrincipal()],
        actions: [
          'kms:Encrypt',
          'kms:Decrypt',
          'kms:ReEncrypt*',
          'kms:GenerateDataKey*',
          'kms:CreateGrant',
          'kms:DescribeKey',
        ],
        resources: ['*'],
        conditions: {
          StringEquals: {
            'kms:ViaService': `secretsmanager.${core.Stack.of(this).region}.amazonaws.com`,
            'kms:CallerAccount': core.Stack.of(this).account,
          },
        },
      }),
    );
  }

  // Exactly one authentication block is rendered, in priority order:
  // IAM, then SASL/SCRAM, then mutual TLS.
  const certificateAuthorities =
    props.clientAuthentication?.tlsProps?.certificateAuthorities;
  let clientAuthentication;
  if (saslProps?.iam) {
    clientAuthentication = {
      sasl: { iam: { enabled: saslProps.iam } },
    };
  } else if (saslProps?.scram) {
    clientAuthentication = {
      sasl: {
        scram: {
          enabled: saslProps.scram,
        },
      },
    };
  } else if (certificateAuthorities !== undefined) {
    clientAuthentication = {
      tls: {
        certificateAuthorityArnList: certificateAuthorities.map(
          (ca) => ca.certificateAuthorityArn,
        ),
      },
    };
  }

  // ---- The cluster resource itself ----

  // `numberOfBrokerNodes` is a per-availability-zone count; the resource
  // receives the total across all selected zones.
  const availabilityZoneCount = subnetSelection.availabilityZones.length;
  const resource = new CfnCluster(this, 'Resource', {
    clusterName: props.clusterName,
    kafkaVersion: props.kafkaVersion.version,
    numberOfBrokerNodes:
      props.numberOfBrokerNodes !== undefined
        ? availabilityZoneCount * props.numberOfBrokerNodes
        : availabilityZoneCount,
    brokerNodeGroupInfo: {
      instanceType,
      clientSubnets: subnetSelection.subnetIds,
      securityGroups: this.connections.securityGroups.map(
        (group) => group.securityGroupId,
      ),
      storageInfo: {
        ebsStorageInfo: {
          volumeSize: volumeSize,
        },
      },
    },
    encryptionInfo: {
      encryptionAtRest,
      encryptionInTransit,
    },
    configurationInfo: props.configurationInfo,
    enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
    openMonitoring: openMonitoring,
    loggingInfo: loggingInfo,
    clientAuthentication: clientAuthentication,
  });

  // The resource `ref` is used as the cluster ARN; the name is taken from the
  // second '/'-separated segment of that value.
  this.clusterName = this.getResourceNameAttribute(
    core.Fn.select(1, core.Fn.split('/', resource.ref)),
  );
  this.clusterArn = resource.ref;

  resource.applyRemovalPolicy(props.removalPolicy, {
    default: core.RemovalPolicy.RETAIN,
  });
}