generator/views/lookml_utils.py

"""Utils for generating lookml.""" import re import tarfile import urllib.request from collections import defaultdict from io import BytesIO from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple import click import yaml from jinja2 import Environment, FileSystemLoader GENERATOR_PATH = Path(__file__).parent.parent BIGQUERY_TYPE_TO_DIMENSION_TYPE = { "BIGNUMERIC": "string", "BOOLEAN": "yesno", "BYTES": "string", "DATE": "time", "DATETIME": "time", "FLOAT": "number", "INTEGER": "number", "NUMERIC": "number", "STRING": "string", "TIME": "time", "TIMESTAMP": "time", } HIDDEN_DIMENSIONS = { ("document_id",), ("client_id",), ("client_info", "client_id"), ("context_id",), ("additional_properties",), } MAP_LAYER_NAMES = { ("country",): "countries", ("metadata", "geo", "country"): "countries", } DEFAULT_MAX_SUGGEST_PERSIST_FOR = "24 hours" UPPERCASE_INITIALISMS_PATTERN = re.compile( ( r"\b(" + "|".join( [ "CPU", "DB", "DNS", "DNT", "DOM", "GC", "GPU", "HTTP", "ID", "IO", "IP", "ISP", "JWE", "LB", "OS", "SDK", "SERP", "SSL", "TLS", "UI", "URI", "URL", "UTM", "UUID", ] ) + r")\b" ), re.IGNORECASE, ) def _get_dimension( path: Tuple[str, ...], field_type: str, mode: str, description: Optional[str] ) -> Dict[str, Any]: result: Dict[str, Any] = {} result["sql"] = "${TABLE}." + ".".join(path) name = path if ( mode == "REPEATED" or path in HIDDEN_DIMENSIONS or field_type not in BIGQUERY_TYPE_TO_DIMENSION_TYPE ): result["hidden"] = "yes" else: result["type"] = BIGQUERY_TYPE_TO_DIMENSION_TYPE[field_type] result["suggest_persist_for"] = DEFAULT_MAX_SUGGEST_PERSIST_FOR group_label, group_item_label = None, None if len(path) > 1: group_label = slug_to_title(" ".join(path[:-1])) group_item_label = slug_to_title(path[-1]) if result["type"] == "time": # Remove _{type} suffix from the last path element for dimension group # names For example submission_date and submission_timestamp become # submission, and metadata.header.parsed_date becomes # metadata__header__parsed. This is because the timeframe will add a _{type} # suffix to the individual dimension names. name = *path[:-1], re.sub("_(date|time(stamp)?)$", "", path[-1]) result["timeframes"] = [ "raw", "time", "date", "week", "month", "quarter", "year", ] if field_type == "DATE": result["timeframes"].remove("time") result["convert_tz"] = "no" result["datatype"] = "date" if group_label and group_item_label: # Dimension groups should not be nested, see issue #82 result["label"] = f"{group_label}: {group_item_label}" elif len(path) > 1: result["group_label"] = group_label result["group_item_label"] = group_item_label if path in MAP_LAYER_NAMES: result["map_layer_name"] = MAP_LAYER_NAMES[path] result["name"] = "__".join(name) if description: result["description"] = description return result def _generate_dimensions_helper(schema: List[Any], *prefix: str) -> Iterable[dict]: for field in sorted(schema, key=lambda f: f["name"]): if field["type"] == "RECORD" and not field.get("mode", "") == "REPEATED": yield from _generate_dimensions_helper( field["fields"], *prefix, field["name"] ) else: yield _get_dimension( (*prefix, field["name"]), field["type"], field.get("mode", ""), field.get("description", ""), ) def _generate_dimensions(table: str, dryrun) -> List[Dict[str, Any]]: """Generate dimensions and dimension groups from a bigquery table. When schema contains both submission_timestamp and submission_date, only produce a dimension group for submission_timestamp. Raise ClickException if schema results in duplicate dimensions. 
""" dimensions = {} [project, dataset, table] = table.split(".") table_schema = dryrun.create( project=project, dataset=dataset, table=table, ).get_table_schema() for dimension in _generate_dimensions_helper(table_schema): name_key = dimension["name"] # This prevents `time` dimension groups from overwriting other dimensions below if dimension.get("type") == "time": name_key += "_time" # overwrite duplicate "submission", "end", "start" dimension group, thus picking the # last value sorted by field name, which is submission_timestamp # See also https://github.com/mozilla/lookml-generator/issues/471 if name_key in dimensions and not ( dimension.get("type") == "time" and ( dimension["name"] == "submission" or dimension["name"].endswith("end") or dimension["name"].endswith("start") ) ): raise click.ClickException( f"duplicate dimension {name_key!r} for table {table!r}" ) dimensions[name_key] = dimension return list(dimensions.values()) def _generate_dimensions_from_query(query: str, dryrun) -> List[Dict[str, Any]]: """Generate dimensions and dimension groups from a SQL query.""" schema = dryrun.create(sql=query).get_schema() dimensions = {} for dimension in _generate_dimensions_helper(schema or []): name_key = dimension["name"] # This prevents `time` dimension groups from overwriting other dimensions below if dimension.get("type") == "time": name_key += "_time" # overwrite duplicate "submission", "end", "start" dimension group, thus picking the # last value sorted by field name, which is submission_timestamp # See also https://github.com/mozilla/lookml-generator/issues/471 if name_key in dimensions and not ( dimension.get("type") == "time" and ( dimension["name"] == "submission" or dimension["name"].endswith("end") or dimension["name"].endswith("start") ) ): raise click.ClickException(f"duplicate dimension {name_key!r} in query") dimensions[name_key] = dimension return list(dimensions.values()) def _generate_nested_dimension_views( schema: List[dict], view_name: str ) -> List[Dict[str, Any]]: """ Recursively generate views for nested fields. Nested fields are created as views, with dimensions and optionally measures. 
""" views: List[Dict[str, Any]] = [] for field in sorted(schema, key=lambda f: f["name"]): if field["type"] == "RECORD" and field["name"] != "labeled_counter": # labeled_counter is handled explicitly in glean ping views; hidden for other views if field.get("mode") == "REPEATED": nested_field_view: Dict[str, Any] = { "name": f"{view_name}__{field['name']}" } dimensions = _generate_dimensions_helper(schema=field["fields"]) nested_field_view["dimensions"] = [ d for d in dimensions if not _is_dimension_group(d) ] nested_field_view["dimension_groups"] = [ d for d in dimensions if _is_dimension_group(d) ] views = ( views + [nested_field_view] + _generate_nested_dimension_views( field["fields"], f"{view_name}__{field['name']}" ) ) else: views = views + _generate_nested_dimension_views( field["fields"], f"{view_name}__{field['name']}" ) return views def _is_dimension_group(dimension: dict): """Determine if a dimension is actually a dimension group.""" return "timeframes" in dimension or "intervals" in dimension def escape_filter_expr(expr: str) -> str: """Escape filter expression for special Looker chars.""" return re.sub(r'((?:^-)|["_%,^])', r"^\1", expr, count=0) def _is_nested_dimension(dimension: dict): return ( "hidden" in dimension and dimension["hidden"] and "nested" in dimension and dimension["nested"] ) def render_template(filename, template_folder, **kwargs) -> str: """Render a given template using Jinja.""" env = Environment( loader=FileSystemLoader(GENERATOR_PATH / f"{template_folder}/templates") ) template = env.get_template(filename) rendered = template.render(**kwargs) return rendered def slug_to_title(slug): """Convert a slug to title case, with some common initialisms uppercased.""" return UPPERCASE_INITIALISMS_PATTERN.sub( lambda match: match.group(0).upper(), slug.replace("_", " ").title() ) # Map from view to qualified references {dataset: {view: [[project, dataset, table],]}} BQViewReferenceMap = Dict[str, Dict[str, List[List[str]]]] def get_bigquery_view_reference_map( generated_sql_uri: str, ) -> BQViewReferenceMap: """Get a mapping from BigQuery datasets to views with references.""" with urllib.request.urlopen(generated_sql_uri) as f: tarbytes = BytesIO(f.read()) views: BQViewReferenceMap = defaultdict(dict) with tarfile.open(fileobj=tarbytes, mode="r:gz") as tar: for tarinfo in tar: if tarinfo.name.endswith("/metadata.yaml"): metadata = yaml.safe_load(tar.extractfile(tarinfo.name)) # type: ignore references = metadata.get("references", {}) if "view.sql" not in references: continue *_, project, dataset_id, view_id, _ = tarinfo.name.split("/") if project == "moz-fx-data-shared-prod": views[dataset_id][view_id] = [ ref.split(".") for ref in references["view.sql"] ] return views