in Gems/AWSMetrics/cdv1/aws_metrics/real_time_data_processing.py [0:0]
def _create_analytics_application(self) -> None:
"""
Generate the Kinesis data analytics application to process the real-time data.
The sample application filters input events and counts the total number of login within one minute.
:return: The created Kinesis data analytics application.
"""
self._analytics_application_role = self._create_analytics_application_role()
self._analytics_application = analytics.CfnApplication(
self._stack,
'AnalyticsApplication',
application_name=resource_name_sanitizer.sanitize_resource_name(
f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
inputs=[
analytics.CfnApplication.InputProperty(
input_schema=analytics.CfnApplication.InputSchemaProperty(
record_columns=[
analytics.CfnApplication.RecordColumnProperty(
name='event_id',
sql_type='VARCHAR(32)',
mapping='$.event_id'
),
analytics.CfnApplication.RecordColumnProperty(
name='event_type',
sql_type='VARCHAR(32)',
mapping='$.event_type'
),
analytics.CfnApplication.RecordColumnProperty(
name='event_name',
sql_type='VARCHAR(32)',
mapping='$.event_name'
),
analytics.CfnApplication.RecordColumnProperty(
name='event_version',
sql_type='VARCHAR(32)',
mapping='$.event_version'
),
analytics.CfnApplication.RecordColumnProperty(
name='event_timestamp',
sql_type='VARCHAR(32)',
mapping='$.event_timestamp'
),
analytics.CfnApplication.RecordColumnProperty(
name='application_id',
sql_type='VARCHAR(32)',
mapping='$.application_id'
)
],
record_format=analytics.CfnApplication.RecordFormatProperty(
record_format_type='JSON'
)
),
name_prefix='AnalyticsApp',
kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
resource_arn=self._input_stream_arn,
role_arn=self._analytics_application_role.role_arn
)
)
],
application_description='',
application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
)
self._application_output = analytics.CfnApplicationOutput(
self._stack,
'AnalyticsApplicationOutput',
application_name=self._analytics_application.ref,
output=analytics.CfnApplicationOutput.OutputProperty(
destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
record_format_type='JSON'
),
lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
resource_arn=self._analytics_processing_lambda.function_arn,
role_arn=self._analytics_application_role.role_arn
),
name='DESTINATION_STREAM'
),
)
core.CfnOutput(
self._stack,
id='AnalyticsApplicationName',
description='Kinesis Data Analytics application to process the real-time metrics data',
export_name=f"{self._application_name}:AnalyticsApplication",
value=self._analytics_application.application_name)