Gems/AWSMetrics/cdk/aws_metrics/real_time_data_processing.py (234 lines of code) (raw):

""" Copyright (c) Contributors to the Open 3D Engine Project. For complete copyright and license terms please see the LICENSE at the root of this distribution. SPDX-License-Identifier: Apache-2.0 OR MIT """ from constructs import Construct from aws_cdk import ( CfnOutput, Duration, Fn, aws_iam as iam, aws_kinesisanalytics as analytics, aws_lambda as lambda_, aws_logs as logs ) import os from . import aws_metrics_constants from .aws_utils import resource_name_sanitizer class RealTimeDataProcessing: """ Create the AWS resources used for real time data processing """ def __init__(self, stack: Construct, input_stream_arn: str, application_name: str) -> None: self._stack = stack self._input_stream_arn = input_stream_arn self._application_name = application_name self._create_analytics_processing_lambda() self._create_analytics_application() def _create_analytics_application(self) -> None: """ Generate the Kinesis data analytics application to process the real-time data. The sample application filters input events and counts the total number of login within one minute. :return: The created Kinesis data analytics application. """ self._analytics_application_role = self._create_analytics_application_role() self._analytics_application = analytics.CfnApplication( self._stack, 'AnalyticsApplication', application_name=resource_name_sanitizer.sanitize_resource_name( f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'), inputs=[ analytics.CfnApplication.InputProperty( input_schema=analytics.CfnApplication.InputSchemaProperty( record_columns=[ analytics.CfnApplication.RecordColumnProperty( name='event_id', sql_type='VARCHAR(32)', mapping='$.event_id' ), analytics.CfnApplication.RecordColumnProperty( name='event_type', sql_type='VARCHAR(32)', mapping='$.event_type' ), analytics.CfnApplication.RecordColumnProperty( name='event_name', sql_type='VARCHAR(32)', mapping='$.event_name' ), analytics.CfnApplication.RecordColumnProperty( name='event_version', sql_type='VARCHAR(32)', mapping='$.event_version' ), analytics.CfnApplication.RecordColumnProperty( name='event_timestamp', sql_type='VARCHAR(32)', mapping='$.event_timestamp' ), analytics.CfnApplication.RecordColumnProperty( name='application_id', sql_type='VARCHAR(32)', mapping='$.application_id' ) ], record_format=analytics.CfnApplication.RecordFormatProperty( record_format_type='JSON' ) ), name_prefix='AnalyticsApp', kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty( resource_arn=self._input_stream_arn, role_arn=self._analytics_application_role.role_arn ) ) ], application_description='', application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE ) self._application_output = analytics.CfnApplicationOutput( self._stack, 'AnalyticsApplicationOutput', application_name=self._analytics_application.ref, output=analytics.CfnApplicationOutput.OutputProperty( destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty( record_format_type='JSON' ), lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty( resource_arn=self._analytics_processing_lambda.function_arn, role_arn=self._analytics_application_role.role_arn ), name='DESTINATION_STREAM' ), ) CfnOutput( self._stack, id='AnalyticsApplicationName', description='Kinesis Data Analytics application to process the real-time metrics data', export_name=f"{self._application_name}:AnalyticsApplication", value=self._analytics_application.application_name) def _create_analytics_application_role(self) -> iam.Role: """ Generate the IAM role for the Kinesis analytics application to read events from the input stream and send the processed data to the analytics processing lambda. :return: The created IAM role. """ kinesis_access_policy_document = iam.PolicyDocument( statements=[ iam.PolicyStatement( actions=[ 'kinesis:DescribeStream', 'kinesis:GetShardIterator', 'kinesis:GetRecords', 'kinesis:ListShards' ], effect=iam.Effect.ALLOW, sid='ReadKinesisStream', resources=[ self._input_stream_arn ] ) ] ) lambda_access_policy_document = iam.PolicyDocument( statements=[ iam.PolicyStatement( actions=[ 'lambda:InvokeFunction', 'lambda:GetFunctionConfiguration', ], effect=iam.Effect.ALLOW, sid='AnalyticsProcessingInvokePermissions', resources=[ self._analytics_processing_lambda.function_arn ] ) ] ) kinesis_analytics_role = iam.Role( self._stack, id='AnalyticsApplicationRole', role_name=resource_name_sanitizer.sanitize_resource_name( f'{self._stack.stack_name}-AnalyticsApplicationRole', 'iam_role'), assumed_by=iam.ServicePrincipal( service='kinesisanalytics.amazonaws.com' ), inline_policies={ 'KinesisAccess': kinesis_access_policy_document, 'LambdaAccess': lambda_access_policy_document } ) return kinesis_analytics_role def _create_analytics_processing_lambda(self) -> None: """ Generate the analytics processing lambda to send processed data to CloudWatch for visualization. """ analytics_processing_function_name = resource_name_sanitizer.sanitize_resource_name( f'{self._stack.stack_name}-AnalyticsProcessingLambda', 'lambda_function') self._analytics_processing_lambda_role = self._create_analytics_processing_lambda_role( analytics_processing_function_name ) self._analytics_processing_lambda = lambda_.Function( self._stack, id='AnalyticsProcessingLambda', function_name=analytics_processing_function_name, log_retention=logs.RetentionDays.ONE_MONTH, memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB, runtime=lambda_.Runtime.PYTHON_3_9, timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES), handler='analytics_processing.lambda_handler', code=lambda_.Code.from_asset( os.path.join(os.path.dirname(__file__), 'lambdas', 'analytics_processing_lambda')), role=self._analytics_processing_lambda_role ) CfnOutput( self._stack, id='AnalyticsProcessingLambdaName', description='Lambda function for sending processed data to CloudWatch.', export_name=f"{self._application_name}:AnalyticsProcessingLambda", value=self._analytics_processing_lambda.function_name) def _create_analytics_processing_lambda_role(self, function_name: str) -> iam.Role: """ Generate the IAM role for the analytics processing lambda to send metrics to CloudWatch. @param function_name Name of the Lambda function. @return The created IAM role. """ analytics_processing_policy_document = iam.PolicyDocument( statements=[ # The following policy limits the user to publishing metrics only in the namespace named AWSMetrics. # Check the following document for more details: # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/iam-cw-condition-keys-namespace.html iam.PolicyStatement( actions=[ 'cloudwatch:PutMetricData', ], effect=iam.Effect.ALLOW, resources=[ '*' ], conditions={ "StringEquals": { "cloudwatch:namespace": "AWSMetrics" } } ), iam.PolicyStatement( actions=[ 'logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutDestination', 'logs:PutLogEvents' ], effect=iam.Effect.ALLOW, resources=[ Fn.sub( 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:' '/aws/lambda/${FunctionName}*', variables={ 'FunctionName': function_name } ) ] ) ] ) analytics_processing_lambda_role = iam.Role( self._stack, id='AnalyticsLambdaRole', role_name=resource_name_sanitizer.sanitize_resource_name( f'{self._stack.stack_name}-AnalyticsLambdaRole', 'iam_role'), assumed_by=iam.ServicePrincipal( service='lambda.amazonaws.com' ), inline_policies={ 'AnalyticsProcessingPolicy': analytics_processing_policy_document } ) return analytics_processing_lambda_role @property def analytics_processing_lambda_name(self) -> lambda_.Function.function_name: return self._analytics_processing_lambda.function_name @property def analytics_processing_lambda_arn(self) -> lambda_.Function.function_arn: return self._analytics_processing_lambda.function_arn @property def analytics_application_lambda_role_arn(self) -> iam.Role.role_arn: return self._analytics_processing_lambda_role.role_arn @property def analytics_application_name(self) -> analytics.CfnApplication.application_name: return self._analytics_application.application_name @property def analytics_application_role_arn(self) -> iam.Role.role_arn: return self._analytics_application_role.role_arn