etl/glean_etl.py (486 lines of code) (raw):

import copy import os import requests import stringcase import yaml from .bigquery import get_bigquery_column_name, get_bigquery_ping_table_name from .expiry import get_expiry_text, get_mapped_expiry from .glam import SUPPORTED_GLAM_METRIC_TYPES, get_glam_metadata_for_metric from .glean import GleanApp from .glean_auto_events import get_auto_events_for_app, get_auto_events_names from .looker import ( get_looker_explore_metadata_for_metric, get_looker_explore_metadata_for_ping, get_looker_monitoring_metadata_for_event, ) from .search import create_metrics_search_js from .utils import dump_json, get_event_name_and_category # Various additional sources of metadata ANNOTATIONS_URL = os.getenv( "ANNOTATIONS_URL", "https://mozilla.github.io/glean-annotations/api.json" ) NAMESPACES_URL = os.getenv( "NAMESPACES_URL", "https://raw.githubusercontent.com/mozilla/looker-hub/main/namespaces.yaml" ) FIREFOX_PRODUCT_DETAIL_URL = os.getenv( "FIREFOX_PRODUCT_DETAIL_URL", "https://product-details.mozilla.org/1.0/firefox_history_major_releases.json", ) EXPERIMENT_DATA_URL = os.getenv( "EXPERIMENT_DATA_URL", "https://experimenter.services.mozilla.com/api/v6/experiments/", ) EXPERIMENTER_URL_TEMPLATE = "https://experimenter.services.mozilla.com/nimbus/{}/summary" # Priority for getting metric data (use the later definitions of nightly over release) METRIC_CHANNEL_PRIORITY = {"nightly": 1, "beta": 2, "release": 3, "esr": 4} # Priority for sorting app ids in the UI (of anticipated relevance to the suer) USER_CHANNEL_PRIORITY = {"release": 1, "beta": 2, "nightly": 3, "esr": 4} # Certain words are blocked by uBlock Origin, so we need to map them to something else # to avoid the page being blocked # See: https://github.com/mozilla/glean-dictionary/issues/1682 UBLOCK_ORIGIN_PRIVACY_FILTER = {"ad_impression": "advert_impression"} # Handle these apps as having no external dependencies. # This marks all metrics coming from an external dependency as `in-source=false`. # # Reason: # Some apps disable all telemetry except some minimal builtin ones. # We can't remove the dependencies in probe-scraper, # because MSG does not track history of dependencies. # A dependency removal would result in incompatible schema changes # (because columns would be deleted). APPS_DEPENDENCIES_REMOVED = ["focus_ios", "klar_ios", "focus_android", "klar_android"] def _normalize_metrics(name): # replace . with _ so sirv doesn't think that # a metric is a file metric_name = name.replace(".", "_") for key, value in UBLOCK_ORIGIN_PRIVACY_FILTER.items(): if key in metric_name: metric_name = metric_name.replace(key, value) # if a metric name starts with "metrics", uBlock Origin # will block the network call to get the JSON resource # See: https://github.com/mozilla/glean-dictionary/issues/550 # To get around this, we add "data" to metric names return f"data_{metric_name}" def _get_annotation(annotations_index, origin, item_type, identifier=None): if item_type == "app": return annotations_index.get(origin, {}) if not identifier: raise Exception("Identifier required for non-app item types") return annotations_index.get(origin, {}).get(item_type, {}).get(identifier, {}) def _incorporate_annotation(item, item_annotation, app=False, full=False): incorporated = dict(item, has_annotation=len(item_annotation) > 0) if app: # app annotations have some special properties if item_annotation.get("logo"): # the logo is dowloaded locally elsewhere incorporated.update( {"logo": f"/data/{item['app_name']}/" + _get_logo_filename(item_annotation["logo"])} ) if item_annotation.get("featured"): incorporated["featured"] = True # we use the `app_tags` property to disambiguate between the tags # that are a property of an application vs. the list of tags that # it has (and can be applied to other things) if item_annotation.get("tags"): incorporated["app_tags"] = item_annotation["tags"] elif item_annotation.get("tags"): # for non-apps, just use the tags from the annotation directly, if they # exist # annotation tags always take precedence over any tags defined in # metrics.yaml incorporated.update({"tags": item_annotation["tags"]}) if full: # other annotations are only applied to the full version (not the # summaries we list out in various places) for annotation_type in ["commentary", "warning"]: if item_annotation.get(annotation_type): incorporated[annotation_type] = item_annotation[annotation_type] return incorporated def _expand_tags(item, tag_descriptions): """ Expand the tags into full name/description objects (for full definitions) """ return dict( item, tags=[ {"name": tag_name, "description": tag_descriptions.get(tag_name, "Unknown tag")} for tag_name in item["tags"] ], ) def _get_resource_path(line: str) -> str: return line.replace(".", "_") def _get_logo_filename(logo_url: str) -> str: _, file_extension = os.path.splitext(logo_url) return f"logo{file_extension}" def _get_app_variant_description(app): """ Gets a description of app variants (intended for use inside dropdowns) """ description = app.app.get("app_channel", "release") # Make it obvious if a variant should no longer be used. if app.app.get("deprecated"): description = f"[Deprecated] {description}" return description def _get_metric_sample_data(experiment_data) -> dict: # get experiment metric sampling data to enrich metric definitions interesting_experiments = [ experiment for experiment in experiment_data if "glean" in experiment["featureIds"] ] active_experiments = [ experiment for experiment in interesting_experiments if (experiment["startDate"] is not None or experiment["isEnrollmentPaused"] is False) and experiment["endDate"] is None ] sampling_data = {} for experiment in active_experiments: app_name = experiment["appName"] bucket_config = experiment["bucketConfig"] sample_size = bucket_config["count"] / bucket_config["total"] channel = experiment["channel"] sampling_data[app_name] = sampling_data.get(app_name, {}) for branch in experiment["branches"]: feature_configs = branch["features"] filtered_configs = [ config for config in feature_configs if config["featureId"] == "glean" ] metric_config = [ config["value"]["gleanMetricConfiguration"] for config in filtered_configs if config["value"].get("gleanMetricConfiguration") is not None ] for entry in metric_config: for key in entry: sampling_data[app_name][key] = sampling_data[app_name].get(key, {}) sampling_data[app_name][key][channel] = sampling_data[app_name][key].get( channel, {} ) sampling_data[app_name][key][channel]["sample_size"] = sample_size sampling_data[app_name][key][channel]["experiment_id"] = experiment["slug"] sampling_data[app_name][key][channel]["start_date"] = experiment["startDate"] sampling_data[app_name][key][channel]["end_date"] = experiment["endDate"] sampling_data[app_name][key][channel]["targeting"] = experiment["targeting"] sampling_data[app_name][key][channel]["experimenter_link"] = ( EXPERIMENTER_URL_TEMPLATE.format(experiment["slug"]) ) return sampling_data def _is_metric_in_ping(metric, ping_data): if ping_data["name"] not in metric["pings"]: return False if metric["name"] == "client_id": return ping_data["include_client_id"] if metric["is_part_of_info_section"]: return ping_data.get("include_info_sections", True) return True def write_glean_metadata(output_dir, functions_dir, app_names=None): """ Writes out the metadata for use by the dictionary """ # first, get the basic metadata from various sources annotations_index = requests.get(ANNOTATIONS_URL).json() looker_namespaces = yaml.safe_load(requests.get(NAMESPACES_URL).text) product_details = requests.get(FIREFOX_PRODUCT_DETAIL_URL).json() latest_fx_release_version = list(product_details)[-1] metrics_sampling_info = _get_metric_sample_data(requests.get(EXPERIMENT_DATA_URL).json()) # Then, get the apps we're using apps = [app for app in GleanApp.get_apps()] if app_names: apps = [app for app in apps if app.app_name in app_names] app_groups = {} for app in apps: if app.app.get("skip_documentation"): # respect apps that don't want to appear in the glean dictionary continue if not app_groups.get(app.app_name): app_groups[app.app_name] = dict( app_name=app.app_name, app_description=app.app["app_description"], canonical_app_name=app.app["canonical_app_name"], deprecated=app.app.get("deprecated", False), url=app.app["url"], notification_emails=app.app["notification_emails"], app_ids=[], ) app_groups[app.app_name]["app_ids"].extend( [ { "name": app.app_id, "description": app.app.get("description", app.app["app_description"]), "channel": app.app.get("app_channel", "release"), "deprecated": app.app.get("deprecated", False), "prototype": app.app.get("prototype", False), } ] ) # sort each set of app ids by the following criteria # metric channel priority nightly < beta < release < esr # non-deprecated < deprecated for app_group in app_groups.values(): app_group["app_ids"].sort(key=lambda app_id: METRIC_CHANNEL_PRIORITY[app_id["channel"]]) app_group["app_ids"].sort(key=lambda app_id: app_id["deprecated"]) # Process each grouping of apps into a set of summaries, app details, and all the rest app_summaries = [] for app_name, app_group in app_groups.items(): app_dir = os.path.join(output_dir, app_name) (app_id_dir, app_ping_dir, app_table_dir, app_metrics_dir) = ( os.path.join(app_dir, subtype) for subtype in ("app_ids", "pings", "tables", "metrics") ) for directory in (app_id_dir, app_ping_dir, app_table_dir, app_metrics_dir): os.makedirs(directory, exist_ok=True) app_annotation = _get_annotation(annotations_index, app_name, "app") # Create a summary (used in the top-level list of apps, and base metadata for the # app detail page) app_summary = _incorporate_annotation(app_group, app_annotation.get("app", {}), app=True) if app_summary.get("logo"): with open(os.path.join(app_dir, _get_logo_filename(app_summary["logo"])), "wb") as f: # want the original URL for getting the logo f.write(requests.get(app_annotation["app"]["logo"]).content) # An application group is considered a prototype only if all its application ids are if all([app_id.get("prototype") for app_id in app_group["app_ids"]]): app_summary["prototype"] = True # add the summary application to the app list app_summaries.append(app_summary) # Now get more detail on the application for the detail page and all the metrics app_data = dict(app_summary, pings=[], metrics=[]) app_tags_for_objects = app_annotation.get( "tags", {} ) # tags for objects in the app (e.g. metrics) app_tags_for_app = app_summary.get("app_tags", []) # tags for the app itself app_metrics = {} metric_pings = dict(data=[]) # keep track of which metric and ping identifiers we have seen so far metric_identifiers_seen = set() ping_identifiers_seen = set() for app_id in [app["name"] for app in app_group["app_ids"]]: app = next(app for app in apps if app.app_id == app_id) app_is_deprecated = app.app.get("deprecated") # app-id tags: tags specified in the annotations (and or more recent versions of an app) # will always override older ones for tag in app.get_tags(): if not app_tags_for_objects.get(tag.identifier): app_tags_for_objects[tag.identifier] = tag.description # information about this app_id open(os.path.join(app_id_dir, f"{_get_resource_path(app_id)}.json"), "w").write( dump_json(dict(app.app, app_tags=app_tags_for_app)) ) pings_with_client_id = set() # ping data for ping in app.get_pings(): if ping.identifier not in ping_identifiers_seen: ping_identifiers_seen.add(ping.identifier) app_data["pings"].append( _incorporate_annotation( dict( ping.definition, tags=ping.tags, variants=[], ), _get_annotation( annotations_index, ping.definition["origin"], "pings", ping.identifier, ), ) ) ping_data = next(pd for pd in app_data["pings"] if pd["name"] == ping.identifier) if ping_data["include_client_id"]: pings_with_client_id.add(ping_data["name"]) # write table description (app variant specific) ping_name_snakecase = stringcase.snakecase(ping.identifier) stable_ping_table_name = f"{app.app['bq_dataset_family']}.{ping_name_snakecase}" live_ping_table_name = ( f"{app.app['bq_dataset_family']}_live.{ping_name_snakecase}_v1" ) bq_path = ( f"{app.app['document_namespace']}/{ping.identifier}/{ping.identifier}.1.bq" ) bq_definition = ( "https://github.com/mozilla-services/mozilla-pipeline-schemas/blob/generated-schemas/schemas/" # noqa + bq_path ) bq_schema = requests.get( "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/generated-schemas/schemas/" # noqa + bq_path ).json() app_channel = app.app.get("app_channel") variant_data = dict( id=app_id, description=_get_app_variant_description(app), table=stable_ping_table_name, channel=app_channel if app_channel else "release", ) looker_explore = get_looker_explore_metadata_for_ping( looker_namespaces, app, app_group, ping ) if not app_is_deprecated and looker_explore: variant_data.update({"looker_explore": looker_explore}) ping_data["variants"].append(variant_data) app_variant_table_dir = os.path.join(app_table_dir, _get_resource_path(app.app_id)) os.makedirs(app_variant_table_dir, exist_ok=True) open(os.path.join(app_variant_table_dir, f"{ping.identifier}.json"), "w").write( dump_json( dict( bq_definition=bq_definition, bq_schema=bq_schema, live_table=live_ping_table_name, name=ping.identifier, stable_table=stable_ping_table_name, app_id=app_id, canonical_app_name=app.app["canonical_app_name"], app_tags=app_tags_for_app, ) ) ) # metrics data metrics = app.get_metrics() app_sampling_info = metrics_sampling_info.get(app_name) for metric in metrics: if metric.identifier not in metric_identifiers_seen: metric_identifiers_seen.add(metric.identifier) # read the annotation, if any metric_annotation = _get_annotation( annotations_index, metric.definition["origin"], "metrics", metric.identifier ) metric_sample_info: dict | None = ( dict(app_sampling_info.get(metric.identifier)) if app_sampling_info is not None and app_sampling_info.get(metric.identifier) is not None else None ) is_sampled = metric_sample_info is not None if is_sampled: for channel in metric_sample_info: sampled_text = ( str(metric_sample_info.get(channel)["sample_size"] * 100) + "% " + "on" if metric.definition["disabled"] is True else str(metric_sample_info.get(channel)["sample_size"] * 100) + "% " + "off" ) metric_sample_info.get(channel)["sampled_text"] = sampled_text # Force all outside metrics as removed. if app_name in APPS_DEPENDENCIES_REMOVED: if metric.definition["origin"] != app_name: metric.definition.update({"in_source": False}) base_definition = _incorporate_annotation( dict( name=metric.identifier, description=metric.description, tags=metric.tags, in_source=metric.definition["in_source"], latest_fx_release_version=latest_fx_release_version, extra_keys=metric.definition["extra_keys"] if "extra_keys" in metric.definition else None, type=metric.definition["type"], expires=get_mapped_expiry( metric.definition["expires"], app_name, product_details ), expiry_text=get_expiry_text( metric.definition["expires"], app_name, product_details ), sampled=is_sampled, sampled_text=(metric_sample_info.get("release")["sampled_text"]) if metric_sample_info is not None else "Not sampled", is_part_of_info_section=metric.bq_prefix in ["client_info", "ping_info"], ), metric_annotation, ) if metric.definition["origin"] != app_name: base_definition.update({"origin": metric.definition["origin"]}) # metrics with associated pings metric_pings["data"].append( dict(base_definition, pings=metric.definition["send_in_pings"]) ) # the summary of metrics app_data["metrics"].append(base_definition) # the full metric definition app_metrics[metric.identifier] = _expand_tags( _incorporate_annotation( dict( metric.definition, name=metric.identifier, tags=metric.tags, # convert send_in_pings to a list so we can sort (see below) send_in_pings=list(metric.definition["send_in_pings"]), repo_url=app.app["url"], variants=[], expires=base_definition["expires"], latest_fx_release_version=latest_fx_release_version, expiry_text=base_definition["expiry_text"], canonical_app_name=app.app["canonical_app_name"], app_tags=app_tags_for_app, sampling_info=metric_sample_info, ), metric_annotation, full=True, ), app_tags_for_objects, ) if metric.definition["type"] == "event": app_metrics[metric.identifier]["event_info"] = { "name": get_event_name_and_category(metric.identifier)[1], "category": get_event_name_and_category(metric.identifier)[0], } # sort "send in pings" alphanumerically, except that `metrics` # should always be first if present and `deletion-request` # should be last ping_priority = {"metrics": 0, "deletion-request": 2} app_metrics[metric.identifier]["send_in_pings"].sort() app_metrics[metric.identifier]["send_in_pings"].sort( key=lambda ping: ping_priority.get(ping, 1) ) # BigQuery and Looker metadata is ping based ping_data = {} for ping_name in metric.definition["send_in_pings"]: ping_data[ping_name] = { "bigquery_table": get_bigquery_ping_table_name( app.app["bq_dataset_family"], ping_name ) } # FIXME: if we allow the metadata format to change, we can # just set it up all in one go above looker_metadata = get_looker_explore_metadata_for_metric( looker_namespaces, app, app_group, metric, ping_name, ping_name in pings_with_client_id, ) if looker_metadata: ping_data[ping_name].update({"looker": looker_metadata}) glam_metadata = get_glam_metadata_for_metric(app, metric, ping_name) ping_data[ping_name].update(glam_metadata) event_monitoring_metadata = get_looker_monitoring_metadata_for_event( app, app_group, metric ) if event_monitoring_metadata: ping_data[ping_name].update({"event_monitoring": event_monitoring_metadata}) etl = dict( ping_data=ping_data, bigquery_column_name=get_bigquery_column_name(metric), ) app_metrics[metric.identifier]["variants"].append( dict( id=app.app_id, channel=app.app.get("app_channel", "release"), description=_get_app_variant_description(app), etl=etl, ) ) # write ping descriptions, resorting the app-specific parts in user preference order for ping_data in app_data["pings"]: ping_data["variants"].sort(key=lambda v: USER_CHANNEL_PRIORITY[v["channel"]]) open(os.path.join(app_ping_dir, f"{ping_data['name']}.json"), "w").write( dump_json( _expand_tags( _incorporate_annotation( dict( ping_data, metrics=[ metric for metric in metric_pings["data"] if _is_metric_in_ping(metric, ping_data) ], tag_descriptions=app_tags_for_objects, canonical_app_name=app.app["canonical_app_name"], app_tags=app_tags_for_app, ), _get_annotation( annotations_index, ping_data["origin"], "pings", ping_data["name"] ), full=True, ), app_tags_for_objects, ) ) ) if "glean.element_click" in app_metrics: auto_events_all_apps = get_auto_events_names() auto_events_for_app = get_auto_events_for_app(app_name, auto_events_all_apps) app_data["metrics"].extend(auto_events_for_app) element_click_base = copy.deepcopy(app_metrics["glean.element_click"]) for auto_event in auto_events_for_app: element_click_base["name"] = auto_event["name"] element_click_base["description"] = auto_event["description"] element_click_base["event_info"].update(auto_event["event_info"]) app_metrics[auto_event["name"]] = copy.deepcopy(element_click_base) # write metrics, resorting the app-specific parts in user preference order for metric_data in app_metrics.values(): metric_data["variants"].sort(key=lambda v: USER_CHANNEL_PRIORITY[v["channel"]]) open( os.path.join(app_metrics_dir, f"{_normalize_metrics(metric_data['name'])}.json"), "w", ).write(dump_json(metric_data)) # write tag metadata (if any) if app_tags_for_objects: tags = [{"name": k, "description": v} for (k, v) in app_tags_for_objects.items()] app_data["tags"] = tags for tag in tags: tag_metrics = [ metric for metric in app_data["metrics"] if tag["name"] in metric.get("tags", []) ] tag["metric_count"] = len(tag_metrics) else: app_data["tags"] = [] # sort the information in the app-level summary, then write it out # (we don't sort application id information, that's already handled # above) for key in ["tags", "metrics", "pings"]: if app_data.get(key): app_data[key].sort(key=lambda v: v["name"]) # for tags, put those with no metrics associated with them at the # end if key == "tags": app_data[key].sort(key=lambda v: v["metric_count"] > 0, reverse=True) open(os.path.join(app_dir, "index.json"), "w").write( dump_json( _incorporate_annotation( app_data, app_annotation.get("app", {}), app=True, full=True ) ) ) # write a search index for the app open(os.path.join(functions_dir, f"metrics_search_{app_name}.js"), "w").write( create_metrics_search_js(app_metrics.values(), app_name, legacy=False) ) # export FOG data to a separate file for the FOG + legacy search index if app_name == "firefox_desktop": open(os.path.join(functions_dir, "metrics_search_fog.js"), "w").write( create_metrics_search_js(app_metrics.values(), app_name="fog", legacy=False) ) # Write out a list of app groups (for the landing page) # put "featured" apps first, then sort by name open(os.path.join(output_dir, "apps.json"), "w").write( dump_json( sorted( sorted(app_summaries, key=lambda s: s["app_name"]), key=lambda s: s.get("featured", False), reverse=True, ) ) ) # also write some metadata for use by the netlify functions open(os.path.join(functions_dir, "supported_glam_metric_types.json"), "w").write( dump_json(list(SUPPORTED_GLAM_METRIC_TYPES)) )