sql_generators/glean_usage/glean_app_ping_views.py (182 lines of code) (raw):

"""
Generate app (as opposed to channel) specific views for Glean ping tables.

The generation logic sets fields that might not be present in stable tables
(but are present in others) to NULL.
Fields are ordered so that UNIONs across the stable tables are possible.
For views that have incompatible schemas (e.g. due to fields having mismatching types),
the view is only generated for the release channel.
"""

import os
from copy import deepcopy
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader
from mozilla_schema_generator.glean_ping import GleanPing
from pathos.multiprocessing import ThreadingPool

from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.schema import Schema
from bigquery_etl.util.common import get_table_dir, write_sql
from sql_generators.glean_usage.common import GleanTable

VIEW_METADATA_TEMPLATE = """\
# Generated by bigquery_etl.glean_usage.GleanAppPingViews
---
friendly_name: App-specific view for Glean ping "{ping_name}"
description: |-
  This a view that UNIONs the stable ping tables
  across all channels of the Glean application "{app_name}"
  ({app_channels}).

  It is used by Looker.
"""

# Fields that exist in the source dataset,
# but are manually overriden in the constructed SQL.
# MUST be kept in sync with the query in `app_ping_view.view.sql`
OVERRIDDEN_FIELDS = {"normalized_channel"}

# Generated view SQL longer than this (in characters) is regenerated
# without the restructured `metrics` struct.
VIEW_SQL_LENGTH_LIMIT = 1024 * 1024

PATH = Path(os.path.dirname(__file__))


