in core/src/data-lake-exporter.ts [59:230]
/**
 * Constructs the data-lake exporter: an Amazon Kinesis Data Firehose delivery
 * stream that reads from a Kinesis Data Stream, converts records to Parquet
 * using an AWS Glue table schema, and delivers them to an Amazon S3 location.
 *
 * @param scope the Scope of the CDK Construct.
 * @param id the ID of the CDK Construct.
 * @param props the DataLakeExporter [properties]{@link DataLakeExporterProps}.
 * @throws Error if deliverySize exceeds 128 MB or deliveryInterval exceeds 900 s
 *         (Firehose buffering-hint hard limits).
 */
constructor(scope: Construct, id: string, props: DataLakeExporterProps) {
  super(scope, id);

  // Validate Firehose buffering hints against service limits.
  // NOTE: parenthesize the default before comparing — `a || b > c` would
  // parse as `a || (b > c)` and throw for every provided value.
  if ((props.deliverySize || 128) > 128) {
    throw new Error('deliverySize cannot be more than 128MB');
  }
  if ((props.deliveryInterval || 900) > 900) {
    throw new Error('deliveryInterval cannot be more than 900s');
  }

  const stack = Stack.of(this);

  // Get the Bucket from Amazon S3 Location sink.
  const sinkBucket = Bucket.fromBucketName(this, 'sinkBucket', props.sinkLocation.bucketName);

  // Create log group for storing Amazon Kinesis Firehose logs.
  const logGroup = new LogGroup(this, 'dataLakeExporterLogGroup', {
    logGroupName: '/data-lake-exporter/',
    removalPolicy: RemovalPolicy.DESTROY,
    retention: RetentionDays.ONE_DAY,
  });

  // Create the Kinesis Firehose log stream.
  const firehoseLogStream = new LogStream(this, 'dataLakeExporterLogStream', {
    logGroup: logGroup,
    logStreamName: 'firehose-stream',
    removalPolicy: RemovalPolicy.DESTROY,
  });

  // Create an Amazon IAM Role used by Amazon Kinesis Firehose delivery stream.
  const role = new Role(this, 'dataLakeExporterRole', {
    assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
  });

  // Add policy to access AWS Glue Database and Table for format conversion.
  role.addToPolicy(
    new PolicyStatement({
      resources: [
        props.sourceGlueTable.tableArn,
        props.sourceGlueDatabase.catalogArn,
        props.sourceGlueDatabase.databaseArn,
      ],
      actions: [
        'glue:GetTable',
        'glue:GetTableVersion',
        'glue:GetTableVersions',
      ],
    }),
  );

  // Add policy to read from the source Kinesis Data Stream.
  role.addToPolicy(
    new PolicyStatement({
      resources: [
        props.sourceKinesisDataStream.streamArn,
      ],
      actions: [
        'kinesis:DescribeStream',
        'kinesis:GetShardIterator',
        'kinesis:GetRecords',
        'kinesis:ListShards',
      ],
    }),
  );

  // Add policy to write data in the Amazon S3 location.
  // S3 ARNs are global: account and region components are empty.
  role.addToPolicy(
    new PolicyStatement({
      resources: [
        stack.formatArn({
          account: '',
          region: '',
          service: 's3',
          resource: props.sinkLocation.bucketName,
          resourceName: props.sinkLocation.objectKey,
        }),
        stack.formatArn({
          account: '',
          region: '',
          service: 's3',
          resource: props.sinkLocation.bucketName,
          resourceName: `${props.sinkLocation.objectKey}/*`,
        }),
        stack.formatArn({
          account: '',
          region: '',
          service: 's3',
          resource: props.sinkLocation.bucketName,
        }),
      ],
      actions: [
        's3:AbortMultipartUpload',
        's3:GetBucketLocation',
        's3:GetObject',
        's3:ListBucket',
        's3:ListBucketMultipartUploads',
        's3:PutObject',
      ],
    }),
  );

  // Add policy to write logs to CloudWatch Logs.
  role.addToPolicy(
    new PolicyStatement({
      resources: [
        `${logGroup.logGroupArn}:log-stream:${firehoseLogStream.logStreamName}`,
      ],
      actions: [
        'logs:PutLogEvents',
      ],
    }),
  );

  // TODO add policy for KMS managed?
  /* this.ingestionStream = new DeliveryStream(this, 'dataLakeExporter', {
    sourceStream: props.sourceKinesisDataStream,
    destinations: [new S3Bucket(sinkBucket,{
      dataOutputPrefix: props.sinkLocation.objectKey,
      errorOutputPrefix: `${props.sinkLocation.objectKey}-error`,
      logGroup: logGroup,
      compression: Compression.SNAPPY,
      bufferingInterval: Duration.seconds(props.deliveryInterval || 900),
      bufferingSize: Size.mebibytes(props.deliverySize || 128),
    })],
    encryption: StreamEncryption.AWS_OWNED,
  }) */

  // Create the Delivery stream from Cfn because the L2 Construct doesn't
  // support conversion to Parquet and custom partitioning.
  this.cfnIngestionStream = new CfnDeliveryStream(this, 'dataLakeExporter', {
    deliveryStreamType: 'KinesisStreamAsSource',
    deliveryStreamEncryptionConfigurationInput: {
      keyType: 'AWS_OWNED_CMK',
    },
    extendedS3DestinationConfiguration: {
      bucketArn: sinkBucket.bucketArn,
      bufferingHints: {
        intervalInSeconds: props.deliveryInterval || 900,
        sizeInMBs: props.deliverySize || 128,
      },
      cloudWatchLoggingOptions: {
        logGroupName: logGroup.logGroupName,
        logStreamName: firehoseLogStream.logStreamName,
      },
      roleArn: role.roleArn,
      errorOutputPrefix: `${props.sinkLocation.objectKey}-error`,
      prefix: props.sinkLocation.objectKey,
      // Parquet output is already compressed; leave S3 compression off.
      compressionFormat: 'UNCOMPRESSED',
      s3BackupMode: 'Disabled',
      dataFormatConversionConfiguration: {
        enabled: true,
        inputFormatConfiguration: {
          deserializer: {
            openXJsonSerDe: {},
          },
        },
        outputFormatConfiguration: {
          serializer: {
            parquetSerDe: {},
          },
        },
        schemaConfiguration: {
          roleArn: role.roleArn,
          catalogId: Aws.ACCOUNT_ID,
          region: Aws.REGION,
          databaseName: props.sourceGlueDatabase.databaseName,
          tableName: props.sourceGlueTable.tableName,
        },
      },
    },
    kinesisStreamSourceConfiguration: {
      // BUG FIX: was '' — an empty ARN makes the KinesisStreamAsSource
      // configuration invalid; wire the actual source stream.
      kinesisStreamArn: props.sourceKinesisDataStream.streamArn,
      roleArn: role.roleArn,
    },
  });
}