in Gems/AWSMetrics/cdv1/aws_metrics/batch_processing.py [0:0]
def _create_events_firehose_delivery_stream(self) -> None:
    """
    Generate the Kinesis Data Firehose delivery stream that moves events into the data lake.

    The delivery stream:
      * reads records from the input Kinesis data stream (``self._input_stream_arn``),
      * transforms them with the events processing Lambda function,
      * converts them from JSON to Parquet using the schema of the configured
        events database/table (``self._events_database_name`` / ``self._events_table_name``),
      * writes the results to the analytics bucket (``self._analytics_bucket_arn``),
        logging delivery activity to CloudWatch Logs.

    The created construct is stored on ``self._events_firehose_delivery_stream``.
    """
    # These calls must come first: the configuration below reads the log group / log stream
    # and the delivery role attributes.
    # NOTE(review): presumably these helpers set _firehose_delivery_stream_log_group,
    # _firehose_s3_delivery_log_stream and _firehose_delivery_stream_role; confirm.
    self._create_firehose_s3_delivery_log_option()
    self._create_data_firehose_role()

    # One role is shared for reading the source stream, writing to S3 and reading the
    # table schema used for format conversion.
    role_arn = self._firehose_delivery_stream_role.role_arn

    self._events_firehose_delivery_stream = kinesisfirehose.CfnDeliveryStream(
        self._stack,
        id='EventsFirehoseDeliveryStream',
        delivery_stream_type='KinesisStreamAsSource',
        delivery_stream_name=resource_name_sanitizer.sanitize_resource_name(
            f'{self._stack.stack_name}-EventsFirehoseDeliveryStream', 'firehose_delivery_stream'),
        kinesis_stream_source_configuration=kinesisfirehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
            kinesis_stream_arn=self._input_stream_arn,
            role_arn=role_arn
        ),
        extended_s3_destination_configuration=kinesisfirehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
            bucket_arn=self._analytics_bucket_arn,
            buffering_hints=kinesisfirehose.CfnDeliveryStream.BufferingHintsProperty(
                interval_in_seconds=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_INTERVAL_IN_SECONDS,
                size_in_m_bs=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_SIZE_IN_MBS
            ),
            prefix=aws_metrics_constants.S3_DESTINATION_PREFIX,
            error_output_prefix=aws_metrics_constants.S3_DESTINATION_ERROR_OUTPUT_PREFIX,
            role_arn=role_arn,
            compression_format=aws_metrics_constants.S3_COMPRESSION_FORMAT,
            cloud_watch_logging_options=kinesisfirehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
                enabled=True,
                log_group_name=self._firehose_delivery_stream_log_group.log_group_name,
                log_stream_name=self._firehose_s3_delivery_log_stream.log_stream_name
            ),
            # Transform every record with the events processing Lambda before format conversion.
            # NOTE(review): CfnDeliveryStream.ProcessorParameterProperty types parameter_value as
            # str; this assumes the PROCESSOR_* constants are strings; confirm.
            processing_configuration=kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
                enabled=True,
                processors=[
                    kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
                        type='Lambda',
                        parameters=[
                            kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                parameter_name='LambdaArn',
                                parameter_value=self._events_processing_lambda.function_arn
                            ),
                            kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                parameter_name='BufferIntervalInSeconds',
                                parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_INTERVAL_IN_SECONDS
                            ),
                            kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                parameter_name='BufferSizeInMBs',
                                parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_SIZE_IN_MBS
                            ),
                            kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                parameter_name='NumberOfRetries',
                                parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_NUM_OF_RETRIES
                            )
                        ]
                    )
                ]
            ),
            # Convert incoming JSON records to Parquet using the events table schema.
            data_format_conversion_configuration=kinesisfirehose.CfnDeliveryStream.DataFormatConversionConfigurationProperty(
                enabled=True,
                input_format_configuration=kinesisfirehose.CfnDeliveryStream.InputFormatConfigurationProperty(
                    deserializer=kinesisfirehose.CfnDeliveryStream.DeserializerProperty(
                        open_x_json_ser_de=kinesisfirehose.CfnDeliveryStream.OpenXJsonSerDeProperty(
                            case_insensitive=True,
                            # Keep dotted JSON keys as-is instead of rewriting '.' to '_'.
                            convert_dots_in_json_keys_to_underscores=False
                        )
                    )
                ),
                output_format_configuration=kinesisfirehose.CfnDeliveryStream.OutputFormatConfigurationProperty(
                    serializer=kinesisfirehose.CfnDeliveryStream.SerializerProperty(
                        parquet_ser_de=kinesisfirehose.CfnDeliveryStream.ParquetSerDeProperty(
                            compression=aws_metrics_constants.PARQUET_SER_DE_COMPRESSION
                        )
                    )
                ),
                schema_configuration=kinesisfirehose.CfnDeliveryStream.SchemaConfigurationProperty(
                    catalog_id=self._stack.account,
                    role_arn=role_arn,
                    database_name=self._events_database_name,
                    table_name=self._events_table_name,
                    region=self._stack.region,
                    # Always use the most recent version of the table schema.
                    version_id='LATEST'
                )
            )
        )
    )