"""
Generate app (as opposed to channel) specific views for Glean ping tables.

The generation logic sets fields that might not be present in stable tables
(but are present in others) to NULL. Fields are ordered so that UNIONs across
the stable tables are possible.

For views that have incompatible schemas (e.g. due to fields having mismatched
types), the view is only generated for the release channel.
"""

import os
from copy import deepcopy
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader
from mozilla_schema_generator.glean_ping import GleanPing
from pathos.multiprocessing import ThreadingPool

from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.schema import Schema
from bigquery_etl.util.common import get_table_dir, write_sql
from sql_generators.glean_usage.common import GleanTable

VIEW_METADATA_TEMPLATE = """\
# Generated by bigquery_etl.glean_usage.GleanAppPingViews
---
friendly_name: App-specific view for Glean ping "{ping_name}"
description: |-
  This is a view that UNIONs the stable ping tables
across all channels of the Glean application "{app_name}"
({app_channels}).
It is used by Looker.
"""

# Fields that exist in the source dataset,
# but are manually overridden in the constructed SQL.
# MUST be kept in sync with the query in `app_ping_view.view.sql`
OVERRIDDEN_FIELDS = {"normalized_channel"}
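
# BigQuery's documented limit for an unresolved GoogleSQL query is 1 MB; view
# SQL past this length cannot be deployed, hence the fallback logic below.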
VIEW_SQL_LENGTH_LIMIT = 1024 * 1024

PATH = Path(os.path.dirname(__file__))


class GleanAppPingViews(GleanTable):
"""Represents generated Glean app ping view."""
def __init__(self):
"""Initialize Glean ping view."""
GleanTable.__init__(self)
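        # generate one view per app (unioning all channels) rather than one
        # per app_id / channel dataset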
self.per_app_id_enabled = False
        self.per_app_enabled = True

def generate_per_app(
self,
project_id,
app_info,
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None,
):
"""
Generate per-app ping views across channels.
If schemas are incompatible, then use release channel only.
"""
# get release channel info
release_app = app_info[0]
target_dataset = release_app["app_name"]
# channels are all in the same repo, sending the same pings
repo = next(
(r for r in GleanPing.get_repos() if r["name"] == release_app["v1_name"])
)
        # the app name matches the app_id or the release channel's
        # bq_dataset_family: nothing to generate
if (
repo["app_id"] == release_app["app_name"]
or release_app["bq_dataset_family"] == release_app["app_name"]
):
return
env = Environment(loader=FileSystemLoader(PATH / "templates"))
view_template = env.get_template("app_ping_view.view.sql")
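        # paths of previously generated files that should not be overwritten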
skip_existing = self.skip_existing(output_dir, project_id)
        p = GleanPing(repo)

# generate views for all available pings
def _process_ping(ping_name):
view_name = ping_name.replace("-", "_")
full_view_id = f"moz-fx-data-shared-prod.{target_dataset}.{view_name}"
# generate a unioned schema that contains all fields of all ping tables across all channels
unioned_schema = Schema.empty()
# cache schemas to be reused when generating the select expression
cached_schemas = {}
# iterate through app_info to get all channels
included_channel_apps = []
included_channel_views = []
for channel_app in app_info:
channel_dataset = channel_app["bq_dataset_family"]
channel_dataset_view = f"{channel_dataset}.{view_name}"
schema = Schema.for_table(
"moz-fx-data-shared-prod",
channel_dataset,
view_name,
partitioned_by="submission_timestamp",
use_cloud_function=use_cloud_function,
id_token=id_token,
)
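                # cache a copy for building the select expression later; the
                # merge below may modify field entries in place, so the original
                # channel schema is preserved via deepcopy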
cached_schemas[channel_dataset] = deepcopy(schema)
                if not schema.schema["fields"]:
                    # skip empty schemas (e.g. restricted pings) for now
                    print(f"Cannot get schema for `{channel_dataset_view}`; skipping")
                    continue
try:
unioned_schema.merge(
schema, add_missing_fields=True, ignore_incompatible_fields=True
)
except Exception as e:
# if schema incompatibilities are detected, then only generate for release channel
print(
f"Cannot UNION `moz-fx-data-shared-prod.{channel_dataset_view}`: {e}"
)
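                    # channels merged so far (at minimum release, which comes
                    # first in app_info) remain included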
break
included_channel_apps.append(channel_app)
included_channel_views.append(channel_dataset_view)

            if not included_channel_apps:
# nothing to render
return
# generate the SELECT expression used for UNIONing the stable tables;
# fields that are not part of a table, but exist in others, are set to NULL
def _generate_view_sql(restructure_metrics=False) -> str:
queries = []
for channel_app in included_channel_apps:
channel_dataset = channel_app["bq_dataset_family"]
# compare table schema with unioned schema to determine fields that need to be NULL
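                    # (with restructure_metrics, the `metrics` struct is
                    # unnested up to two levels, flattening columns for Looker
                    # at the cost of much longer generated SQL)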
select_expression = cached_schemas[
channel_dataset
].generate_compatible_select_expression(
unioned_schema,
fields_to_remove=OVERRIDDEN_FIELDS,
unnest_structs=restructure_metrics,
max_unnest_depth=2,
unnest_allowlist="metrics",
)
queries.append(
dict(
select_expression=select_expression,
dataset=channel_dataset,
table=view_name,
channel=channel_app.get("app_channel"),
app_name=release_app["app_name"],
                            includes_client_info=any(
                                f["name"] == "client_info"
                                for f in unioned_schema.schema["fields"]
                            ),
)
)
render_kwargs = dict(
project_id=project_id, target_view=full_view_id, queries=queries
)
return reformat(view_template.render(**render_kwargs))

            view_sql = _generate_view_sql(restructure_metrics=True)
if len(view_sql) > VIEW_SQL_LENGTH_LIMIT:
print(
f"Generated SQL for `{full_view_id}` view with restructured `metrics` exceeds {VIEW_SQL_LENGTH_LIMIT:,} character limit."
" Regenerating SQL without restructured `metrics`."
)
view_sql = _generate_view_sql(restructure_metrics=False)
# write generated SQL files to destination folders
if output_dir:
write_sql(
output_dir,
full_view_id,
"view.sql",
view_sql,
skip_existing=str(
get_table_dir(output_dir, full_view_id) / "view.sql"
)
in skip_existing,
)
metadata_content = VIEW_METADATA_TEMPLATE.format(
ping_name=ping_name,
app_name=release_app["canonical_app_name"],
app_channels=", ".join(included_channel_views),
)
metadata_file = Path(
get_table_dir(output_dir, full_view_id) / "metadata.yaml"
)
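            # keep hand-maintained metadata keys by appending them, unless the
            # existing file already defines a friendly_name or description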
if metadata_file.exists():
with metadata_file.open() as f:
existing_metadata = yaml.load(f, Loader=yaml.FullLoader)
if (
"friendly_name" not in existing_metadata
and "description" not in existing_metadata
):
metadata_content = metadata_content + yaml.dump(
existing_metadata
)
write_sql(
output_dir,
full_view_id,
"metadata.yaml",
metadata_content,
skip_existing=str(
get_table_dir(output_dir, full_view_id) / "metadata.yaml"
)
in skip_existing,
)
schema_dir = get_table_dir(output_dir, full_view_id)
# remove overridden fields from schema
# it's assumed that these fields are added separately, or ignored completely
unioned_schema.schema["fields"] = [
field
for field in unioned_schema.schema["fields"]
if field["name"] not in OVERRIDDEN_FIELDS
]
            # normalized_app_id is not part of the underlying tables the schemas
            # are derived from; it is added in the view definition (as is the
            # overridden normalized_channel), so both are prepended to the
            # schema manually
unioned_schema.schema["fields"] = [
{
"name": "normalized_app_id",
"mode": "NULLABLE",
"type": "STRING",
"description": "App ID of the channel data was received from",
},
{
"name": "normalized_channel",
"mode": "NULLABLE",
"type": "STRING",
"description": "Normalized channel name",
},
] + unioned_schema.schema["fields"]
unioned_schema.to_yaml_file(schema_dir / "schema.yaml")

        # Use ThreadingPool rather than ProcessingPool: pickling the
        # GleanAppPingViews class fails due to self references, and
        # ProcessingPools cannot be nested (glean_usage already uses a
        # ProcessingPool to kick off table generation per app).
with ThreadingPool(parallelism) as pool:
pool.map(
_process_ping,
p.get_pings(),
)
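
# Minimal usage sketch (hypothetical invocation; `app_info` is the list of
# per-channel app metadata dicts, release channel first, as assembled by the
# glean_usage generator entry point):
#
#     GleanAppPingViews().generate_per_app(
#         "moz-fx-data-shared-prod",
#         app_info,
#         output_dir="sql/",
#     )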