constructor()

in core/src/data-lake-exporter.ts [59:230]


  constructor(scope: Construct, id: string, props: DataLakeExporterProps) {
    super(scope, id);

    if ( (props.deliverySize || 128) > 128 ) { throw new Error('deliverySize cannot be more than 128MB'); }
    if ( (props.deliveryInterval || 900) > 900 ) { throw new Error('deliveryInterval cannot be more than 900s'); }

    const stack = Stack.of(this);

    // Get the Bucket from Amazon S3 Location sink
    const sinkBucket = Bucket.fromBucketName(this, 'sinkBucket', props.sinkLocation.bucketName);

    // Create log group for storing Amazon Kinesis Firehose logs.
    const logGroup = new LogGroup(this, 'dataLakeExporterLogGroup', {
      logGroupName: '/data-lake-exporter/',
      removalPolicy: RemovalPolicy.DESTROY,
      retention: RetentionDays.ONE_DAY,
    });

    // Create the Kinesis Firehose log stream.
    const firehoseLogStream = new LogStream(this, 'dataLakeExporterLogStream', {
      logGroup: logGroup,
      logStreamName: 'firehose-stream',
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Create the AWS IAM role used by the Amazon Kinesis Data Firehose delivery stream
    const role = new Role(this, 'dataLakeExporterRole', {
      assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
    });

    // add policy to access AWS Glue Database and Table for format conversion
    role.addToPolicy(
      new PolicyStatement({
        resources: [
          props.sourceGlueTable.tableArn,
          props.sourceGlueDatabase.catalogArn,
          props.sourceGlueDatabase.databaseArn,
        ],
        actions: [
          'glue:GetTable',
          'glue:GetTableVersion',
          'glue:GetTableVersions',
        ],
      }),
    );

    // add policy to access source stream
    role.addToPolicy(
      new PolicyStatement({
        resources: [
          props.sourceKinesisDataStream.streamArn,
        ],
        actions: [
          'kinesis:DescribeStream',
          'kinesis:GetShardIterator',
          'kinesis:GetRecords',
          'kinesis:ListShards',
        ],
      }),
    );

    // add policy to write data in Amazon S3 Location
    role.addToPolicy(
      new PolicyStatement({
        resources: [
          stack.formatArn({
            account: '',
            region: '',
            service: 's3',
            resource: props.sinkLocation.bucketName,
            resourceName: props.sinkLocation.objectKey,
          }),
          stack.formatArn({
            account: '',
            region: '',
            service: 's3',
            resource: props.sinkLocation.bucketName,
            resourceName: `${props.sinkLocation.objectKey}/*`,
          }),
          stack.formatArn({
            account: '',
            region: '',
            service: 's3',
            resource: props.sinkLocation.bucketName,
          }),
        ],
        actions: [
          's3:AbortMultipartUpload',
          's3:GetBucketLocation',
          's3:GetObject',
          's3:ListBucket',
          's3:ListBucketMultipartUploads',
          's3:PutObject',
        ],
      }),
    );

    // add policy to write logs to CloudWatch logs
    role.addToPolicy(
      new PolicyStatement({
        resources: [
          `${logGroup.logGroupArn}:log-stream:${firehoseLogStream.logStreamName}`,
        ],
        actions: [
          'logs:PutLogEvents',
        ],
      }),
    );

    // TODO add policy for KMS managed?

    /*     this.ingestionStream = new DeliveryStream(this, 'dataLakeExporter', {
      sourceStream: props.sourceKinesisDataStream,
      destinations: [new S3Bucket(sinkBucket,{
        dataOutputPrefix: props.sinkLocation.objectKey,
        errorOutputPrefix: `${props.sinkLocation.objectKey}-error`,
        logGroup: logGroup,
        compression: Compression.SNAPPY,
        bufferingInterval: Duration.seconds(props.deliveryInterval || 900),
        bufferingSize: Size.mebibytes(props.deliverySize || 128),
      })],
      encryption: StreamEncryption.AWS_OWNED,
    }) */

    // Create the delivery stream from the L1 (Cfn) construct because the L2 construct doesn't support conversion to Parquet or custom partitioning
    this.cfnIngestionStream = new CfnDeliveryStream(this, 'dataLakeExporter', {
      deliveryStreamType: 'KinesisStreamAsSource',
      deliveryStreamEncryptionConfigurationInput: {
        keyType: 'AWS_OWNED_CMK',
      },
      extendedS3DestinationConfiguration: {
        bucketArn: sinkBucket.bucketArn,
        bufferingHints: {
          intervalInSeconds: props.deliveryInterval || 900,
          sizeInMBs: props.deliverySize || 128,
        },
        cloudWatchLoggingOptions: {
          logGroupName: logGroup.logGroupName,
          logStreamName: firehoseLogStream.logStreamName,
        },
        roleArn: role.roleArn,
        errorOutputPrefix: `${props.sinkLocation.objectKey}-error`,
        prefix: props.sinkLocation.objectKey,
        compressionFormat: 'UNCOMPRESSED',
        s3BackupMode: 'Disabled',
        dataFormatConversionConfiguration: {
          enabled: true,
          inputFormatConfiguration: {
            deserializer: {
              openXJsonSerDe: {},
            },
          },
          outputFormatConfiguration: {
            serializer: {
              parquetSerDe: {},
            },
          },
          schemaConfiguration: {
            roleArn: role.roleArn,
            catalogId: Aws.ACCOUNT_ID,
            region: Aws.REGION,
            databaseName: props.sourceGlueDatabase.databaseName,
            tableName: props.sourceGlueTable.tableName,
          },
        },
      },
      kinesisStreamSourceConfiguration: {
        kinesisStreamArn: props.sourceKinesisDataStream.streamArn,
        roleArn: role.roleArn,
      },
    });
  }
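
Below is a minimal usage sketch of this construct. It assumes aws-cdk-lib v2 with the @aws-cdk/aws-glue-alpha module, that the class is exported as DataLakeExporter from this file, and that DataLakeExporterProps exposes the fields referenced in this constructor (sinkLocation, sourceKinesisDataStream, sourceGlueDatabase, sourceGlueTable, deliveryInterval, deliverySize); all resource names and values are illustrative.

  import { App, Stack } from 'aws-cdk-lib';
  import { Stream } from 'aws-cdk-lib/aws-kinesis';
  import { Bucket } from 'aws-cdk-lib/aws-s3';
  import { Database, Table, Schema, DataFormat } from '@aws-cdk/aws-glue-alpha';
  import { DataLakeExporter } from './data-lake-exporter';

  const app = new App();
  const stack = new Stack(app, 'DataLakeExporterExample');

  // Source Kinesis Data Stream the exporter reads from.
  const sourceStream = new Stream(stack, 'SourceStream');

  // Glue database and table describing the schema used by Kinesis Data Firehose
  // for the JSON-to-Parquet format conversion.
  const sourceDatabase = new Database(stack, 'SourceDatabase', {
    databaseName: 'source_db',
  });
  const sourceTable = new Table(stack, 'SourceTable', {
    database: sourceDatabase,
    tableName: 'source_table',
    columns: [
      { name: 'event_time', type: Schema.TIMESTAMP },
      { name: 'payload', type: Schema.STRING },
    ],
    dataFormat: DataFormat.JSON,
  });

  // Sink bucket and prefix the Parquet files are delivered to.
  const sinkBucket = new Bucket(stack, 'SinkBucket');

  new DataLakeExporter(stack, 'Exporter', {
    sinkLocation: { bucketName: sinkBucket.bucketName, objectKey: 'raw' },
    sourceKinesisDataStream: sourceStream,
    sourceGlueDatabase: sourceDatabase,
    sourceGlueTable: sourceTable,
    deliveryInterval: 900, // seconds, must not exceed 900
    deliverySize: 128, // MB, must not exceed 128
  });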