bigquery_etl/glam/generate.py (260 lines of code) (raw):

"""Generate templated views.""" from argparse import ArgumentParser, Namespace from dataclasses import dataclass from functools import partial from pathlib import Path from jinja2 import Environment, PackageLoader, TemplateNotFound from jsonschema import validate from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.glam import models class QueryType: """Types of queries in the template folder.""" VIEW = "view" INIT = "init" TABLE = "query" @dataclass class TemplateResult: """Results of templating a query.""" table_id: str query_type: QueryType query_text: str def from_template( query_type: QueryType, template_name: str, environment: Environment, args: Namespace, dataset_path: Path, query_name_prefix=None, **kwargs, ) -> TemplateResult: """Fill in templates and write them to disk.""" if query_type == QueryType.INIT: template = environment.get_template(f"{template_name}.init.sql") else: template = environment.get_template(f"{template_name}.sql") template_filename = template_name.split("__")[-1] if query_name_prefix: table_id = f"{args.prefix}__{query_name_prefix}_{template_filename}" else: table_id = f"{args.prefix}__{template_filename}" # replaces the header, if it exists kwargs["header"] = f"-- {query_type} for {table_id};" # create the directory for the view (dataset_path / table_id).mkdir(exist_ok=True) view_path = dataset_path / table_id / f"{query_type}.sql" # write the query with appropriate variables query_text = reformat(template.render(**{**vars(args), **kwargs})) print(f"generated {view_path}") with view_path.open("w") as fp: print(query_text, file=fp) return TemplateResult(table_id, query_type, query_text) def main(): """Generate GLAM ETL queries.""" parser = ArgumentParser(description=main.__doc__) parser.add_argument("--prefix") parser.add_argument("--project", default="glam-fenix-dev") parser.add_argument("--dataset", default="glam_etl") parser.add_argument("--sql-root", default="sql/") parser.add_argument("--daily-view-only", action="store_true", default=False) parser.add_argument("--use-sample-id", action="store_true", default=False) args = parser.parse_args() env = Environment(loader=PackageLoader("bigquery_etl", "glam/templates")) dataset_path = Path(args.sql_root) / args.project / args.dataset if not dataset_path.is_dir(): raise NotADirectoryError(f"path to {dataset_path} not found") # curry functions for convenience template = partial( from_template, environment=env, dataset_path=dataset_path, args=args ) view = partial(template, QueryType.VIEW) table = partial(template, QueryType.TABLE) init = partial(template, QueryType.INIT) # If this is a logical app id, generate it. Assert that the daily view for # the app exists. This assumes that both scalar and histogram aggregates # exist and will break down in the case where a glean app only contains one # of the scalar or histogram view. for daily_view in [ "view_clients_daily_scalar_aggregates_v1", "view_clients_daily_histogram_aggregates_v1", ]: try: view(f"logical_app_id/{args.prefix}__{daily_view}") except TemplateNotFound: print(f"{args.prefix} is not a logical app id") # generate the view for the app id directly view(daily_view) if not (dataset_path / f"{args.prefix}__{daily_view}").is_dir(): raise ValueError(f"missing {daily_view}") # exit early if we're only generating a daily view if args.daily_view_only: return # Supported fenix/firefox for android products. These are logical ids that # are formed from the union of several app_ids (sometimes across date # boundaries). config_schema = { "type": "object", "additionalProperties": { "type": "object", "properties": { "build_date_udf": { "type": "string", "description": ( "The fully qualified path to a UDF that accepts the " "client_info.app_build_id field and returns a datetime." ), }, "filter_version": { "type": "boolean", "description": ( "Whether the integer extracted from the " "client_info.app_display_version should be used to " "filter incremental aggregates." ), }, "num_versions_to_keep": { "type": "integer", "minimum": 1, "description": "The number of versions to keep.", }, "total_users": { "type": "integer", "minimum": 1, "description": "The number of users to filter the data on.", }, "minimum_client_count": { "type": "integer", "minimum": 0, "description": "The minimum client count for each build id." "We generally want this to be roughly 0.5% of WAU." "For context see https://github.com/mozilla/glam/issues/1575#issuecomment-946880387", # noqa E501 }, }, "required": [ "build_date_udf", "filter_version", "num_versions_to_keep", "total_users", ], }, } config = { "org_mozilla_fenix_glam_nightly": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 10, "minimum_client_count": 800, }, "org_mozilla_fenix_glam_beta": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 10, "minimum_client_count": 2000, }, "org_mozilla_fenix_glam_release": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 10, "minimum_client_count": 90000, }, "firefox_desktop_glam_nightly": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 10, "minimum_client_count": 100, }, "firefox_desktop_glam_beta": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 100, "minimum_client_count": 1000, }, "firefox_desktop_glam_release": { "build_date_udf": "mozfun.glam.build_hour_to_datetime", "filter_version": True, "num_versions_to_keep": 3, "total_users": 100, "minimum_client_count": 100000, }, } channel_prefixes = { "firefox_desktop_glam_nightly": "nightly", "firefox_desktop_glam_beta": "beta", "firefox_desktop_glam_release": "release", "org_mozilla_fenix_glam_nightly": "nightly", "org_mozilla_fenix_glam_beta": "beta", "org_mozilla_fenix_glam_release": "release", } validate(instance=config, schema=config_schema) if not config.get(args.prefix, {}).get("build_date_udf"): raise ValueError(f"build date udf for {args.prefix} was not found") [ table( "latest_versions_v1", **dict(app_id_channel=(f"'{channel_prefixes[args.prefix]}'")), ), init( "clients_scalar_aggregates_v1", **models.clients_scalar_aggregates( source_table=( f"glam_etl.{args.prefix}__view_clients_daily_scalar_aggregates_v1" ), destination_table=( f"glam_etl.{args.prefix}__clients_scalar_aggregates_v1" ), ), ), table( "clients_scalar_aggregates_v1", **models.clients_scalar_aggregates( source_table=( f"glam_etl.{args.prefix}__view_clients_daily_scalar_aggregates_v1" ), destination_table=( f"glam_etl.{args.prefix}__clients_scalar_aggregates_v1" ), **config[args.prefix], ), ), init( "clients_histogram_aggregates_v1", **models.clients_histogram_aggregates(parameterize=True), ), table( "clients_histogram_aggregates_v1", **models.clients_histogram_aggregates( parameterize=True, **config[args.prefix] ), ), table( "scalar_bucket_counts_v1", **models.scalar_bucket_counts( source_table=f"glam_etl.{args.prefix}__clients_scalar_aggregates_v1", **config[args.prefix], ), ), table( "histogram_bucket_counts_v1", **models.histogram_bucket_counts( source_table=f"glam_etl.{args.prefix}__clients_histogram_aggregates_v1", **config[args.prefix], ), channel=channel_prefixes[args.prefix], use_sample_id=args.use_sample_id, ), table( "probe_counts_v1", query_name_prefix="scalar", **models.probe_counts( source_table=f"glam_etl.{args.prefix}__scalar_bucket_counts_v1", is_scalar=True, ), channel=channel_prefixes[args.prefix], ), table( "probe_counts_v1", query_name_prefix="histogram", **models.probe_counts( source_table=f"glam_etl.{args.prefix}__histogram_bucket_counts_v1", is_scalar=False, ), channel=channel_prefixes[args.prefix], ), view("view_probe_counts_v1"), view("view_user_counts_v1", **models.user_counts()), view( "view_sample_counts_v1", **models.sample_counts(), channel=channel_prefixes[args.prefix], ), table("extract_user_counts_v1", **config[args.prefix]), table("extract_probe_counts_v1", **config[args.prefix]), ] if __name__ == "__main__": main()