bigquery_etl/glam/utils.py (78 lines of code) (raw):

"""Utilities for the GLAM module.""" import json import subprocess from collections import namedtuple from itertools import combinations from typing import List, Tuple from mozilla_schema_generator.glean_ping import GleanPing CustomDistributionMeta = namedtuple( "CustomDistributionMeta", ["name", "type", "range_min", "range_max", "bucket_count", "histogram_type"], ) def run(command, **kwargs) -> str: """Return the result of stdout and raise on subprocess error.""" if isinstance(command, list): args = command elif isinstance(command, str): args = command.split() else: raise RuntimeError(f"run command is invalid: {command}") try: res = ( subprocess.run(args, stdout=subprocess.PIPE, check=True, **kwargs) .stdout.decode() .strip() ) except subprocess.CalledProcessError as e: print(e.output.decode()) raise e return res def get_schema(table: str, project: str = "moz-fx-data-shared-prod"): """Return the dictionary representation of the BigQuery table schema. This returns types in the legacy SQL format. """ process = subprocess.Popen( ["bq", "show", "--schema", "--format=json", f"{project}:{table}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = process.communicate() if process.returncode > 0: raise Exception( f"Call to bq exited non-zero: {process.returncode}", stdout, stderr ) return json.loads(stdout) def ping_type_from_table(qualified_table): """Return the name of a ping as defined in mozilla-pipeline-schemas. Example: org_mozilla_fenix_stable.deletion_request_v1 -> deletion-request """ table_id = qualified_table.split(".")[-1] ping_name = table_id.rsplit("_", 1)[0] return ping_name.replace("_", "-") def get_custom_distribution_metadata(product_name) -> List[CustomDistributionMeta]: """Get metadata for reconstructing custom distribution buckets in Glean metrics.""" # GleanPing.get_repos -> List[Tuple[name: str, app_id: str]] glean = GleanPing(dict(name=product_name, app_id=product_name)) probes = glean.get_probes() custom = [] for probe in probes: # We use endswith here to accommodate for types prefixed with "labeled" if not probe.get_type().endswith("custom_distribution"): continue meta = CustomDistributionMeta( probe.get_name(), probe.get_type(), probe.get("range_min"), probe.get("range_max"), probe.get("bucket_count"), probe.get("histogram_type"), ) custom.append(meta) return custom def compute_datacube_groupings( attributes: List[str], fixed_attributes: List[str] = [] ) -> List[List[Tuple[str, bool]]]: """Generate the combinations of attributes to be computed. These are the combinations that are available to the frontend. Some dimensions may be fixed and always required. """ max_combinations = len(attributes) result = [] for subset_size in reversed(range(max_combinations + 1)): for grouping in combinations(attributes, subset_size): if len(set(fixed_attributes) - set(grouping)) > 0: # the fixed attributes are always a subset in the grouping continue select_expr = [] for attribute in attributes: select_expr.append((attribute, attribute in grouping)) result.append(select_expr) return result if __name__ == "__main__": from pprint import PrettyPrinter pp = PrettyPrinter() pp.pprint(get_custom_distribution_metadata("fenix"))