# sql_generators/usage_reporting/usage_reporting.py

"""Usage Reporting ETL.""" import re from functools import partial from os import path from pathlib import Path from typing import Any, Dict from jinja2 import Environment, FileSystemLoader from bigquery_etl.config import ConfigLoader from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.util.common import write_sql from sql_generators.glean_usage.common import get_app_info GENERATOR_ROOT = Path(path.dirname(__file__)) HEADER = f"Generated via `{GENERATOR_ROOT.name}` SQL generator." VERSION = "v1" TEMPLATES_LOCATION = "templates" # TODO: can we have the templates picked up in some way automatically? CHANNEL_TEMPLATES = ( "usage_reporting_clients_daily_v1.query.sql.jinja", "usage_reporting_clients_first_seen_v1.query.sql.jinja", "usage_reporting_clients_last_seen_v1.query.sql.jinja", ) CHANNEL_VIEW_TEMPLATE = "channel.view.sql.jinja" ARTIFACT_TEMPLATES = ( "metadata.yaml.jinja", "schema.yaml.jinja", "bigconfig.yml.jinja", ) BIGEYE_COLLECTION = "Operational Checks" BIGEYE_NOTIFICATION_SLACK_CHANNEL = "#de-bigeye-triage" APP_UNION_VIEW_TEMPLATE = "app_union.view.sql.jinja" ACTIVE_USERS_VIEW_TEMPLATE = "usage_reporting_active_users.view.sql.jinja" COMPOSITE_ACTIVE_USERS_TEMPLATE = "composite_active_users.view.sql.jinja" ACTIVE_USERS_AGGREGATES_TEMPLATE = ( "usage_reporting_active_users_aggregates_v1.query.sql.jinja" ) ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE = ( "usage_reporting_active_users_aggregates.view.sql.jinja" ) COMPOSITE_ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE = ( "composite_active_users_aggregates.view.sql.jinja" ) def get_generation_config(): """Retrieve external configuration for this generator.""" # TODO: maybe we should have a data structure defined for this config and # do validation as part of it. 
return ConfigLoader.get("generate", "usage_reporting", "apps", fallback=[]) def get_specific_apps_app_info_from_probe_scraper(usage_reporting_apps): """Retrieve app_info from probe scraper app definitions \ and return only the info of apps defined in the generator configuration. The app info returned includes app_name, and bq namespaces containing data \ for specific app channels. """ probe_scraper_app_info = get_app_info() app_info_filtered: dict = dict() for app_name, app_info in probe_scraper_app_info.items(): if app_name not in usage_reporting_apps: continue # TODO: turn the generatic dict into a custom app data structure. app_info_filtered[app_name] = dict() # If channels is set to None it means data from multiple channels exists in the same table. # TODO: should we use "multichannel" instead of None as an indicator of this in the config file? if usage_reporting_apps[app_name]["channels"] is None: app_info_filtered[app_name]["multichannel"] = { "app_channel": None, "app_name": app_info[0]["app_name"], "bq_dataset_family": app_info[0]["bq_dataset_family"], } continue for index, channel_info in enumerate(app_info): if ( # this assumes that if no channel defined for an app in probe scraper # then the channel is "release". 
channel := channel_info.get("app_channel", "release") ) not in usage_reporting_apps[app_name]["channels"]: continue app_info_filtered[app_name][f"{channel}__{index}"] = { "app_channel": channel, "app_name": channel_info["app_name"], "bq_dataset_family": channel_info["bq_dataset_family"], } return app_info_filtered def render_and_write_to_file( jinja_env: Environment, output_dir: Path, template: str, template_args: Dict[str, Any], table_id: str, basename: str, format: bool = True, ) -> None: """Render a Jinja template and write it to file.""" rendered = jinja_env.get_template(template).render(**template_args) write_sql( output_dir=output_dir, skip_existing=False, full_table_id=table_id, basename=basename, sql=reformat(rendered) if format else rendered, ) return None def remove_table_version_suffix(table_id: str) -> str: """Remove version suffix from table_name. Example input: 'TABLE_NAME_v1' Return: 'TABLE_NAME' """ version_suffix_regex = r"(_v|_)?([0-9]+)?$" return re.sub(version_suffix_regex, "", table_id) def generate_usage_reporting(target_project: str, output_dir: Path): """Generate usage_reporting queries and views.""" usage_reporting_apps = get_generation_config() generator_apps_info = get_specific_apps_app_info_from_probe_scraper( usage_reporting_apps ) output_dir = Path(output_dir) / target_project jinja_env = Environment(loader=FileSystemLoader(str(GENERATOR_ROOT / "templates"))) render_and_write_to_file_partial = partial( render_and_write_to_file, jinja_env=jinja_env, output_dir=output_dir ) default_template_args = { "header": HEADER, "project_id": target_project, "usage_reporting_stable_table_name": "usage_reporting_v1", "bigeye_collection": BIGEYE_COLLECTION, "bigeye_notification_slack_channel": BIGEYE_NOTIFICATION_SLACK_CHANNEL, } for app_name, app_channels in generator_apps_info.items(): app_template_args = { "app_name": app_name, **default_template_args, } for channel_template in CHANNEL_TEMPLATES: table_name = channel_template.split(".")[0] for 
channel_name, channel_info in app_channels.items(): channel_dataset = channel_info["bq_dataset_family"] channel_args = { "channel_name": channel_name.split("__")[0], "channel_dataset": channel_dataset, "table_name": table_name, "view_name": remove_table_version_suffix(table_name), **app_template_args, } channel_table_id = ( f"{target_project}.{channel_dataset}_derived.{table_name}" ) render_and_write_to_file_partial( template=channel_template, template_args=channel_args, table_id=channel_table_id, basename="query.sql", ) for channel_query_artifact_template in ARTIFACT_TEMPLATES: render_and_write_to_file_partial( template=f"{table_name}.{channel_query_artifact_template}", template_args=channel_args, table_id=channel_table_id, basename=".".join( channel_query_artifact_template.split(".")[:-1] ), format=False, ) # Do not render channel view if only a single channel exists. if channel_name == "multichannel": continue channel_view_id = remove_table_version_suffix( f"{target_project}.{channel_dataset}.{table_name}" ) render_and_write_to_file_partial( template=CHANNEL_VIEW_TEMPLATE, template_args=channel_args, table_id=channel_view_id, basename="view.sql", ) channels_info = [ { "channel_dataset": channel_info["bq_dataset_family"], "channel_name": channel_info["app_channel"], } for channel_info in app_channels.values() ] render_and_write_to_file_partial( template=APP_UNION_VIEW_TEMPLATE, template_args={ "channels_info": channels_info, **app_template_args, "table_name": table_name, "view_name": remove_table_version_suffix(table_name), }, table_id=remove_table_version_suffix( f"{target_project}.{app_name}.{table_name}" ), basename="view.sql", ) active_users_dataset_name = ACTIVE_USERS_VIEW_TEMPLATE.split(".")[0] render_and_write_to_file_partial( template=ACTIVE_USERS_VIEW_TEMPLATE, template_args={ **app_template_args, "view_name": active_users_dataset_name, }, table_id=f"{target_project}.{app_name}.{active_users_dataset_name}", basename="view.sql", ) 
composite_active_users_dataset_name = COMPOSITE_ACTIVE_USERS_TEMPLATE.split( "." )[0] render_and_write_to_file_partial( template=COMPOSITE_ACTIVE_USERS_TEMPLATE, template_args={ **app_template_args, "view_name": composite_active_users_dataset_name, }, table_id=f"{target_project}.{app_name}.{composite_active_users_dataset_name}", basename="view.sql", ) active_users_aggregates_view_name = ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE.split( "." )[0] render_and_write_to_file_partial( template=ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE, template_args={ **app_template_args, "view_name": active_users_aggregates_view_name, }, table_id=f"{target_project}.{app_name}.{active_users_aggregates_view_name}", basename="view.sql", ) active_users_aggregates_dataset_name = ACTIVE_USERS_AGGREGATES_TEMPLATE.split( "." )[0] active_users_aggregates_table_id = f"{target_project}.{app_name}_derived.{active_users_aggregates_dataset_name}" render_and_write_to_file_partial( template=ACTIVE_USERS_AGGREGATES_TEMPLATE, template_args={ **app_template_args, "view_name": active_users_aggregates_dataset_name, }, table_id=active_users_aggregates_table_id, basename="query.sql", ) for query_artifact_template in ARTIFACT_TEMPLATES: render_and_write_to_file_partial( template=f"{active_users_aggregates_dataset_name}.{query_artifact_template}", template_args={ **app_template_args, "table_name": active_users_aggregates_dataset_name, }, table_id=active_users_aggregates_table_id, basename=".".join(query_artifact_template.split(".")[:-1]), format=False, ) # composite active users aggregates composite_active_users_aggregates_dataset_name = ( COMPOSITE_ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE.split(".")[0] ) render_and_write_to_file_partial( template=COMPOSITE_ACTIVE_USERS_AGGREGATES_VIEW_TEMPLATE, template_args={ **app_template_args, "view_name": composite_active_users_aggregates_dataset_name, }, table_id=f"{target_project}.{app_name}.{composite_active_users_aggregates_dataset_name}", basename="view.sql", ) if __name__ == 
"__main__": from argparse import ArgumentParser parser = ArgumentParser(description=__doc__) parser.add_argument("--project", default="moz-fx-data-shared-prod") parser.add_argument("--output_dir", default="sql") args = parser.parse_args() generate_usage_reporting(args.project, args.output_dir)