sql_generators/glean_usage/event_monitoring_live.py (288 lines of code) (raw):

"""Generate Materialized Views and aggregate queries for event monitoring."""

import os
import re
from collections import namedtuple, OrderedDict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Set

import requests

from bigquery_etl.config import ConfigLoader
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
from sql_generators.glean_usage.common import (
    GleanTable,
    get_app_info,
    get_table_dir,
    render,
    table_names_from_baseline,
    write_sql,
)

TARGET_TABLE_ID = "event_monitoring_live_v1"
TARGET_DATASET_CROSS_APP = "monitoring"
PREFIX = "event_monitoring"
PATH = Path(os.path.dirname(__file__))
TEMPLATES_DIR = PATH / "templates"

GLEAN_APP_BASE_URL = "https://probeinfo.telemetry.mozilla.org/glean/{app_name}"
METRICS_INFO_URL = f"{GLEAN_APP_BASE_URL}/metrics"
PING_INFO_URL = f"{GLEAN_APP_BASE_URL}/pings"

# Seconds to wait on the probeinfo service. Without a timeout, `requests`
# waits forever on a stalled connection and the generator hangs.
REQUEST_TIMEOUT_SECONDS = 60

# One generated file: destination table id, file basename, rendered content.
Artifact = namedtuple("Artifact", "table_id basename sql")


