in sql_generators/glean_usage/glean_app_ping_views.py [0:0]
def _process_ping(ping_name):
    """Generate the cross-channel (unioned) view for a single Glean ping.

    Builds the view ``moz-fx-data-shared-prod.{target_dataset}.{view_name}``,
    which UNIONs the ping's per-channel views for the channel apps in
    ``app_info`` whose schema could be read and merged. When ``output_dir`` is
    set, writes ``view.sql``, ``metadata.yaml`` and ``schema.yaml`` for that
    view. Returns early, writing nothing, if no channel could be included.

    Relies on names from the enclosing scope: ``target_dataset``, ``app_info``,
    ``release_app``, ``project_id``, ``view_template``, ``output_dir``,
    ``skip_existing``, ``use_cloud_function`` and ``id_token``.

    :param ping_name: Glean ping name; dashes are replaced with underscores to
        form the per-channel table/view name and the target view name.
    """
    view_name = ping_name.replace("-", "_")
    full_view_id = f"moz-fx-data-shared-prod.{target_dataset}.{view_name}"

    # generate a unioned schema that contains all fields of all ping tables across all channels
    unioned_schema = Schema.empty()
    # Per-channel schemas, reused when generating each channel's SELECT
    # expression. Stored as deep copies, presumably so that merging into
    # `unioned_schema` cannot alter them.
    cached_schemas = {}

    # Channel apps (and their `dataset.view` names) that end up in the UNION.
    included_channel_apps = []
    included_channel_views = []
    for channel_app in app_info:
        channel_dataset = channel_app["bq_dataset_family"]
        channel_dataset_view = f"{channel_dataset}.{view_name}"
        schema = Schema.for_table(
            "moz-fx-data-shared-prod",
            channel_dataset,
            view_name,
            partitioned_by="submission_timestamp",
            use_cloud_function=use_cloud_function,
            id_token=id_token,
        )
        cached_schemas[channel_dataset] = deepcopy(schema)

        if not schema.schema["fields"]:
            # check for empty schemas (e.g. restricted ones) and skip for now
            print(f"Cannot get schema for `{channel_dataset_view}`; Skipping")
            continue

        try:
            unioned_schema.merge(
                schema, add_missing_fields=True, ignore_incompatible_fields=True
            )
        except Exception as e:
            # On a schema incompatibility, stop adding channels: only the
            # channels already merged (those earlier in `app_info`, e.g. the
            # release channel if it comes first) are part of the view.
            # NOTE(review): if `merge` can fail part-way through, the unioned
            # schema may already contain some of this channel's fields — confirm.
            print(
                f"Cannot UNION `moz-fx-data-shared-prod.{channel_dataset_view}`: {e}"
            )
            break

        included_channel_apps.append(channel_app)
        included_channel_views.append(channel_dataset_view)

    if not included_channel_apps:
        # nothing to render
        return

    def _generate_view_sql(restructure_metrics=False) -> str:
        """Render the formatted view SQL that UNIONs all included channels.

        Each channel's SELECT expression is generated against the unioned
        schema, so fields that are not part of a channel's table, but exist
        in others, are set to NULL.

        :param restructure_metrics: forwarded as ``unnest_structs`` to the
            select-expression generator (limited to ``metrics``, depth 2).
        :return: the reformatted SQL of the view definition.
        """
        # Same for every channel, so compute it once per render.
        includes_client_info = any(
            field["name"] == "client_info"
            for field in unioned_schema.schema["fields"]
        )

        queries = []
        for channel_app in included_channel_apps:
            channel_dataset = channel_app["bq_dataset_family"]
            # compare table schema with unioned schema to determine fields that need to be NULL
            select_expression = cached_schemas[
                channel_dataset
            ].generate_compatible_select_expression(
                unioned_schema,
                fields_to_remove=OVERRIDDEN_FIELDS,
                unnest_structs=restructure_metrics,
                max_unnest_depth=2,
                unnest_allowlist="metrics",
            )
            queries.append(
                dict(
                    select_expression=select_expression,
                    dataset=channel_dataset,
                    table=view_name,
                    channel=channel_app.get("app_channel"),
                    app_name=release_app["app_name"],
                    includes_client_info=includes_client_info,
                )
            )

        render_kwargs = dict(
            project_id=project_id, target_view=full_view_id, queries=queries
        )
        return reformat(view_template.render(**render_kwargs))

    # Prefer the restructured `metrics` form; fall back to the flat form when
    # the generated SQL would exceed the view length limit.
    view_sql = _generate_view_sql(restructure_metrics=True)
    if len(view_sql) > VIEW_SQL_LENGTH_LIMIT:
        print(
            f"Generated SQL for `{full_view_id}` view with restructured `metrics` exceeds {VIEW_SQL_LENGTH_LIMIT:,} character limit."
            " Regenerating SQL without restructured `metrics`."
        )
        view_sql = _generate_view_sql(restructure_metrics=False)

    # write generated SQL files to destination folders
    if not output_dir:
        return

    table_dir = get_table_dir(output_dir, full_view_id)

    write_sql(
        output_dir,
        full_view_id,
        "view.sql",
        view_sql,
        skip_existing=str(table_dir / "view.sql") in skip_existing,
    )

    metadata_content = VIEW_METADATA_TEMPLATE.format(
        ping_name=ping_name,
        app_name=release_app["canonical_app_name"],
        app_channels=", ".join(included_channel_views),
    )
    metadata_file = Path(table_dir / "metadata.yaml")
    if metadata_file.exists():
        with metadata_file.open() as f:
            existing_metadata = yaml.load(f, Loader=yaml.FullLoader)
        # Append the existing metadata after the generated content only when
        # it defines neither `friendly_name` nor `description`. An empty file
        # loads as None, which has nothing to keep (and is not a mapping).
        if (
            existing_metadata
            and "friendly_name" not in existing_metadata
            and "description" not in existing_metadata
        ):
            metadata_content = metadata_content + yaml.dump(existing_metadata)

    write_sql(
        output_dir,
        full_view_id,
        "metadata.yaml",
        metadata_content,
        skip_existing=str(table_dir / "metadata.yaml") in skip_existing,
    )

    # remove overridden fields from schema
    # it's assumed that these fields are added separately, or ignored completely
    remaining_fields = [
        field
        for field in unioned_schema.schema["fields"]
        if field["name"] not in OVERRIDDEN_FIELDS
    ]
    # normalized_app_id and normalized_channel are not part of the underlying
    # tables the schemas are derived from; they get added as part of the view
    # definition, so they have to be added to the schema manually.
    unioned_schema.schema["fields"] = [
        {
            "name": "normalized_app_id",
            "mode": "NULLABLE",
            "type": "STRING",
            "description": "App ID of the channel data was received from",
        },
        {
            "name": "normalized_channel",
            "mode": "NULLABLE",
            "type": "STRING",
            "description": "Normalized channel name",
        },
    ] + remaining_fields
    unioned_schema.to_yaml_file(table_dir / "schema.yaml")