constructor()

in source/labs/msk-cluster-setup.ts [22:224]


    constructor(scope: cdk.Construct, id: string, props: SolutionStackProps) {
        super(scope, id, props);

        const kafkaVersion = new cdk.CfnParameter(this, 'MSKKafkaVersion', {
            type: 'String',
            default: '2.8.1',
            allowedValues: ['2.8.1', '2.8.0', '2.7.1', '2.7.0', '2.6.2', '2.6.1', '2.6.0', '2.5.1', '2.4.1.1', '2.3.1', '2.2.1']
        });

        const tlsMutualAuth = new cdk.CfnParameter(this, 'TLSMutualAuthentication', {
            type: 'String',
            default: 'false',
            allowedValues: ['true', 'false']
        });

        const pcaArn = new cdk.CfnParameter(this, 'PcaArn', {
            type: 'String',
            allowedPattern: 'arn:aws:acm-pca:[us\\-east\\-1|us\\-east\\-2|eu\\-west\\-1]{9}:\\d{12}:certificate-authority\\/[\\da-f]{8}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{12}|^$',
            constraintDescription: 'Not a valid ACM PCA ARN'
        });

        const clientAndVpcStack = new cdk.CfnParameter(this, 'VpcBastionStack', {
            type: 'String'
        });

        //---------------------------------------------------------------------
        // Template metadata
        this.templateOptions.metadata = {
            'AWS::CloudFormation::Interface': {
                ParameterGroups: [
                    {
                        Label: { default: 'Amazon MSK cluster configuration' },
                        Parameters: [
                            clientAndVpcStack.logicalId,
                            kafkaVersion.logicalId,
                            tlsMutualAuth.logicalId,
                            pcaArn.logicalId
                        ]
                    }
                ],
                ParameterLabels: {
                    [clientAndVpcStack.logicalId]: {
                        default: 'Name of the Bastion / Kafka client instance stack'
                    },
                    [kafkaVersion.logicalId]: {
                        default: 'Apache Kafka version on the brokers'
                    },
                    [tlsMutualAuth.logicalId]: {
                        default: 'Whether TLS Mutual Auth should be enabled for the cluster'
                    },
                    [pcaArn.logicalId]: {
                        default: '(Optional) ARN of the ACM Certificate Authority (for TLS Mutual Auth)'
                    }
                }
            }
        };

        // With the CDK approach we took, `BastionStack` and `VPCStack` are the same
        const clientAndVpcStackName = clientAndVpcStack.valueAsString;

        const mtlsCondition = new cdk.CfnCondition(this, 'MTLS', {
            expression: cdk.Fn.conditionEquals(tlsMutualAuth.value, 'true')
        });

        const noMtlsCondition = new cdk.CfnCondition(this, 'noMTLS', {
            expression: cdk.Fn.conditionEquals(tlsMutualAuth.value, 'false')
        });

        //---------------------------------------------------------------------
        // MSK security group
        const kafkaClientSG = cdk.Fn.importValue(`${clientAndVpcStackName}-KafkaClientEC2InstanceSecurityGroupId`);

        const clusterSecurityGroup = new ec2.CfnSecurityGroup(this, 'MSKSecurityGroup', {
            vpcId: cdk.Fn.importValue(`${clientAndVpcStackName}-VPCID`),
            groupDescription: 'MSK Security Group',
            securityGroupIngress: [
                {
                    ipProtocol: 'tcp',
                    fromPort: 2181,
                    toPort: 2181,
                    sourceSecurityGroupId: kafkaClientSG,
                    description: 'ZooKeeper Plaintext'
                },
                {
                    ipProtocol: 'tcp',
                    fromPort: 9092,
                    toPort: 9092,
                    sourceSecurityGroupId: kafkaClientSG,
                    description: 'Bootstrap servers Plaintext'
                },
                {
                    ipProtocol: 'tcp',
                    fromPort: 9094,
                    toPort: 9094,
                    sourceSecurityGroupId: kafkaClientSG,
                    description: 'Bootstrap servers TLS'
                }
            ]
        });

        new ec2.CfnSecurityGroupIngress(this, 'MSKSecurityGroup9092', {
            groupId: clusterSecurityGroup.attrGroupId,
            sourceSecurityGroupId: clusterSecurityGroup.attrGroupId,
            description: 'Enable access to port 9092 inside the MSKSecurityGroup',
            ipProtocol: 'tcp',
            fromPort: 9092,
            toPort: 9092
        });

        new ec2.CfnSecurityGroupIngress(this, 'MSKSecurityGroup9094', {
            groupId: clusterSecurityGroup.attrGroupId,
            sourceSecurityGroupId: clusterSecurityGroup.attrGroupId,
            description: 'Enable access to port 9094 inside the MSKSecurityGroup',
            ipProtocol: 'tcp',
            fromPort: 9094,
            toPort: 9094
        });

        //---------------------------------------------------------------------
        // MSK cluster
        const clusterProps: msk.CfnClusterProps = {
            clusterName: `MSKCluster-${cdk.Aws.STACK_NAME}`,
            enhancedMonitoring: 'DEFAULT',
            kafkaVersion: kafkaVersion.valueAsString,
            numberOfBrokerNodes: 3,
            encryptionInfo: {
                encryptionInTransit: {
                    inCluster: true,
                    clientBroker: 'TLS_PLAINTEXT'
                }
            },
            brokerNodeGroupInfo: {
                clientSubnets: [
                    cdk.Fn.importValue(`${clientAndVpcStackName}-PrivateSubnetMSKOne`),
                    cdk.Fn.importValue(`${clientAndVpcStackName}-PrivateSubnetMSKTwo`),
                    cdk.Fn.importValue(`${clientAndVpcStackName}-PrivateSubnetMSKThree`)
                ],
                instanceType: 'kafka.m5.large',
                securityGroups: [clusterSecurityGroup.attrGroupId],
                storageInfo: {
                    ebsStorageInfo: {
                        volumeSize: 1000
                    }
                }
            }
        };

        const clientAuth: msk.CfnCluster.ClientAuthenticationProperty = {
            tls: {
                certificateAuthorityArnList: [pcaArn.valueAsString]
            }
        }

        const noMtlsCluster = new msk.CfnCluster(this, 'MSKClusterNoMTLS', clusterProps);
        const mtlsCluster = new msk.CfnCluster(this, 'MSKClusterMTLS', {
            ...clusterProps,
            clientAuthentication: clientAuth
        });

        noMtlsCluster.cfnOptions.condition = noMtlsCondition;
        mtlsCluster.cfnOptions.condition = mtlsCondition;

        //---------------------------------------------------------------------
        // Solution metrics
        new SolutionHelper(this, 'SolutionHelper', {
            solutionId: props.solutionId,
            pattern: MskClusterStack.name
        });

        //---------------------------------------------------------------------
        // Outputs
        new cdk.CfnOutput(this, 'MSKSecurityGroupID', {
            value: clusterSecurityGroup.attrGroupId,
            description: 'ID of the security group for the MSK cluster'
        });

        new cdk.CfnOutput(this, 'SSHKafkaClientEC2Instance', {
            value: cdk.Fn.importValue(`${clientAndVpcStackName}-SSHKafkaClientEC2Instance`),
            description: 'SSH command for the EC2 instance'
        });

        new cdk.CfnOutput(this, 'KafkaClientEC2InstanceSecurityGroupId', {
            value: cdk.Fn.importValue(`${clientAndVpcStackName}-KafkaClientEC2InstanceSecurityGroupId`),
            description: 'ID of the security group for the EC2 instance'
        });

        new cdk.CfnOutput(this, 'SchemaRegistryUrl', {
            value: cdk.Fn.importValue(`${clientAndVpcStackName}-SchemaRegistryUrl`),
            description: 'Url for the Schema Registry'
        });

        new cdk.CfnOutput(this, 'MSKClusterArn', {
            value: cdk.Fn.conditionIf(mtlsCondition.logicalId, mtlsCluster.ref, noMtlsCluster.ref).toString(),
            description: 'Arn for the MSK cluster',
            exportName: `${cdk.Aws.STACK_NAME}-MSKClusterArn`
        });

        // TODO: Check if this is still required
        new cdk.CfnOutput(this, 'VPCStackName', {
            value: clientAndVpcStackName,
            description: 'The name of the VPC Stack'
        });
    }