in infrastructure/cdk/lib/layer/ingestionConsumptionLayer.ts [48:174]
createKinesis(props: IParameterAwareProps) {
this.kinesisStreams = new KDS.Stream(this, props.getApplicationName() + 'InputStream', {
streamName: props.getApplicationName() + '_InputStream',
shardCount: 1
});
// MISSING KINESIS INTEGRATION
if (this.KINESIS_INTEGRATION) {
new KinesisEventSource( this.kinesisStreams , {
batchSize: 700,
startingPosition : Lambda.StartingPosition.LATEST
}).bind(<Lambda.Function> props.getParameter('lambda.scoreboard'));
}
// MISSING KINESIS FIREHOSE
//section starts here
if (this.FIREHOSE) {
let firehoseName = props.getApplicationName() + '_Firehose';
let firehoseLogGroupName = '/aws/kinesisfirehose/' + firehoseName;
let firehoseLogGroup = new Logs.LogGroup(this,props.getApplicationName()+'firehoseloggroup', {
logGroupName : firehoseLogGroupName
});
new Logs.LogStream(this,props.getApplicationName()+'firehoselogstream', {
logGroup : firehoseLogGroup,
logStreamName : "error"
});
let self = this;
let firehoseRole = new IAM.Role(this, props.getApplicationName()+ 'FirehoseToStreamsRole', {
roleName: props.getApplicationName() + 'FirehoseToStreamsRole',
assumedBy: new IAM.ServicePrincipal('firehose.amazonaws.com'),
inlinePolicies: {
'GluePermissions' : new IAM.PolicyDocument({
statements : [
new PolicyStatement({
actions : [
"glue:GetTableVersions"
],
resources : ["*"]
})
]
}),
'S3RawDataPermission': new IAM.PolicyDocument({
statements : [
new PolicyStatement(
{
actions : [
's3:AbortMultipartUpload',
's3:GetBucketLocation',
's3:GetObject',
's3:ListBucket',
's3:ListBucketMultipartUploads',
's3:PutObject',
],
resources : [
self.rawbucketarn,
self.rawbucketarn + '/*'
]
}
)
]
}),
'DefaultFirehoseLambda' : new IAM.PolicyDocument({
statements : [
new PolicyStatement({
actions: [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration"
],
resources : [
"arn:aws:lambda:"+props.region+":"+props.accountId+":function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%"
]
})
]
}),
'InputStreamReadPermissions': new PolicyDocument({
statements : [
new PolicyStatement({
actions : [
'kinesis:DescribeStream',
'kinesis:GetShardIterator',
'kinesis:GetRecords'
],
resources : [
this.kinesisStreams.streamArn
]
})
]
}),
'CloudWatchLogsPermissions': new PolicyDocument({
statements : [
new PolicyStatement({
actions : [ 'logs:PutLogEvents' ],
resources : [
'arn:aws:logs:' + props.region + ':' + props.accountId + ':log-group:/'+firehoseLogGroupName+':log-stream:*'
]
})
]
})
}
});
this.kinesisFirehose = new KDF.CfnDeliveryStream(this, props.getApplicationName() + 'RawData', {
deliveryStreamType: 'KinesisStreamAsSource',
deliveryStreamName: firehoseName,
kinesisStreamSourceConfiguration: {
kinesisStreamArn: this.kinesisStreams.streamArn,
roleArn: firehoseRole.roleArn
}
, s3DestinationConfiguration: {
bucketArn: <string>this.rawbucketarn,
bufferingHints: {
intervalInSeconds: 300,
sizeInMBs: 1
},
compressionFormat: 'GZIP',
roleArn: firehoseRole.roleArn,
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: firehoseLogGroupName,
logStreamName: firehoseLogGroupName
}
}
});
this.kinesisFirehose.node.addDependency(firehoseLogGroup);
}
}