Gems/AWSMetrics/cdk/aws_metrics/batch_analytics.py (108 lines of code) (raw):

"""
Copyright (c) Contributors to the Open 3D Engine Project.
For complete copyright and license terms please see the LICENSE at the root of this distribution.

SPDX-License-Identifier: Apache-2.0 OR MIT
"""

from constructs import Construct
from aws_cdk import (
    Fn,
    CfnOutput,
    aws_athena as athena
)

from . import aws_metrics_constants
from .aws_utils import resource_name_sanitizer


class BatchAnalytics:
    """
    Query the metrics stored in the S3 data lake via Amazon Athena.

    Constructing an instance adds an Athena work group, a stack output exposing
    the work group name, and a set of example named queries to the given stack.
    """

    def __init__(self, stack: Construct, application_name: str, analytics_bucket_name: str,
                 events_database_name: str, events_table_name: str) -> None:
        """
        Create the Athena work group and the example named queries.

        :param stack: Scope in which all Athena resources and outputs are defined.
        :param application_name: Prefix used for the exported output name.
        :param analytics_bucket_name: Bucket that receives the Athena query results.
        :param events_database_name: Database the named queries run against.
        :param events_table_name: Table the named queries select from.
        """
        self._stack = stack
        self._application_name = application_name
        self._analytics_bucket_name = analytics_bucket_name
        self._events_database_name = events_database_name
        self._events_table_name = events_table_name

        # The named queries reference the work group, so it must be created first.
        self._create_athena_work_group()
        self._create_athena_queries()

    def _create_athena_work_group(self) -> None:
        """
        Create a specific athena work group for access control.

        Query results are written, SSE-S3 encrypted, under the Athena output
        directory of the analytics bucket. The work group name is also exported
        as a stack output.
        """
        result_configuration = athena.CfnWorkGroup.ResultConfigurationProperty(
            encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                encryption_option='SSE_S3'
            ),
            output_location=Fn.sub(
                body='s3://${AnalyticsBucket}/${AthenaOutputDirectory}/',
                variables={
                    'AnalyticsBucket': self._analytics_bucket_name,
                    'AthenaOutputDirectory': aws_metrics_constants.ATHENA_OUTPUT_DIRECTORY
                }
            )
        )

        self._athena_work_group = athena.CfnWorkGroup(
            self._stack,
            id='AthenaWorkGroup',
            name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AthenaWorkGroup', 'athena_work_group'),
            recursive_delete_option=True,
            state='ENABLED',
            work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
                publish_cloud_watch_metrics_enabled=True,
                result_configuration=result_configuration
            )
        )

        CfnOutput(
            self._stack,
            id='AthenaWorkGroupName',
            description='Name of the Athena work group that contains sample queries',
            export_name=f"{self._application_name}:AthenaWorkGroup",
            value=self._athena_work_group.name)

    def _create_named_query(self, query_id: str, query_string: str,
                            description: str) -> athena.CfnNamedQuery:
        """
        Create one named query bound to the events database and the work group.

        :param query_id: Construct id of the query; also used (prefixed with the
            stack name and sanitized) as the query's resource name.
        :param query_string: SQL text of the query.
        :param description: Human readable description of the query.
        :return: The created named query.
        """
        named_query = athena.CfnNamedQuery(
            self._stack,
            id=query_id,
            name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-{query_id}', 'athena_named_query'),
            database=self._events_database_name,
            query_string=query_string,
            description=description,
            work_group=self._athena_work_group.name
        )
        # NOTE(review): the work group is referenced through its name attribute,
        # which presumably does not create an implicit CloudFormation dependency,
        # hence the explicit one - confirm.
        named_query.node.add_dependency(self._athena_work_group)
        return named_query

    def _create_athena_queries(self) -> None:
        """
        Create several example queries for reference.
        """
        # Fully qualified, double-quoted reference to the events table.
        events_table = f"\"{self._events_database_name}\".\"{self._events_table_name}\""

        self._named_queries = [
            self._create_named_query(
                'NamedQuery-CreatePartitionedEventsJson',
                "CREATE TABLE events_json "
                "WITH (format='JSON',partitioned_by=ARRAY['application_id']) "
                "AS SELECT year, month, day, event_id, application_id "
                f"FROM {events_table}",
                'This command demonstrates how to create a new table of raw events'
                ' transformed to JSON format. Output is partitioned by Application'
            ),
            self._create_named_query(
                'NamedQuery-TotalEventsLastMonth',
                "WITH detail AS "
                "(SELECT date_trunc('month', date(date_parse(CONCAT(year, '-', month, '-', day), '%Y-%m-%d'))) as event_month, * "
                f"FROM {events_table}) "
                "SELECT "
                "date_trunc('month', event_month) as month, application_id, count(DISTINCT event_id) as event_count "
                "FROM detail "
                "GROUP BY date_trunc('month', event_month), application_id",
                'Total events over last month'
            ),
            self._create_named_query(
                'NamedQuery-LoginLastMonth',
                "WITH detail AS ("
                "SELECT date_trunc('month', date(date_parse(CONCAT(year, '-', month, '-', day), '%Y-%m-%d'))) as event_month, * "
                f"FROM {events_table}) "
                "SELECT "
                "date_trunc('month', event_month) as month, "
                "count(*) as new_accounts "
                "FROM detail "
                "WHERE event_name = 'login' "
                "GROUP BY date_trunc('month', event_month)",
                'Total number of login events over the last month'
            )
        ]

    @property
    def athena_work_group_name(self) -> str:
        """
        Name of the Athena work group that holds the example queries.
        """
        return self._athena_work_group.name