Gems/AWSMetrics/cdk/aws_metrics/real_time_data_processing.py [31:117]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        self._stack = stack
        self._input_stream_arn = input_stream_arn
        self._application_name = application_name

        self._create_analytics_processing_lambda()

        self._create_analytics_application()

    def _create_analytics_application(self) -> None:
        """
        Generate the Kinesis data analytics application to process the real-time data.
        The sample application filters input events and counts the total number of login within one minute.

        :return: The created Kinesis data analytics application.
        """
        self._analytics_application_role = self._create_analytics_application_role()

        self._analytics_application = analytics.CfnApplication(
            self._stack,
            'AnalyticsApplication',
            application_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
            inputs=[
                analytics.CfnApplication.InputProperty(
                    input_schema=analytics.CfnApplication.InputSchemaProperty(
                        record_columns=[
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_id',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_id'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_type',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_type'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_name',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_name'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_version',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_version'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_timestamp',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_timestamp'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='application_id',
                                sql_type='VARCHAR(32)',
                                mapping='$.application_id'
                            )
                        ],
                        record_format=analytics.CfnApplication.RecordFormatProperty(
                            record_format_type='JSON'
                        )
                    ),
                    name_prefix='AnalyticsApp',
                    kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
                        resource_arn=self._input_stream_arn,
                        role_arn=self._analytics_application_role.role_arn
                    )
                )
            ],
            application_description='',
            application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
        )

        self._application_output = analytics.CfnApplicationOutput(
            self._stack,
            'AnalyticsApplicationOutput',
            application_name=self._analytics_application.ref,
            output=analytics.CfnApplicationOutput.OutputProperty(
                destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
                    record_format_type='JSON'
                ),
                lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
                    resource_arn=self._analytics_processing_lambda.function_arn,
                    role_arn=self._analytics_application_role.role_arn
                ),
                name='DESTINATION_STREAM'
            ),
        )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



Gems/AWSMetrics/cdv1/aws_metrics/real_time_data_processing.py [28:114]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        self._stack = stack
        self._input_stream_arn = input_stream_arn
        self._application_name = application_name

        self._create_analytics_processing_lambda()

        self._create_analytics_application()

    def _create_analytics_application(self) -> None:
        """
        Generate the Kinesis data analytics application to process the real-time data.
        The sample application filters input events and counts the total number of login within one minute.

        :return: The created Kinesis data analytics application.
        """
        self._analytics_application_role = self._create_analytics_application_role()

        self._analytics_application = analytics.CfnApplication(
            self._stack,
            'AnalyticsApplication',
            application_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
            inputs=[
                analytics.CfnApplication.InputProperty(
                    input_schema=analytics.CfnApplication.InputSchemaProperty(
                        record_columns=[
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_id',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_id'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_type',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_type'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_name',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_name'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_version',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_version'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='event_timestamp',
                                sql_type='VARCHAR(32)',
                                mapping='$.event_timestamp'
                            ),
                            analytics.CfnApplication.RecordColumnProperty(
                                name='application_id',
                                sql_type='VARCHAR(32)',
                                mapping='$.application_id'
                            )
                        ],
                        record_format=analytics.CfnApplication.RecordFormatProperty(
                            record_format_type='JSON'
                        )
                    ),
                    name_prefix='AnalyticsApp',
                    kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
                        resource_arn=self._input_stream_arn,
                        role_arn=self._analytics_application_role.role_arn
                    )
                )
            ],
            application_description='',
            application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
        )

        self._application_output = analytics.CfnApplicationOutput(
            self._stack,
            'AnalyticsApplicationOutput',
            application_name=self._analytics_application.ref,
            output=analytics.CfnApplicationOutput.OutputProperty(
                destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
                    record_format_type='JSON'
                ),
                lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
                    resource_arn=self._analytics_processing_lambda.function_arn,
                    role_arn=self._analytics_application_role.role_arn
                ),
                name='DESTINATION_STREAM'
            ),
        )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



