in packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts [463:702]
/**
 * Validates the cluster props and synthesizes the underlying `CfnCluster`.
 *
 * Defaults applied here when the corresponding prop is omitted:
 * - a new security group (description 'MSK security group') in `props.vpc`
 * - EBS volume size of 1000
 * - broker instance type M5 LARGE
 * - client/broker encryption `ClientBrokerEncryption.TLS`, in-cluster encryption enabled
 * - removal policy `RETAIN`
 *
 * @param scope the construct scope this cluster is created in
 * @param id the construct id
 * @param props the cluster configuration
 * @throws core.ValidationError when fewer than 2 subnets are selected, when
 *   client authentication is combined with an incompatible client/broker
 *   encryption mode, when a concrete EBS volume size is outside 1-16384, or
 *   when tiered storage is requested with an incompatible Kafka version or
 *   the t3.small broker type.
 */
constructor(scope: constructs.Construct, id: string, props: ClusterProps) {
  super(scope, id, { physicalName: props.clusterName });
  // Enhanced CDK Analytics Telemetry
  addConstructMetadata(this, props);

  const subnetSelection = props.vpc.selectSubnets(props.vpcSubnets);

  // Use the caller's security groups, or create a single dedicated one.
  this._connections = new ec2.Connections({
    securityGroups: props.securityGroups ?? [
      new ec2.SecurityGroup(this, 'SecurityGroup', {
        description: 'MSK security group',
        vpc: props.vpc,
      }),
    ],
  });

  if (subnetSelection.subnets.length < 2) {
    throw new core.ValidationError(`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`, this);
  }

  // Client authentication is rejected with PLAINTEXT; SASL/SCRAM and IAM are
  // additionally rejected with the mixed TLS_PLAINTEXT mode.
  if (props.encryptionInTransit?.clientBroker === ClientBrokerEncryption.PLAINTEXT && props.clientAuthentication) {
    throw new core.ValidationError('To enable client authentication, you must enabled TLS-encrypted traffic between clients and brokers.', this);
  } else if (
    props.encryptionInTransit?.clientBroker ===
      ClientBrokerEncryption.TLS_PLAINTEXT &&
    (props.clientAuthentication?.saslProps?.scram ||
      props.clientAuthentication?.saslProps?.iam)
  ) {
    throw new core.ValidationError('To enable SASL/SCRAM or IAM authentication, you must only allow TLS-encrypted traffic between clients and brokers.', this);
  }

  const volumeSize = props.ebsStorageInfo?.volumeSize ?? 1000;
  // Minimum: 1 GiB, maximum: 16384 GiB.
  // Only a concrete number can be range-checked at synthesis time: an
  // unresolved token (e.g. a CloudFormation parameter) is encoded as a
  // placeholder number and would otherwise be rejected spuriously.
  if (!core.Token.isUnresolved(volumeSize) && (volumeSize < 1 || volumeSize > 16384)) {
    throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
  }

  const instanceType = props.instanceType
    ? this.mskInstanceType(props.instanceType)
    : this.mskInstanceType(
      ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
    );

  if (props.storageMode && props.storageMode === StorageMode.TIERED) {
    if (!props.kafkaVersion.isTieredStorageCompatible()) {
      throw new core.ValidationError(`To deploy a tiered cluster you must select a compatible Kafka version, got ${props.kafkaVersion.version}`, this);
    }
    if (instanceType === this.mskInstanceType(
      ec2.InstanceType.of(ec2.InstanceClass.T3, ec2.InstanceSize.SMALL),
    )) {
      throw new core.ValidationError('Tiered storage doesn\'t support broker type t3.small', this);
    }
  }

  const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
    ? {
      dataVolumeKmsKeyId:
        props.ebsStorageInfo.encryptionKey.keyId,
    }
    : undefined; // MSK will create the managed key

  const encryptionInTransit = {
    clientBroker:
      props.encryptionInTransit?.clientBroker ??
      ClientBrokerEncryption.TLS,
    inCluster: props.encryptionInTransit?.enableInCluster ?? true,
  };

  // Open monitoring is only emitted when at least one Prometheus exporter is
  // enabled; each exporter block is left undefined unless it was requested.
  const openMonitoring =
    props.monitoring?.enablePrometheusJmxExporter ||
    props.monitoring?.enablePrometheusNodeExporter
      ? {
        prometheus: {
          jmxExporter: props.monitoring?.enablePrometheusJmxExporter
            ? { enabledInBroker: true }
            : undefined,
          nodeExporter: props.monitoring
            ?.enablePrometheusNodeExporter
            ? { enabledInBroker: true }
            : undefined,
        },
      }
      : undefined;

  const loggingBucket = props.logging?.s3?.bucket;
  // When the feature flag is on, grant the log delivery service principal
  // write access to this account's AWSLogs/ prefix plus the bucket-level
  // reads listed below, scoped to this account and to 'logs' ARNs.
  if (loggingBucket && FeatureFlags.of(this).isEnabled(S3_CREATE_DEFAULT_LOGGING_POLICY)) {
    const stack = core.Stack.of(this);
    loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      principals: [
        new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
      ],
      resources: [
        loggingBucket.arnForObjects(`AWSLogs/${stack.account}/*`),
      ],
      actions: ['s3:PutObject'],
      conditions: {
        StringEquals: {
          's3:x-amz-acl': 'bucket-owner-full-control',
          'aws:SourceAccount': stack.account,
        },
        ArnLike: {
          'aws:SourceArn': stack.formatArn({
            service: 'logs',
            resource: '*',
          }),
        },
      },
    }));

    loggingBucket.addToResourcePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      principals: [
        new iam.ServicePrincipal('delivery.logs.amazonaws.com'),
      ],
      resources: [loggingBucket.bucketArn],
      actions: [
        's3:GetBucketAcl',
        's3:ListBucket',
      ],
      conditions: {
        StringEquals: {
          'aws:SourceAccount': stack.account,
        },
        ArnLike: {
          'aws:SourceArn': stack.formatArn({
            service: 'logs',
            resource: '*',
          }),
        },
      },
    }));
  }

  // Every broker log destination is always present in the template; a
  // destination is marked enabled only when its target was supplied.
  const loggingInfo = {
    brokerLogs: {
      cloudWatchLogs: {
        enabled:
          props.logging?.cloudwatchLogGroup !== undefined,
        logGroup:
          props.logging?.cloudwatchLogGroup?.logGroupName,
      },
      firehose: {
        enabled:
          props.logging?.firehoseDeliveryStreamName !==
          undefined,
        deliveryStream:
          props.logging?.firehoseDeliveryStreamName,
      },
      s3: {
        enabled: loggingBucket !== undefined,
        bucket: loggingBucket?.bucketName,
        prefix: props.logging?.s3?.prefix,
      },
    },
  };

  // SASL/SCRAM without a caller-supplied key: create a dedicated KMS key.
  // NOTE(review): when `saslProps.key` IS supplied, this constructor does not
  // assign `saslScramAuthenticationKey` — confirm the supplied key is picked
  // up wherever that field is consumed.
  if (props.clientAuthentication?.saslProps?.scram && props.clientAuthentication?.saslProps?.key === undefined) {
    this.saslScramAuthenticationKey = new kms.Key(this, 'SASLKey', {
      description: 'Used for encrypting MSK secrets for SASL/SCRAM authentication.',
      alias: `msk/${props.clusterName}/sasl/scram`,
    });

    // https://docs.aws.amazon.com/kms/latest/developerguide/services-secrets-manager.html#asm-policies
    this.saslScramAuthenticationKey.addToResourcePolicy(
      new iam.PolicyStatement({
        sid:
          'Allow access through AWS Secrets Manager for all principals in the account that are authorized to use AWS Secrets Manager',
        principals: [new iam.AnyPrincipal()],
        actions: [
          'kms:Encrypt',
          'kms:Decrypt',
          'kms:ReEncrypt*',
          'kms:GenerateDataKey*',
          'kms:CreateGrant',
          'kms:DescribeKey',
        ],
        resources: ['*'],
        conditions: {
          StringEquals: {
            'kms:ViaService': `secretsmanager.${core.Stack.of(this).region}.amazonaws.com`,
            'kms:CallerAccount': core.Stack.of(this).account,
          },
        },
      }),
    );
  }

  // Translate the L2 client authentication props into the CloudFormation shape.
  let clientAuthentication: CfnCluster.ClientAuthenticationProperty | undefined;
  if (props.clientAuthentication) {
    const { saslProps, tlsProps } = props.clientAuthentication;
    clientAuthentication = {
      sasl: saslProps ? {
        iam: saslProps.iam ? { enabled: true }: undefined,
        scram: saslProps.scram ? { enabled: true }: undefined,
      } : undefined,
      tls: tlsProps?.certificateAuthorities ? {
        certificateAuthorityArnList: tlsProps.certificateAuthorities?.map((ca) => ca.certificateAuthorityArn),
      } : undefined,
    };
  }

  const resource = new CfnCluster(this, 'Resource', {
    clusterName: props.clusterName,
    kafkaVersion: props.kafkaVersion.version,
    // `props.numberOfBrokerNodes` is a per-AZ count: the total is that count
    // times the number of selected AZs, or one broker per AZ when omitted.
    numberOfBrokerNodes:
      props.numberOfBrokerNodes !== undefined ?
        subnetSelection.availabilityZones.length * props.numberOfBrokerNodes : subnetSelection.availabilityZones.length,
    brokerNodeGroupInfo: {
      instanceType,
      clientSubnets: subnetSelection.subnetIds,
      securityGroups: this.connections.securityGroups.map(
        (group) => group.securityGroupId,
      ),
      storageInfo: {
        ebsStorageInfo: {
          volumeSize: volumeSize,
        },
      },
    },
    encryptionInfo: {
      encryptionAtRest,
      encryptionInTransit,
    },
    configurationInfo: props.configurationInfo,
    enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
    openMonitoring: openMonitoring,
    storageMode: props.storageMode,
    loggingInfo: loggingInfo,
    clientAuthentication,
  });

  // The cluster name is taken as the second '/'-separated segment of the
  // resource ref, which is also exposed as the cluster ARN.
  this.clusterName = this.getResourceNameAttribute(
    core.Fn.select(1, core.Fn.split('/', resource.ref)),
  );
  this.clusterArn = resource.ref;

  resource.applyRemovalPolicy(props.removalPolicy, {
    default: core.RemovalPolicy.RETAIN,
  });
}