in source/patterns/msk-lambda-kdf.ts [25:189]
constructor(scope: cdk.Construct, id: string, props: SolutionStackProps) {
super(scope, id, props);
//---------------------------------------------------------------------
// Kinesis Data Firehose configuration
const bufferingSize = new cdk.CfnParameter(this, 'BufferingSize', {
type: 'Number',
default: 5,
minValue: 1,
maxValue: 128
});
const bufferingInterval = new cdk.CfnParameter(this, 'BufferingInterval', {
type: 'Number',
default: 300,
minValue: 60,
maxValue: 900
});
const compressionFormat = new cdk.CfnParameter(this, 'CompressionFormat', {
type: 'String',
default: 'GZIP',
allowedValues: ['GZIP', 'HADOOP_SNAPPY', 'Snappy', 'UNCOMPRESSED', 'ZIP']
});
const outputBucket = new EncryptedBucket(this, 'Output', {
enableIntelligentTiering: true
});
const kdfToS3 = new KinesisFirehoseToS3(this, 'KdfToS3', {
existingBucketObj: outputBucket.Bucket,
kinesisFirehoseProps: {
deliveryStreamType: 'DirectPut',
deliveryStreamEncryptionConfigurationInput: {
keyType: 'AWS_OWNED_CMK'
},
extendedS3DestinationConfiguration: {
bufferingHints: {
intervalInSeconds: bufferingInterval.valueAsNumber,
sizeInMBs: bufferingSize.valueAsNumber
},
compressionFormat: compressionFormat.valueAsString,
prefix: 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
errorOutputPrefix: 'errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}'
}
}
});
//---------------------------------------------------------------------
// Lambda function configuration
const clusterArn = new cdk.CfnParameter(this, 'ClusterArn', {
type: 'String',
allowedPattern: 'arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}(-gov)?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)',
constraintDescription: 'Cluster ARN must be in the following format: arn:${Partition}:kafka:${Region}:${Account}:cluster/${ClusterName}/${UUID}'
});
const batchSize = new cdk.CfnParameter(this, 'BatchSize', {
type: 'Number',
default: 100,
minValue: 1,
maxValue: 10000
});
const topicName = new cdk.CfnParameter(this, 'TopicName', {
type: 'String',
allowedPattern: '.+',
constraintDescription: 'Topic name must not be empty'
});
const secretArn = new cdk.CfnParameter(this, 'SecretArn', {
type: 'String',
maxLength: 200
});
const lambdaConsumer = new KafkaConsumer(this, 'LambdaFn', {
clusterArn: clusterArn.valueAsString,
scramSecretArn: secretArn.valueAsString,
batchSize: batchSize.valueAsNumber,
startingPosition: lambda.StartingPosition.LATEST,
topicName: topicName.valueAsString,
enabled: true,
code: lambda.Code.fromAsset('lambda/msk-lambda-kdf'),
timeout: cdk.Duration.minutes(5),
environmentVariables: {
'DELIVERY_STREAM_NAME': kdfToS3.kinesisFirehose.ref
}
});
const firehosePolicy = new iam.PolicyStatement({
actions: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
resources: [kdfToS3.kinesisFirehose.getAtt('Arn').toString()],
});
// This is a valid TypeScript expression, the role property will not be null.
lambdaConsumer.Function.role?.addToPrincipalPolicy(firehosePolicy); // NOSONAR (typescript:S905)
//---------------------------------------------------------------------
// Solution metrics
new SolutionHelper(this, 'SolutionHelper', {
solutionId: props.solutionId,
pattern: MskLambdaKdf.name,
bufferingSize: bufferingSize.valueAsNumber,
bufferingInterval: bufferingInterval.valueAsNumber,
compressionFormat: compressionFormat.valueAsString
});
//---------------------------------------------------------------------
// Template metadata
this.templateOptions.metadata = {
'AWS::CloudFormation::Interface': {
ParameterGroups: [
{
Label: { default: 'AWS Lambda consumer configuration' },
Parameters: [clusterArn.logicalId, batchSize.logicalId, topicName.logicalId, secretArn.logicalId]
},
{
Label: { default: 'Amazon Kinesis Data Firehose configuration' },
Parameters: [bufferingSize.logicalId, bufferingInterval.logicalId, compressionFormat.logicalId]
}
],
ParameterLabels: {
[clusterArn.logicalId]: {
default: 'ARN of the MSK cluster'
},
[batchSize.logicalId]: {
default: 'Maximum number of items to retrieve in a single batch'
},
[topicName.logicalId]: {
default: 'Name of a Kafka topic to consume (topic must already exist before the stack is launched)'
},
[secretArn.logicalId]: {
default: '(Optional) Secret ARN used for SASL/SCRAM authentication of the brokers in your MSK cluster'
},
[bufferingSize.logicalId]: {
default: 'Size of the buffer (in MBs) that incoming data is buffered before delivery'
},
[bufferingInterval.logicalId]: {
default: 'Length of time (in seconds) that incoming data is buffered before delivery'
},
[compressionFormat.logicalId]: {
default: 'Compression format for delivered data in Amazon S3'
}
}
}
};
//---------------------------------------------------------------------
// Stack outputs
new cdk.CfnOutput(this, 'LambdaFunctionName', {
description: 'Name of the AWS Lambda function',
value: lambdaConsumer.Function.functionName
});
new cdk.CfnOutput(this, 'DeliveryStreamName', {
description: 'Name of the Amazon Kinesis Data Firehose delivery stream',
value: kdfToS3.kinesisFirehose.ref
});
new cdk.CfnOutput(this, 'OutputBucketName', {
description: 'Name of the Amazon S3 destination bucket',
value: outputBucket.Bucket.bucketName
});
}