class EventMonitoringLive(GleanTable):
    """Represents the generated materialized view for event monitoring."""

    def __init__(self) -> None:
        """Initialize materialized view generation."""
        self.per_app_id_enabled = True
        self.per_app_enabled = False
        self.across_apps_enabled = True
        self.prefix = PREFIX
        self.target_table_id = TARGET_TABLE_ID
        self.custom_render_kwargs = {}
        self.base_table_name = "events_v1"

    def _get_prod_datasets_with_event(self) -> List[str]:
        """Get glean datasets with an events table in generated schemas."""
        return [
            s.bq_dataset_family
            for s in get_stable_table_schemas()
            if s.schema_id == "moz://mozilla.org/schemas/glean/ping/1"
            and s.bq_table == "events_v1"
        ]

    @staticmethod
    def _get_app_dataset(dataset: str) -> Dict[str, Any]:
        """Return the first `get_app_info()` entry whose bq_dataset_family is `dataset`.

        Raises:
            IndexError: if no app dataset matches. This is the same exception
                type the previous inline `[...][0]` lookups raised, now with
                a descriptive message.
        """
        for app in get_app_info().values():
            for app_dataset in app:
                if app_dataset["bq_dataset_family"] == dataset:
                    return app_dataset
        raise IndexError(f"No Glean app found with bq_dataset_family {dataset!r}")

    @staticmethod
    def _get_skip_pings() -> List[str]:
        """Return the pings configured to be excluded from event monitoring.

        Defaults to no exclusions, consistent with the other
        `events_monitoring` config lookups in this module.
        """
        return ConfigLoader.get(
            "generate", "glean_usage", "events_monitoring", "skip_pings", fallback=[]
        )

    @staticmethod
    def _fetch_json(url: str) -> Any:
        """GET `url` with a timeout and return the decoded JSON body.

        Raises:
            requests.HTTPError: for non-2xx responses (via `raise_for_status`).
        """
        response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()

    def _get_tables_with_events(
        self, v1_name: str, bq_dataset_name: str, skip_min_ping: bool
    ) -> Set[str]:
        """Get tables for the given app that receive event type metrics.

        Args:
            v1_name: App name as used in the probeinfo URLs.
            bq_dataset_name: The app's BigQuery dataset family. If it has an
                `events_v1` stable table, the "events" ping is always included.
            skip_min_ping: When True, drop pings whose latest history entry
                sets `include_info_sections` to false.

        Returns:
            Ping names as reported by probeinfo (not yet converted to table
            names).
        """
        metrics_json = self._fetch_json(METRICS_INFO_URL.format(app_name=v1_name))

        min_pings = set()
        if skip_min_ping:
            ping_json = self._fetch_json(PING_INFO_URL.format(app_name=v1_name))
            min_pings = {
                name
                for name, info in ping_json.items()
                if not info["history"][-1].get("include_info_sections", True)
            }

        pings = set()
        for metric in metrics_json.values():
            if metric.get("type") != "event":
                continue
            history = metric.get("history") or []
            # Without any history entry there is no send_in_pings to read.
            if not history:
                continue
            pings.update(history[-1].get("send_in_pings", []))

        if bq_dataset_name in self._get_prod_datasets_with_event():
            pings.add("events")

        return pings.difference(min_pings)

    def _get_event_tables(self, v1_name: str, bq_dataset_name: str) -> List[str]:
        """Return `<ping>_v1` table names receiving events, minus skipped pings.

        Hyphens in ping names are replaced with underscores to form the
        table name.
        """
        skip_pings = self._get_skip_pings()
        return [
            f"{ping.replace('-', '_')}_v1"
            for ping in self._get_tables_with_events(
                v1_name, bq_dataset_name, skip_min_ping=True
            )
            if ping not in skip_pings
        ]

    def _write_artifacts(
        self, output_dir, project_id, artifacts: Iterable[Artifact]
    ) -> None:
        """Write each artifact below `output_dir`.

        Files listed by `self.skip_existing` are written with
        `skip_existing=True`.
        """
        # Normalise both sides to str. A pathlib.Path never compares equal to
        # a str, so comparing a Path against string entries would always miss.
        skip_existing_artifacts = {
            str(path) for path in self.skip_existing(output_dir, project_id)
        }
        for artifact in artifacts:
            destination = (
                get_table_dir(output_dir, artifact.table_id) / artifact.basename
            )
            write_sql(
                output_dir,
                artifact.table_id,
                artifact.basename,
                artifact.sql,
                skip_existing=str(destination) in skip_existing_artifacts,
            )

    def generate_per_app_id(
        self,
        project_id,
        baseline_table,
        output_dir=None,
        use_cloud_function=True,
        app_info=None,
        parallelism=8,
        id_token=None,
    ):
        """Generate the event monitoring materialized view for one app id.

        Renders `metadata.yaml`, `script.sql` (only for apps listed under
        `events_monitoring.manual_refresh`), and `materialized_view.sql` from
        the templates. These files are written only when `output_dir` is
        truthy.

        Returns early, writing nothing, when the app id is listed in
        `skip_apps` or the app has no event-receiving tables.

        Args:
            project_id: Target GCP project for the rendered SQL.
            baseline_table: Baseline table id. The parsing below expects the
                form `<project>.<app_id>_stable.<table>`.
            output_dir: Root directory to write generated files to.
            use_cloud_function, app_info, parallelism, id_token: Accepted for
                interface compatibility with other generators; unused here.
        """
        # Get the app ID from the baseline_table name.
        # This is what `common.py` also does.
        app_id = re.sub(r"_stable\..+", "", baseline_table)
        app_id = ".".join(app_id.split(".")[1:])

        # Skip any not-allowed app.
        # NOTE(review): this matches the dataset-derived app_id against
        # skip_apps, while generate_across_apps matches app_name against the
        # same list. Confirm which identifier the config is meant to hold.
        if app_id in ConfigLoader.get(
            "generate", "glean_usage", "events_monitoring", "skip_apps", fallback=[]
        ):
            return

        tables = table_names_from_baseline(baseline_table, include_project_id=False)
        table = tables[self.prefix]
        derived_dataset = table.split(".")[-2]
        dataset = derived_dataset.replace("_derived", "")

        app_dataset = self._get_app_dataset(dataset)
        app_name = app_dataset["app_name"]

        events_table_overwrites = ConfigLoader.get(
            "generate", "glean_usage", "events_monitoring", "events_tables", fallback={}
        )
        if app_name in events_table_overwrites:
            events_tables = events_table_overwrites[app_name]
        else:
            events_tables = self._get_event_tables(app_dataset["v1_name"], dataset)

        if not events_tables:
            return

        manual_refresh = app_name in ConfigLoader.get(
            "generate",
            "glean_usage",
            "events_monitoring",
            "manual_refresh",
            fallback=[],
        )

        render_kwargs = dict(
            header="-- Generated via bigquery_etl.glean_usage\n",
            header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
            project_id=project_id,
            derived_dataset=derived_dataset,
            dataset=dataset,
            current_date=datetime.today().strftime("%Y-%m-%d"),
            app_name=app_dataset["canonical_app_name"],
            events_tables=sorted(events_tables),
            manual_refresh=manual_refresh,
        )
        render_kwargs.update(self.custom_render_kwargs)
        render_kwargs.update(tables)

        init_sql = render(
            f"{self.target_table_id}.materialized_view.sql",
            template_folder=TEMPLATES_DIR,
            **render_kwargs,
        )
        metadata = render(
            f"{self.target_table_id}.metadata.yaml",
            template_folder=TEMPLATES_DIR,
            format=False,
            **render_kwargs,
        )

        # generated files to update
        artifacts = [Artifact(table, "metadata.yaml", metadata)]
        if manual_refresh:
            refresh_script = render(
                f"{self.target_table_id}.script.sql",
                template_folder=TEMPLATES_DIR,
                format=False,
                **render_kwargs,
            )
            artifacts.append(Artifact(table, "script.sql", refresh_script))

        if output_dir:
            artifacts.append(Artifact(table, "materialized_view.sql", init_sql))
            self._write_artifacts(output_dir, project_id, artifacts)

    def generate_across_apps(
        self, project_id, apps, output_dir=None, use_cloud_function=True, parallelism=8
    ):
        """Generate a query across all apps.

        Renders the `monitoring_derived.event_monitoring_aggregates_v1` query,
        metadata, and schema, plus the `monitoring.event_monitoring_live`
        view and its metadata. Files are written only when `output_dir` is
        truthy. Apps listed in `skip_apps` (matched by `app_name`) are left
        out of the per-dataset table mapping.
        """
        if not self.across_apps_enabled:
            return

        aggregate_table = "event_monitoring_aggregates_v1"
        # "event_monitoring_live_v1" -> "event_monitoring_live"
        target_view_name = "_".join(self.target_table_id.split("_")[:-1])

        events_table_overwrites = ConfigLoader.get(
            "generate", "glean_usage", "events_monitoring", "events_tables", fallback={}
        )
        # Skip any not-allowed app.
        skip_apps = ConfigLoader.get(
            "generate", "glean_usage", "events_monitoring", "skip_apps", fallback=[]
        )

        event_tables_per_dataset = OrderedDict()
        for app in apps:
            for app_dataset in app:
                if app_dataset["app_name"] in skip_apps:
                    continue

                dataset = app_dataset["bq_dataset_family"]
                # Resolve names through get_app_info(), as the original did,
                # rather than trusting the entries passed in `apps`.
                known_dataset = self._get_app_dataset(dataset)
                app_name = known_dataset["app_name"]

                if app_name in events_table_overwrites:
                    event_tables_per_dataset[dataset] = events_table_overwrites[
                        app_name
                    ]
                else:
                    event_tables = self._get_event_tables(
                        known_dataset["v1_name"], dataset
                    )
                    if event_tables:
                        event_tables_per_dataset[dataset] = sorted(event_tables)

        render_kwargs = dict(
            header="-- Generated via bigquery_etl.glean_usage\n",
            header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
            project_id=project_id,
            target_view=f"{TARGET_DATASET_CROSS_APP}.{target_view_name}",
            table=target_view_name,
            target_table=f"{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}",
            apps=apps,
            prod_datasets=self._get_prod_datasets_with_event(),
            event_tables_per_dataset=event_tables_per_dataset,
        )
        render_kwargs.update(self.custom_render_kwargs)

        query_sql = render(
            f"{aggregate_table}.query.sql",
            template_folder=TEMPLATES_DIR,
            **render_kwargs,
        )
        metadata = render(
            f"{aggregate_table}.metadata.yaml",
            template_folder=TEMPLATES_DIR,
            format=False,
            **render_kwargs,
        )
        table = f"{project_id}.{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}"

        view_sql = render(
            "event_monitoring_live.view.sql",
            template_folder=TEMPLATES_DIR,
            **render_kwargs,
        )
        view_metadata = render(
            "event_monitoring_live.metadata.yaml",
            template_folder=TEMPLATES_DIR,
            format=False,
            **render_kwargs,
        )
        schema = (TEMPLATES_DIR / "event_monitoring_aggregates_v1.schema.yaml").read_text()
        view = f"{project_id}.{TARGET_DATASET_CROSS_APP}.{target_view_name}"

        if output_dir:
            artifacts = [
                Artifact(table, "metadata.yaml", metadata),
                Artifact(table, "query.sql", query_sql),
                Artifact(table, "schema.yaml", schema),
                Artifact(view, "metadata.yaml", view_metadata),
                Artifact(view, "view.sql", view_sql),
            ]
            self._write_artifacts(output_dir, project_id, artifacts)