constructor()

in source/lib/kdf-delivery-stream.ts [59:189]


    /**
     * Declares the Firehose delivery pipeline for this construct: an IAM role, an output
     * bucket, and two `CfnDeliveryStream` resources (one without and one with dynamic
     * partitioning).
     *
     * Both delivery streams are declared; each is attached to a CloudFormation condition
     * that compares `props.dynamicPartitioning` with `FeatureStatus.Disabled` /
     * `FeatureStatus.Enabled` respectively. `DeliveryStreamArn` and `DeliveryStreamName`
     * are conditional expressions keyed on the "enabled" condition, so they reference the
     * dynamically partitioned stream when that condition holds and the plain one otherwise.
     *
     * @param scope Parent construct.
     * @param id Identifier of this construct.
     * @param props Settings read here: `inputDataStream`, `dynamicPartitioning`,
     *              `newLineDelimiter`, `bufferingInterval`, `bufferingSize`,
     *              `compressionFormat`, `dataPrefix`, `errorsPrefix`, `retryDuration`
     *              and `jqExpression`.
     */
    constructor(scope: cdk.Construct, id: string, props: DeliveryStreamProps) {
        super(scope, id);

        const sourceStreamArn = props.inputDataStream.streamArn;

        // Actions the delivery role is allowed to perform on the source stream.
        const readSourceStatement = new iam.PolicyStatement({
            resources: [sourceStreamArn],
            actions: [
                'kinesis:DescribeStream',
                'kinesis:DescribeStreamSummary',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards',
                'kinesis:SubscribeToShard'
            ]
        });

        // Role assumed by the Firehose service principal; used for both the source and the destination.
        const deliveryRole = new iam.Role(this, 'Role', {
            assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
            inlinePolicies: {
                ReadSource: new iam.PolicyDocument({ statements: [readSourceStatement] })
            }
        });
        const deliveryRoleArn = deliveryRole.roleArn;

        this.Output = new EncryptedBucket(this, 'Output', { enableIntelligentTiering: true });

        // NOTE(review): `OutputBucket` is declared outside this constructor; presumably it
        // exposes the bucket owned by `this.Output` -- confirm against the class definition.
        this.OutputBucket.grantWrite(deliveryRole);

        // CloudFormation conditions derived from the feature-flag props.
        const isDpEnabled = new cdk.CfnCondition(this, 'DynamicPartitioningEnabled', {
            expression: cdk.Fn.conditionEquals(props.dynamicPartitioning, FeatureStatus.Enabled)
        });
        const isDpDisabled = new cdk.CfnCondition(this, 'DynamicPartitioningDisabled', {
            expression: cdk.Fn.conditionEquals(props.dynamicPartitioning, FeatureStatus.Disabled)
        });
        const appendNewLine = new cdk.CfnCondition(this, 'NewLineDelimiter', {
            expression: cdk.Fn.conditionEquals(props.newLineDelimiter, FeatureStatus.Enabled)
        });

        // Settings shared by both delivery stream variants.
        const streamSourceProps = {
            deliveryStreamType: 'KinesisStreamAsSource',
            kinesisStreamSourceConfiguration: {
                kinesisStreamArn: sourceStreamArn,
                roleArn: deliveryRoleArn
            }
        };
        const s3DestinationBase = {
            bucketArn: this.OutputBucket.bucketArn,
            roleArn: deliveryRoleArn,
            bufferingHints: {
                intervalInSeconds: props.bufferingInterval,
                sizeInMBs: props.bufferingSize
            },
            compressionFormat: props.compressionFormat,
            prefix: props.dataPrefix,
            errorOutputPrefix: props.errorsPrefix
        };

        // Variant 1: no dynamic partitioning, only the shared destination settings.
        const plainStream = new firehose.CfnDeliveryStream(this, 'DeliveryStreamWithoutDP', {
            ...streamSourceProps,
            extendedS3DestinationConfiguration: s3DestinationBase
        });
        plainStream.cfnOptions.condition = isDpDisabled;

        // Processor that extracts metadata using the supplied JQ expression.
        // NOTE(review): the non-null assertion suggests `jqExpression` is optional on the
        // props type -- confirm callers always supply it.
        const metadataExtraction = {
            type: 'MetadataExtraction',
            parameters: [
                { parameterName: 'MetadataExtractionQuery', parameterValue: props.jqExpression! },
                { parameterName: 'JsonParsingEngine', parameterValue: 'JQ-1.6' }
            ]
        };

        // The delimiter is the two-character sequence backslash + "n" when the new-line
        // condition holds, and an empty string otherwise.
        const delimiterValue = cdk.Fn.conditionIf(appendNewLine.logicalId, '\\n', '').toString();
        const appendDelimiter = {
            type: 'AppendDelimiterToRecord',
            parameters: [{ parameterName: 'Delimiter', parameterValue: delimiterValue }]
        };

        // Variant 2: dynamic partitioning plus record processing.
        const partitionedStream = new firehose.CfnDeliveryStream(this, 'DeliveryStreamWithDP', {
            ...streamSourceProps,
            extendedS3DestinationConfiguration: {
                ...s3DestinationBase,
                dynamicPartitioningConfiguration: {
                    enabled: true,
                    retryOptions: {
                        durationInSeconds: props.retryDuration
                    }
                },
                processingConfiguration: {
                    enabled: true,
                    processors: [
                        metadataExtraction,
                        appendDelimiter
                        // Additional processors can be appended to this list.
                        // For example, to enable multi record deaggregation, uncomment the following entry
                        // (and add a comma after the previous one):
                        /*
                        {
                            type: 'RecordDeAggregation',
                            parameters: [{
                                parameterName: 'SubRecordType',
                                parameterValue: 'JSON'
                            }]
                        }
                        */
                    ]
                }
            }
        });
        partitionedStream.cfnOptions.condition = isDpEnabled;

        // Expose the ARN / name of the dynamically partitioned stream when the "enabled"
        // condition holds, and those of the plain stream otherwise.
        const enabledConditionId = isDpEnabled.logicalId;

        this.DeliveryStreamArn = cdk.Fn.conditionIf(
            enabledConditionId,
            partitionedStream.getAtt('Arn'),
            plainStream.getAtt('Arn')
        ).toString();

        this.DeliveryStreamName = cdk.Fn.conditionIf(
            enabledConditionId,
            partitionedStream.ref,
            plainStream.ref
        ).toString();
    }