generator/views/ping_view.py (108 lines of code) (raw):

"""Class to describe a Ping View.""" from __future__ import annotations from collections import defaultdict from typing import Any, Dict, Iterator, List, Optional, Union from . import lookml_utils from .view import OMIT_VIEWS, View, ViewDict class PingView(View): """A view on a ping table.""" type: str = "ping_view" allow_glean: bool = False def __init__(self, namespace: str, name: str, tables: List[Dict[str, Any]]): """Create instance of a PingView.""" super().__init__(namespace, name, self.__class__.type, tables) @classmethod def from_db_views( klass, namespace: str, is_glean: bool, channels: List[Dict[str, str]], db_views: dict, ) -> Iterator[PingView]: """Get Looker views for a namespace.""" if (klass.allow_glean and not is_glean) or (not klass.allow_glean and is_glean): return view_tables: Dict[str, Dict[str, Dict[str, str]]] = defaultdict(dict) for channel in channels: dataset = channel["dataset"] for view_id, references in db_views[dataset].items(): if view_id in OMIT_VIEWS: continue table_id = f"mozdata.{dataset}.{view_id}" table: Dict[str, str] = {"table": table_id} if channel.get("channel") is not None: table["channel"] = channel["channel"] # Only include those that select from a single ping source table # or union together multiple ping source tables of the same name. reference_table_names = set(r[-1] for r in references) reference_dataset_names = set(r[-2] for r in references) if ( len(reference_table_names) != 1 or channel["source_dataset"] not in reference_dataset_names ): continue view_tables[view_id][table_id] = table for view_id, tables_by_id in view_tables.items(): yield klass(namespace, view_id, list(tables_by_id.values())) @classmethod def from_dict(klass, namespace: str, name: str, _dict: ViewDict) -> PingView: """Get a view from a name and dict definition.""" return klass(namespace, name, _dict["tables"]) def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: """Generate LookML for this view.""" view_defn: Dict[str, Any] = {"name": self.name} # use schema for the table where channel=="release" or the first one table = next( (table for table in self.tables if table.get("channel") == "release"), self.tables[0], )["table"] dimensions = self.get_dimensions(table, v1_name, dryrun=dryrun) # set document id field as a primary key for joins view_defn["dimensions"] = [ d if d["name"] != "document_id" else dict(**d, primary_key="yes") for d in dimensions if not lookml_utils._is_dimension_group(d) ] view_defn["dimension_groups"] = [ d for d in dimensions if lookml_utils._is_dimension_group(d) ] # add measures view_defn["measures"] = self.get_measures(dimensions, table, v1_name) [project, dataset, table_id] = table.split(".") table_schema = dryrun.create( project=project, dataset=dataset, table=table_id, ).get_table_schema() nested_views = lookml_utils._generate_nested_dimension_views( table_schema, self.name ) # Round-tripping through a dict to get an ordered deduped list. suggestions = list( dict.fromkeys( _table["channel"] for _table in self.tables if "channel" in _table ) ) if len(suggestions) > 1: view_defn["filters"] = [ { "name": "channel", "type": "string", "description": "Filter by the app's channel", "sql": "{% condition %} ${TABLE}.normalized_channel {% endcondition %}", "default_value": suggestions[0], "suggestions": suggestions, } ] view_defn["sql_table_name"] = f"`{table}`" return {"views": [view_defn] + nested_views} def get_dimensions( self, table, v1_name: Optional[str], dryrun ) -> List[Dict[str, Any]]: """Get the set of dimensions for this view.""" # add dimensions and dimension groups return lookml_utils._generate_dimensions(table, dryrun=dryrun) def get_measures( self, dimensions: List[dict], table: str, v1_name: Optional[str] ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: """Generate measures from a list of dimensions. When no dimension-specific measures are found, return a single "count" measure. Raise ClickException if dimensions result in duplicate measures. """ # Iterate through each of the dimensions and accumulate any measures # that we want to include in the view. We pull out the client id first # since we'll use it to calculate per-measure client counts. measures: List[Dict[str, Union[str, List[Dict[str, str]]]]] = [] client_id_field = self.get_client_id(dimensions, table) if client_id_field is not None: measures.append( { "name": "clients", "type": "count_distinct", "sql": f"${{{client_id_field}}}", } ) for dimension in dimensions: dimension_name = dimension["name"] if dimension_name == "document_id": measures += [{"name": "ping_count", "type": "count"}] return measures