Gems/AWSMetrics/cdk/aws_metrics/batch_processing.py (297 lines of code) (raw):
"""
Copyright (c) Contributors to the Open 3D Engine Project.
For complete copyright and license terms please see the LICENSE at the root of this distribution.
SPDX-License-Identifier: Apache-2.0 OR MIT
"""
import os

from aws_cdk import (
    CfnOutput,
    Duration,
    Fn,
    RemovalPolicy,
    Stack,
    aws_kinesisfirehose as kinesisfirehose,
    aws_iam as iam,
    aws_lambda as lambda_,
    aws_logs as logs
)
from constructs import Construct

from . import aws_metrics_constants
from .aws_utils import resource_name_sanitizer


class BatchProcessing:
    """
    Create the AWS resources including the events processing Lambda and
    the Kinesis Data Firehose delivery stream for batch processing.

    The Lambda is created before the delivery stream because both the Firehose
    role policy and the delivery stream's processing configuration reference its ARN.
    """

    def __init__(self,
                 stack: Stack,
                 application_name: str,
                 input_stream_arn: str,
                 analytics_bucket_arn: str,
                 events_database_name: str,
                 events_table_name: str) -> None:
        """
        :param stack: CDK stack that owns the resources. It has to be a Stack rather than a
            plain Construct because stack_name, account and region are read from it.
        :param application_name: Application name used as the prefix of the exported Lambda name output.
        :param input_stream_arn: ARN of the Kinesis data stream the delivery stream reads from.
        :param analytics_bucket_arn: ARN of the S3 bucket the delivery stream writes to.
        :param events_database_name: Glue database that contains the events table.
        :param events_table_name: Glue table whose schema drives the JSON to Parquet conversion.
        """
        self._stack = stack
        self._application_name = application_name
        self._input_stream_arn = input_stream_arn
        self._analytics_bucket_arn = analytics_bucket_arn
        self._events_database_name = events_database_name
        self._events_table_name = events_table_name

        # Order matters: the delivery stream and its role reference the Lambda ARN.
        self._create_events_processing_lambda()
        self._create_events_firehose_delivery_stream()

    def _create_events_processing_lambda(self) -> None:
        """
        Generate the events processing lambda to filter the invalid metrics events.

        Also exports the function name as a stack output.
        """
        events_processing_lambda_name = resource_name_sanitizer.sanitize_resource_name(
            f'{self._stack.stack_name}-EventsProcessingLambda', 'lambda_function')
        # The role's log permissions are scoped to this function name, so the name
        # is computed first and the role is created before the function.
        self._create_events_processing_lambda_role(events_processing_lambda_name)

        self._events_processing_lambda = lambda_.Function(
            self._stack,
            id='EventsProcessingLambda',
            function_name=events_processing_lambda_name,
            log_retention=logs.RetentionDays.ONE_MONTH,
            memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
            runtime=lambda_.Runtime.PYTHON_3_9,
            timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
            handler='events_processing.lambda_handler',
            code=lambda_.Code.from_asset(
                os.path.join(os.path.dirname(__file__), 'lambdas', 'events_processing_lambda')),
            role=self._events_processing_lambda_role
        )

        CfnOutput(
            self._stack,
            id='EventProcessingLambdaName',
            description='Lambda function for processing metrics events data.',
            export_name=f"{self._application_name}:EventProcessingLambda",
            value=self._events_processing_lambda.function_name)

    def _create_events_processing_lambda_role(self, function_name: str) -> None:
        """
        Generate the IAM role for the events processing Lambda.

        :param function_name: Name of the Lambda function; used to scope the CloudWatch Logs permissions.
        """
        events_processing_lambda_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    # NOTE(review): logs:PutDestination does not look necessary for writing the
                    # function's own logs; consider dropping it for least privilege.
                    actions=[
                        'logs:CreateLogGroup',
                        'logs:CreateLogStream',
                        'logs:PutDestination',
                        'logs:PutLogEvents'
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        # The trailing wildcard matches any ARN suffix after the function's
                        # log group name (e.g. its log streams).
                        Fn.sub(
                            'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
                            '/aws/lambda/${FunctionName}*',
                            variables={
                                'FunctionName': function_name
                            }
                        )
                    ]
                )
            ]
        )

        self._events_processing_lambda_role = iam.Role(
            self._stack,
            id='EventsProcessingLambdaRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsProcessingLambdaRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='lambda.amazonaws.com'
            ),
            inline_policies={
                'EventsProcessingLambdaPolicy': events_processing_lambda_policy_document
            }
        )

    def _create_events_firehose_delivery_stream(self) -> None:
        """
        Generate the Kinesis Data Firehose delivery stream to convert input data and deliver it to the data lake.

        The stream reads from the input Kinesis stream and runs records through the events
        processing Lambda. It converts them from JSON to Parquet using the Glue table schema
        and writes the result to the analytics bucket.
        """
        # The delivery stream references the log group/stream and the role, so create them first.
        self._create_firehose_s3_delivery_log_option()
        self._create_data_firehose_role()

        self._events_firehose_delivery_stream = kinesisfirehose.CfnDeliveryStream(
            self._stack,
            id='EventsFirehoseDeliveryStream',
            delivery_stream_type='KinesisStreamAsSource',
            delivery_stream_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsFirehoseDeliveryStream', 'firehose_delivery_stream'),
            kinesis_stream_source_configuration=kinesisfirehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=self._input_stream_arn,
                role_arn=self._firehose_delivery_stream_role.role_arn
            ),
            extended_s3_destination_configuration=kinesisfirehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
                bucket_arn=self._analytics_bucket_arn,
                buffering_hints=kinesisfirehose.CfnDeliveryStream.BufferingHintsProperty(
                    interval_in_seconds=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_INTERVAL_IN_SECONDS,
                    size_in_m_bs=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_SIZE_IN_MBS
                ),
                prefix=aws_metrics_constants.S3_DESTINATION_PREFIX,
                error_output_prefix=aws_metrics_constants.S3_DESTINATION_ERROR_OUTPUT_PREFIX,
                role_arn=self._firehose_delivery_stream_role.role_arn,
                # NOTE(review): with record format conversion enabled, Firehose is believed to require
                # UNCOMPRESSED here (the Parquet serializer applies compression). Confirm the constant.
                compression_format=aws_metrics_constants.S3_COMPRESSION_FORMAT,
                cloud_watch_logging_options=kinesisfirehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
                    enabled=True,
                    log_group_name=self._firehose_delivery_stream_log_group.log_group_name,
                    log_stream_name=self._firehose_s3_delivery_log_stream.log_stream_name
                ),
                processing_configuration=self._build_processing_configuration(),
                data_format_conversion_configuration=self._build_data_format_conversion_configuration()
            )
        )

    def _build_processing_configuration(self) -> kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty:
        """
        Build the processing configuration that invokes the events processing Lambda on incoming records.
        """
        # Insertion order is preserved, so the parameters are emitted in this exact order.
        # Presumably the constants are strings, as Firehose processor parameter values are strings. Verify.
        processor_parameters = {
            'LambdaArn': self._events_processing_lambda.function_arn,
            'BufferIntervalInSeconds': aws_metrics_constants.PROCESSOR_BUFFER_INTERVAL_IN_SECONDS,
            'BufferSizeInMBs': aws_metrics_constants.PROCESSOR_BUFFER_SIZE_IN_MBS,
            'NumberOfRetries': aws_metrics_constants.PROCESSOR_BUFFER_NUM_OF_RETRIES,
        }
        return kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
            enabled=True,
            processors=[
                kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
                    type='Lambda',
                    parameters=[
                        kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                            parameter_name=parameter_name,
                            parameter_value=parameter_value
                        )
                        for parameter_name, parameter_value in processor_parameters.items()
                    ]
                )
            ]
        )

    def _build_data_format_conversion_configuration(
            self) -> kinesisfirehose.CfnDeliveryStream.DataFormatConversionConfigurationProperty:
        """
        Build the JSON to Parquet conversion configuration, using the events Glue table as the schema.
        """
        return kinesisfirehose.CfnDeliveryStream.DataFormatConversionConfigurationProperty(
            enabled=True,
            input_format_configuration=kinesisfirehose.CfnDeliveryStream.InputFormatConfigurationProperty(
                deserializer=kinesisfirehose.CfnDeliveryStream.DeserializerProperty(
                    open_x_json_ser_de=kinesisfirehose.CfnDeliveryStream.OpenXJsonSerDeProperty(
                        case_insensitive=True,
                        convert_dots_in_json_keys_to_underscores=False
                    )
                )
            ),
            output_format_configuration=kinesisfirehose.CfnDeliveryStream.OutputFormatConfigurationProperty(
                serializer=kinesisfirehose.CfnDeliveryStream.SerializerProperty(
                    parquet_ser_de=kinesisfirehose.CfnDeliveryStream.ParquetSerDeProperty(
                        compression=aws_metrics_constants.PARQUET_SER_DE_COMPRESSION
                    )
                )
            ),
            schema_configuration=kinesisfirehose.CfnDeliveryStream.SchemaConfigurationProperty(
                catalog_id=self._stack.account,
                role_arn=self._firehose_delivery_stream_role.role_arn,
                database_name=self._events_database_name,
                table_name=self._events_table_name,
                region=self._stack.region,
                version_id='LATEST'
            )
        )

    def _create_firehose_s3_delivery_log_option(self) -> None:
        """
        Generate the CloudWatch log group and log stream that Kinesis Data Firehose
        uses for the delivery stream.

        Both resources are destroyed with the stack, and the log group keeps events for one month.
        """
        self._firehose_delivery_stream_log_group = logs.LogGroup(
            self._stack,
            id='FirehoseLogGroup',
            log_group_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-FirehoseLogGroup', 'cloudwatch_log_group'),
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.ONE_MONTH
        )

        self._firehose_s3_delivery_log_stream = logs.LogStream(
            self._stack,
            id='FirehoseS3DeliveryLogStream',
            log_group=self._firehose_delivery_stream_log_group,
            log_stream_name=f'{self._stack.stack_name}-FirehoseS3DeliveryLogStream',
            removal_policy=RemovalPolicy.DESTROY
        )

    def _create_data_firehose_role(self) -> None:
        """
        Generate the IAM role for the Kinesis Data Firehose delivery stream.

        Requires the events processing Lambda and the Firehose log group to exist already,
        because the policy references their ARNs.
        """
        policy_statements = [
            # Write delivered (and errored) objects to the analytics bucket.
            iam.PolicyStatement(
                actions=[
                    's3:AbortMultipartUpload',
                    's3:GetBucketLocation',
                    's3:GetObject',
                    's3:ListBucket',
                    's3:ListBucketMultipartUploads',
                    's3:PutObject'
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._analytics_bucket_arn,
                    f'{self._analytics_bucket_arn}/*'
                ]
            ),
            # Invoke the events processing Lambda for record transformation.
            iam.PolicyStatement(
                actions=[
                    'lambda:InvokeFunction',
                    'lambda:GetFunctionConfiguration',
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._events_processing_lambda.function_arn
                ]
            ),
            # Read records from the source Kinesis data stream.
            iam.PolicyStatement(
                actions=[
                    'kinesis:DescribeStream',
                    'kinesis:GetShardIterator',
                    'kinesis:GetRecords',
                    'kinesis:ListShards'
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._input_stream_arn
                ]
            ),
            # Write delivery error logs to the Firehose log group.
            iam.PolicyStatement(
                actions=[
                    'logs:PutLogEvents',
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._firehose_delivery_stream_log_group.log_group_arn
                ]
            ),
            # Read the Glue table schema used for record format conversion.
            iam.PolicyStatement(
                actions=[
                    'glue:GetTable',
                    'glue:GetTableVersion',
                    'glue:GetTableVersions'
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    Fn.sub(
                        'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                    ),
                    Fn.sub(
                        body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/${EventsDatabase}/*',
                        variables={
                            'EventsDatabase': self._events_database_name
                        }
                    ),
                    Fn.sub(
                        body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${EventsDatabase}',
                        variables={
                            'EventsDatabase': self._events_database_name
                        }
                    )
                ]
            ),
        ]

        firehose_delivery_policy = iam.PolicyDocument(
            statements=policy_statements
        )

        self._firehose_delivery_stream_role = iam.Role(
            self._stack,
            id='GameEventsFirehoseRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-GameEventsFirehoseRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='firehose.amazonaws.com'
            ),
            inline_policies={
                'FirehoseDelivery': firehose_delivery_policy
            }
        )

    @property
    def events_processing_lambda_name(self) -> str:
        """Name of the events processing Lambda function (a CDK token until deployment)."""
        return self._events_processing_lambda.function_name

    @property
    def events_processing_lambda_arn(self) -> str:
        """ARN of the events processing Lambda function (a CDK token until deployment)."""
        return self._events_processing_lambda.function_arn

    @property
    def events_processing_lambda_role_arn(self) -> str:
        """ARN of the IAM role assumed by the events processing Lambda."""
        return self._events_processing_lambda_role.role_arn

    @property
    def delivery_stream_name(self) -> str:
        """CloudFormation Ref of the delivery stream, which resolves to the delivery stream name."""
        return self._events_firehose_delivery_stream.ref

    @property
    def delivery_stream_role_arn(self) -> str:
        """ARN of the IAM role used by the Firehose delivery stream."""
        return self._firehose_delivery_stream_role.role_arn

    @property
    def delivery_stream_log_group_name(self) -> str:
        """Name of the CloudWatch log group used for Firehose delivery logging."""
        return self._firehose_delivery_stream_log_group.log_group_name