in source/lib/kdf-delivery-stream.ts [59:189]
constructor(scope: cdk.Construct, id: string, props: DeliveryStreamProps) {
super(scope, id);
const firehoseRole = new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
inlinePolicies: {
ReadSource: new iam.PolicyDocument({
statements: [new iam.PolicyStatement({
resources: [props.inputDataStream.streamArn],
actions: [
'kinesis:DescribeStream',
'kinesis:DescribeStreamSummary',
'kinesis:GetShardIterator',
'kinesis:GetRecords',
'kinesis:ListShards',
'kinesis:SubscribeToShard'
]
})]
})
}
});
this.Output = new EncryptedBucket(this, 'Output', {
enableIntelligentTiering: true
});
this.OutputBucket.grantWrite(firehoseRole);
const dpEnabledCondition = new cdk.CfnCondition(this, 'DynamicPartitioningEnabled', {
expression: cdk.Fn.conditionEquals(props.dynamicPartitioning, FeatureStatus.Enabled)
});
const dpDisabledCondition = new cdk.CfnCondition(this, 'DynamicPartitioningDisabled', {
expression: cdk.Fn.conditionEquals(props.dynamicPartitioning, FeatureStatus.Disabled)
});
const newLineCondition = new cdk.CfnCondition(this, 'NewLineDelimiter', {
expression: cdk.Fn.conditionEquals(props.newLineDelimiter, FeatureStatus.Enabled)
});
const commonFirehoseProps = {
deliveryStreamType: 'KinesisStreamAsSource',
kinesisStreamSourceConfiguration: {
kinesisStreamArn: props.inputDataStream.streamArn,
roleArn: firehoseRole.roleArn
}
};
const commonDestinationProps = {
bucketArn: this.OutputBucket.bucketArn,
roleArn: firehoseRole.roleArn,
bufferingHints: {
intervalInSeconds: props.bufferingInterval,
sizeInMBs: props.bufferingSize
},
compressionFormat: props.compressionFormat,
prefix: props.dataPrefix,
errorOutputPrefix: props.errorsPrefix
}
const kdfWithoutDP = new firehose.CfnDeliveryStream(this, 'DeliveryStreamWithoutDP', {
...commonFirehoseProps,
extendedS3DestinationConfiguration: {
...commonDestinationProps
}
});
const kdfWithDp = new firehose.CfnDeliveryStream(this, 'DeliveryStreamWithDP', {
...commonFirehoseProps,
extendedS3DestinationConfiguration: {
...commonDestinationProps,
dynamicPartitioningConfiguration: {
enabled: true,
retryOptions: {
durationInSeconds: props.retryDuration
}
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'MetadataExtraction',
parameters: [
{
parameterName: 'MetadataExtractionQuery',
parameterValue: props.jqExpression!
},
{
parameterName: 'JsonParsingEngine',
parameterValue: 'JQ-1.6'
}
]
},
{
type: 'AppendDelimiterToRecord',
parameters: [{
parameterName: 'Delimiter',
parameterValue: cdk.Fn.conditionIf(newLineCondition.logicalId, '\\n', '').toString()
}]
}
// Other processors can be added here as well.
// For instance, if multi record deaggregation needs to be enabled, you can umcomment the following code:
/*
{
type: 'RecordDeAggregation',
parameters: [{
parameterName: 'SubRecordType',
parameterValue: 'JSON'
}]
}
*/
]
}
}
});
kdfWithoutDP.cfnOptions.condition = dpDisabledCondition;
kdfWithDp.cfnOptions.condition = dpEnabledCondition;
this.DeliveryStreamArn = cdk.Fn.conditionIf(
dpEnabledCondition.logicalId,
kdfWithDp.getAtt('Arn'),
kdfWithoutDP.getAtt('Arn')
).toString();
this.DeliveryStreamName = cdk.Fn.conditionIf(
dpEnabledCondition.logicalId,
kdfWithDp.ref,
kdfWithoutDP.ref
).toString();
}