createKinesis()

in infrastructure/cdk/lib/layer/ingestionConsumptionLayer.ts [48:174]


    createKinesis(props: IParameterAwareProps) {

        this.kinesisStreams = new KDS.Stream(this, props.getApplicationName() + 'InputStream', {
            streamName: props.getApplicationName() + '_InputStream',
            shardCount: 1
        });
    
        // MISSING KINESIS INTEGRATION
        if (this.KINESIS_INTEGRATION) {
            new KinesisEventSource( this.kinesisStreams , {
                batchSize: 700,
                startingPosition : Lambda.StartingPosition.LATEST
            }).bind(<Lambda.Function> props.getParameter('lambda.scoreboard'));
        }
    
        // MISSING KINESIS FIREHOSE
        //section starts here
        if (this.FIREHOSE) {
            let firehoseName = props.getApplicationName() + '_Firehose';
            let firehoseLogGroupName = '/aws/kinesisfirehose/' + firehoseName;
            let firehoseLogGroup = new Logs.LogGroup(this,props.getApplicationName()+'firehoseloggroup', {
                logGroupName : firehoseLogGroupName
            });
            new Logs.LogStream(this,props.getApplicationName()+'firehoselogstream', {
                logGroup : firehoseLogGroup,
                logStreamName : "error"
            });
            let self = this;
            let firehoseRole = new IAM.Role(this, props.getApplicationName()+ 'FirehoseToStreamsRole', {
                roleName: props.getApplicationName() + 'FirehoseToStreamsRole',
                assumedBy: new IAM.ServicePrincipal('firehose.amazonaws.com'),
                inlinePolicies: {
                    'GluePermissions' : new IAM.PolicyDocument({
                        statements : [
                            new PolicyStatement({
                                actions : [
                                  "glue:GetTableVersions"
                                ],
                                resources : ["*"]
                            })
                        ]
                    }),
                    'S3RawDataPermission': new IAM.PolicyDocument({
                        statements : [
                            new PolicyStatement(
                                {
                                    actions : [
                                        's3:AbortMultipartUpload',
                                        's3:GetBucketLocation',
                                        's3:GetObject',
                                        's3:ListBucket',
                                        's3:ListBucketMultipartUploads',
                                        's3:PutObject',
                                    ],
                                    resources : [
                                        self.rawbucketarn,
                                        self.rawbucketarn + '/*'
                                    ]
                                }
                            )
                        ]
                    }),
                    'DefaultFirehoseLambda' : new IAM.PolicyDocument({
                        statements : [
                            new PolicyStatement({
                                actions: [
                                    "lambda:InvokeFunction",
                                    "lambda:GetFunctionConfiguration"
                                ],
                                resources : [
                                    "arn:aws:lambda:"+props.region+":"+props.accountId+":function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%"
                                ] 
                            })
                        ]
                    }),
                    'InputStreamReadPermissions': new PolicyDocument({
                        statements : [
                            new PolicyStatement({
                                actions : [
                                    'kinesis:DescribeStream',
                                    'kinesis:GetShardIterator',
                                    'kinesis:GetRecords'
                                ],
                                resources : [
                                    this.kinesisStreams.streamArn
                                ]
                            })
                        ]
                    }),
                    'CloudWatchLogsPermissions': new PolicyDocument({
                        statements : [
                            new PolicyStatement({
                                actions : [ 'logs:PutLogEvents' ],
                                resources : [
                                    'arn:aws:logs:' + props.region + ':' + props.accountId + ':log-group:/'+firehoseLogGroupName+':log-stream:*'
                                ]
                            })
                        ]
                    })
                }
            });
            
            this.kinesisFirehose = new KDF.CfnDeliveryStream(this, props.getApplicationName() + 'RawData', {
                deliveryStreamType: 'KinesisStreamAsSource',
                deliveryStreamName: firehoseName,
                kinesisStreamSourceConfiguration: {
                    kinesisStreamArn: this.kinesisStreams.streamArn,
                    roleArn: firehoseRole.roleArn
                }
                , s3DestinationConfiguration: {
                    bucketArn: <string>this.rawbucketarn,
                    bufferingHints: {
                        intervalInSeconds: 300,
                        sizeInMBs: 1
                    },
                    compressionFormat: 'GZIP',
                    roleArn: firehoseRole.roleArn,
                    cloudWatchLoggingOptions: {
                        enabled: true,
                        logGroupName: firehoseLogGroupName,
                        logStreamName: firehoseLogGroupName
                    }
                }
            });
            this.kinesisFirehose.node.addDependency(firehoseLogGroup);
        }
    }