def _process_ping()

in sql_generators/glean_usage/glean_app_ping_views.py [0:0]


        def _process_ping(ping_name):
            view_name = ping_name.replace("-", "_")
            full_view_id = f"moz-fx-data-shared-prod.{target_dataset}.{view_name}"

            # generate a unioned schema that contains all fields of all ping tables across all channels
            unioned_schema = Schema.empty()

            # cache schemas to be reused when generating the select expression
            cached_schemas = {}

            # iterate through app_info to get all channels
            included_channel_apps = []
            included_channel_views = []
            for channel_app in app_info:
                channel_dataset = channel_app["bq_dataset_family"]
                channel_dataset_view = f"{channel_dataset}.{view_name}"
                schema = Schema.for_table(
                    "moz-fx-data-shared-prod",
                    channel_dataset,
                    view_name,
                    partitioned_by="submission_timestamp",
                    use_cloud_function=use_cloud_function,
                    id_token=id_token,
                )
                cached_schemas[channel_dataset] = deepcopy(schema)

                if schema.schema["fields"] == []:
                    # check for empty schemas (e.g. restricted ones) and skip for now
                    print(f"Cannot get schema for `{channel_dataset_view}`; Skipping")
                    continue

                try:
                    unioned_schema.merge(
                        schema, add_missing_fields=True, ignore_incompatible_fields=True
                    )
                except Exception as e:
                    # if schema incompatibilities are detected, then only generate for release channel
                    print(
                        f"Cannot UNION `moz-fx-data-shared-prod.{channel_dataset_view}`: {e}"
                    )
                    break

                included_channel_apps.append(channel_app)
                included_channel_views.append(channel_dataset_view)

            if included_channel_apps == []:
                # nothing to render
                return

            # generate the SELECT expression used for UNIONing the stable tables;
            # fields that are not part of a table, but exist in others, are set to NULL
            def _generate_view_sql(restructure_metrics=False) -> str:
                queries = []
                for channel_app in included_channel_apps:
                    channel_dataset = channel_app["bq_dataset_family"]

                    # compare table schema with unioned schema to determine fields that need to be NULL
                    select_expression = cached_schemas[
                        channel_dataset
                    ].generate_compatible_select_expression(
                        unioned_schema,
                        fields_to_remove=OVERRIDDEN_FIELDS,
                        unnest_structs=restructure_metrics,
                        max_unnest_depth=2,
                        unnest_allowlist="metrics",
                    )

                    queries.append(
                        dict(
                            select_expression=select_expression,
                            dataset=channel_dataset,
                            table=view_name,
                            channel=channel_app.get("app_channel"),
                            app_name=release_app["app_name"],
                            includes_client_info=any(
                                [
                                    "client_info" == f["name"]
                                    for f in unioned_schema.schema["fields"]
                                ]
                            ),
                        )
                    )

                render_kwargs = dict(
                    project_id=project_id, target_view=full_view_id, queries=queries
                )
                return reformat(view_template.render(**render_kwargs))

            view_sql = _generate_view_sql(restructure_metrics=True)
            if len(view_sql) > VIEW_SQL_LENGTH_LIMIT:
                print(
                    f"Generated SQL for `{full_view_id}` view with restructured `metrics` exceeds {VIEW_SQL_LENGTH_LIMIT:,} character limit."
                    " Regenerating SQL without restructured `metrics`."
                )
                view_sql = _generate_view_sql(restructure_metrics=False)

            # write generated SQL files to destination folders
            if output_dir:
                write_sql(
                    output_dir,
                    full_view_id,
                    "view.sql",
                    view_sql,
                    skip_existing=str(
                        get_table_dir(output_dir, full_view_id) / "view.sql"
                    )
                    in skip_existing,
                )

                metadata_content = VIEW_METADATA_TEMPLATE.format(
                    ping_name=ping_name,
                    app_name=release_app["canonical_app_name"],
                    app_channels=", ".join(included_channel_views),
                )
                metadata_file = Path(
                    get_table_dir(output_dir, full_view_id) / "metadata.yaml"
                )
                if metadata_file.exists():
                    with metadata_file.open() as f:
                        existing_metadata = yaml.load(f, Loader=yaml.FullLoader)
                        if (
                            "friendly_name" not in existing_metadata
                            and "description" not in existing_metadata
                        ):
                            metadata_content = metadata_content + yaml.dump(
                                existing_metadata
                            )

                write_sql(
                    output_dir,
                    full_view_id,
                    "metadata.yaml",
                    metadata_content,
                    skip_existing=str(
                        get_table_dir(output_dir, full_view_id) / "metadata.yaml"
                    )
                    in skip_existing,
                )

                schema_dir = get_table_dir(output_dir, full_view_id)

                # remove overridden fields from schema
                # it's assumed that these fields are added separately, or ignored completely
                unioned_schema.schema["fields"] = [
                    field
                    for field in unioned_schema.schema["fields"]
                    if field["name"] not in OVERRIDDEN_FIELDS
                ]

                # normalized_app_id is not part of the underlying table the schemas are derived from,
                # the field gets added as part of the view definition, so we have to add it manually to the schema
                unioned_schema.schema["fields"] = [
                    {
                        "name": "normalized_app_id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "App ID of the channel data was received from",
                    },
                    {
                        "name": "normalized_channel",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "Normalized channel name",
                    },
                ] + unioned_schema.schema["fields"]

                unioned_schema.to_yaml_file(schema_dir / "schema.yaml")