class GleanAppPingViews(GleanTable):
    """Represents generated Glean app ping view."""

    def __init__(self):
        """Initialize Glean ping view."""
        GleanTable.__init__(self)
        self.per_app_id_enabled = False
        self.per_app_enabled = True

    def generate_per_app(
        self,
        project_id,
        app_info,
        output_dir=None,
        use_cloud_function=True,
        parallelism=8,
        id_token=None,
    ):
        """
        Generate per-app ping views across channels.

        Channels are processed in `app_info` order. Channels whose schema is
        empty are skipped. The first channel whose schema cannot be merged
        into the unioned schema stops the processing, so only the channels
        seen before it are part of the view (i.e. the release channel only,
        if the second channel is already incompatible).

        :param project_id: Project ID passed to the view template and used
            to determine which existing files to skip.
        :param app_info: Sequence of per-channel app dicts; the first entry
            is treated as the release channel.
        :param output_dir: Directory generated files are written to. When
            falsy, SQL is rendered but nothing is written.
        :param use_cloud_function: Forwarded to `Schema.for_table`.
        :param parallelism: Size of the thread pool used to process pings.
        :param id_token: Forwarded to `Schema.for_table`.
        :raises StopIteration: If no Glean repo matches the release
            channel's `v1_name`.
        """
        # get release channel info
        release_app = app_info[0]
        target_dataset = release_app["app_name"]

        # channels are all in the same repo, sending the same pings
        repo = next(
            (r for r in GleanPing.get_repos() if r["name"] == release_app["v1_name"])
        )

        # app name is the same as the bq_dataset_family for the release channel: do nothing
        if (
            repo["app_id"] == release_app["app_name"]
            or release_app["bq_dataset_family"] == release_app["app_name"]
        ):
            return

        env = Environment(loader=FileSystemLoader(PATH / "templates"))
        view_template = env.get_template("app_ping_view.view.sql")

        # collection of file paths that must not be overwritten
        existing_to_skip = self.skip_existing(output_dir, project_id)

        p = GleanPing(repo)

        # generate views for all available pings
        def _process_ping(ping_name):
            view_name = ping_name.replace("-", "_")
            full_view_id = f"moz-fx-data-shared-prod.{target_dataset}.{view_name}"

            # generate a unioned schema that contains all fields of all ping tables across all channels
            unioned_schema = Schema.empty()

            # cache schemas to be reused when generating the select expression
            cached_schemas = {}

            # iterate through app_info to get all channels
            included_channel_apps = []
            included_channel_views = []
            for channel_app in app_info:
                channel_dataset = channel_app["bq_dataset_family"]
                channel_dataset_view = f"{channel_dataset}.{view_name}"
                schema = Schema.for_table(
                    "moz-fx-data-shared-prod",
                    channel_dataset,
                    view_name,
                    partitioned_by="submission_timestamp",
                    use_cloud_function=use_cloud_function,
                    id_token=id_token,
                )
                cached_schemas[channel_dataset] = deepcopy(schema)

                if schema.schema["fields"] == []:
                    # check for empty schemas (e.g. restricted ones) and skip for now
                    print(f"Cannot get schema for `{channel_dataset_view}`; Skipping")
                    continue

                try:
                    unioned_schema.merge(
                        schema, add_missing_fields=True, ignore_incompatible_fields=True
                    )
                except Exception as e:
                    # if schema incompatibilities are detected, then only generate for release channel
                    print(
                        f"Cannot UNION `moz-fx-data-shared-prod.{channel_dataset_view}`: {e}"
                    )
                    break

                included_channel_apps.append(channel_app)
                included_channel_views.append(channel_dataset_view)

            if not included_channel_apps:
                # nothing to render
                return

            # generate the SELECT expression used for UNIONing the stable tables;
            # fields that are not part of a table, but exist in others, are set to NULL
            def _generate_view_sql(restructure_metrics=False) -> str:
                queries = []
                for channel_app in included_channel_apps:
                    channel_dataset = channel_app["bq_dataset_family"]

                    # compare table schema with unioned schema to determine fields that need to be NULL
                    select_expression = cached_schemas[
                        channel_dataset
                    ].generate_compatible_select_expression(
                        unioned_schema,
                        fields_to_remove=OVERRIDDEN_FIELDS,
                        unnest_structs=restructure_metrics,
                        max_unnest_depth=2,
                        unnest_allowlist="metrics",
                    )

                    queries.append(
                        {
                            "select_expression": select_expression,
                            "dataset": channel_dataset,
                            "table": view_name,
                            "channel": channel_app.get("app_channel"),
                            "app_name": release_app["app_name"],
                            "includes_client_info": any(
                                "client_info" == f["name"]
                                for f in unioned_schema.schema["fields"]
                            ),
                        }
                    )

                render_kwargs = {
                    "project_id": project_id,
                    "target_view": full_view_id,
                    "queries": queries,
                }
                return reformat(view_template.render(**render_kwargs))

            view_sql = _generate_view_sql(restructure_metrics=True)
            if len(view_sql) > VIEW_SQL_LENGTH_LIMIT:
                print(
                    f"Generated SQL for `{full_view_id}` view with restructured `metrics` exceeds {VIEW_SQL_LENGTH_LIMIT:,} character limit."
                    " Regenerating SQL without restructured `metrics`."
                )
                view_sql = _generate_view_sql(restructure_metrics=False)

            # write generated SQL files to destination folders
            if output_dir:
                table_dir = get_table_dir(output_dir, full_view_id)

                write_sql(
                    output_dir,
                    full_view_id,
                    "view.sql",
                    view_sql,
                    skip_existing=str(table_dir / "view.sql") in existing_to_skip,
                )

                metadata_content = VIEW_METADATA_TEMPLATE.format(
                    ping_name=ping_name,
                    app_name=release_app["canonical_app_name"],
                    app_channels=", ".join(included_channel_views),
                )
                metadata_file = Path(table_dir / "metadata.yaml")
                if metadata_file.exists():
                    with metadata_file.open() as f:
                        existing_metadata = yaml.load(f, Loader=yaml.FullLoader)

                    # An empty file loads as None; appending an empty document
                    # would corrupt the generated metadata, so only merge
                    # non-empty metadata that does not define its own
                    # friendly_name/description.
                    if (
                        existing_metadata
                        and "friendly_name" not in existing_metadata
                        and "description" not in existing_metadata
                    ):
                        metadata_content = metadata_content + yaml.dump(
                            existing_metadata
                        )

                write_sql(
                    output_dir,
                    full_view_id,
                    "metadata.yaml",
                    metadata_content,
                    skip_existing=str(table_dir / "metadata.yaml") in existing_to_skip,
                )

                schema_dir = table_dir

                # remove overridden fields from schema
                # it's assumed that these fields are added separately, or ignored completely
                unioned_schema.schema["fields"] = [
                    field
                    for field in unioned_schema.schema["fields"]
                    if field["name"] not in OVERRIDDEN_FIELDS
                ]

                # normalized_app_id is not part of the underlying table the schemas are derived from,
                # the field gets added as part of the view definition, so we have to add it manually to the schema;
                # normalized_channel was removed above as an overridden field and is re-added here
                unioned_schema.schema["fields"] = [
                    {
                        "name": "normalized_app_id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "App ID of the channel data was received from",
                    },
                    {
                        "name": "normalized_channel",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "Normalized channel name",
                    },
                ] + unioned_schema.schema["fields"]

                unioned_schema.to_yaml_file(schema_dir / "schema.yaml")

        # Using ThreadingPool instead of ProcessingPool here, due to issues with pickling the GleanAppPingViews class (self references),
        # and ProcessingPools cannot be nested - glean_usage is using ProcessingPool to kick off generating the different tables per app
        with ThreadingPool(parallelism) as pool:
            pool.map(
                _process_ping,
                p.get_pings(),
            )