in lib/common/utils.ts [12:90]
export function CreateKirehoseDataStream(stack: cdk.Construct, streamName: string, index: string,
osDomain: opensearch.Domain, rawDataBucket: s3.Bucket, transformer?: lambda.Function): CfnDeliveryStream {
const firehoseRole = new iam.Role(stack, 'firehoseRole_'+streamName, {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
managedPolicies: [
new ManagedPolicy(stack, 'policy_'+streamName, {
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['s3:*'],
resources: ['*'],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['es:*'],
resources: ['*'],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['lambda:*'],
resources: ['*'],
}),
],
}),
],
});
const firehoseStreamToS3 = new kinesisfirehose.CfnDeliveryStream(stack, 'FirehoseStreamToS3_'+streamName, {
deliveryStreamName: streamName,
deliveryStreamType: 'DirectPut',
amazonopensearchserviceDestinationConfiguration:{
processingConfiguration: transformer == undefined ? {} : {
enabled: true,
processors: [
{
type: 'Lambda',
parameters: [
{ parameterName: 'LambdaArn',
parameterValue: transformer!.functionArn
},
]
}
]
},
retryOptions: {
durationInSeconds: 5,
},
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: streamName,
logStreamName: streamName,
},
domainArn: osDomain!.domainArn,
indexName: index,
roleArn: firehoseRole.roleArn,
s3BackupMode: 'FailedDocumentsOnly',
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 1,
},
s3Configuration: {
bucketArn: rawDataBucket.bucketArn,
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 1,
},
compressionFormat: 'UNCOMPRESSED',
roleArn: firehoseRole.roleArn,
},
},
});
firehoseStreamToS3.node.addDependency(osDomain);
firehoseStreamToS3.node.addDependency(rawDataBucket);
firehoseStreamToS3.node.addDependency(firehoseRole);
return firehoseStreamToS3;
}