etl/glean.py
from __future__ import annotations
import logging
from datetime import datetime
from typing import List, Optional
import requests
logger = logging.getLogger(__name__)
GLEAN_DISTRIBUTION_TYPES = {
"timing_distribution",
"memory_distribution",
"custom_distribution",
}
def _merge_latest_ping(pings, ping_name, ping_data):
"""
    Merge `ping_data` for `ping_name` into `pings`, keeping whichever definition
    has the most recent history date. If `pings` already contains newer data for
    the ping, it is not overwritten.
"""
if ping_name not in pings:
pings[ping_name] = ping_data
return
latest = pings[ping_name]["history"][-1]["dates"]["last"]
new = ping_data["history"][-1]["dates"]["last"]
if new >= latest:
pings[ping_name] = ping_data
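# Illustrative sketch (not executed): given two candidate definitions for the same
# ping, the one whose newest history entry has the later "last" date wins. The dates
# and structure below are invented examples, not real probe-scraper output.
#
#   pings = {}
#   _merge_latest_ping(pings, "metrics", {"history": [{"dates": {"last": "2021-01-01"}}]})
#   _merge_latest_ping(pings, "metrics", {"history": [{"dates": {"last": "2021-06-01"}}]})
#   # pings["metrics"] is now the definition last seen on 2021-06-01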
class _Cache:
"""
    Simple cache manager so we can avoid refetching the same dependency data
    over and over.
"""
def __init__(self):
self.cached_responses = {}
    def get(self, url: str):
        if url in self.cached_responses:
            return self.cached_responses[url]
        # Append a timestamp parameter to bypass any upstream caching, which might
        # give us stale data (probeinfo.telemetry.mozilla.org is currently behind
        # CloudFront).
        resp = requests.get(url + f"?t={datetime.utcnow().isoformat()}")
        # Surface HTTP failures as requests.HTTPError so callers (e.g.
        # GleanApp.get_dependencies) can fall back gracefully; without this, a
        # failed response would only blow up later when parsed as JSON.
        resp.raise_for_status()
        self.cached_responses[url] = resp
        return resp
def get_json(self, url: str):
return self.get(url).json()
_cache = _Cache()
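# Illustrative sketch (not executed): repeated fetches of the same URL are served
# from the in-memory cache rather than hitting the network again.
#
#   apps = _cache.get_json(GleanApp.APPS_URL)  # fetched over HTTP
#   apps = _cache.get_json(GleanApp.APPS_URL)  # returned from the cache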
class GleanObject:
NAME_KEY = "name"
ORIGIN_KEY = "origin"
HISTORY_KEY = "history"
IN_SOURCE_KEY = "in-source"
SAMPLING_INFO_KEY = "sampling_info"
class GleanMetric(GleanObject):
"""
Represents an individual Glean metric, as defined by probe scraper
"""
ALL_PINGS_KEYWORDS = ("all-pings", "all_pings", "glean_client_info", "glean_internal_info")
    def __init__(
        self, identifier: str, definition: dict, *, ping_names: Optional[List[str]] = None
    ):
self.identifier = identifier
self._set_dates(definition)
self._set_definition(definition)
self.description = self.definition.get("description")
self.tags = self.definition["metadata"].get("tags", [])
self.bq_prefix = None
if "glean_client_info" in self.definition["send_in_pings"]:
self.bq_prefix = "client_info"
elif "glean_internal_info" in self.definition["send_in_pings"]:
self.bq_prefix = "ping_info"
if ping_names is not None:
self._update_all_pings(ping_names)
def _update_all_pings(self, pings: List[str]):
        if any(kw in self.definition["send_in_pings"] for kw in self.ALL_PINGS_KEYWORDS):
self.definition["send_in_pings"] = set(pings)
def _set_definition(self, full_defn: dict):
        # sort history from latest to earliest
        self.definition_history = sorted(
            full_defn[self.HISTORY_KEY],
            key=lambda x: datetime.fromisoformat(x["dates"]["last"]),
            reverse=True,
        )
# The canonical definition for up-to-date schemas
self.definition = self.definition_history[0]
self.definition["name"] = full_defn[self.NAME_KEY]
self.definition["origin"] = full_defn[self.ORIGIN_KEY]
self.definition["in_source"] = full_defn[self.IN_SOURCE_KEY]
self.definition["sampling_info"] = full_defn.get(self.SAMPLING_INFO_KEY)
# first seen is the earliest date in the history
self.definition["date_first_seen"] = self.definition_history[-1]["dates"]["first"]
def _set_dates(self, definition: dict):
vals = [datetime.fromisoformat(d["dates"]["first"]) for d in definition[self.HISTORY_KEY]]
self.first_added = min(vals)
self.last_change = max(vals)
def get_first_added(self) -> datetime:
return self.first_added
def get_last_change(self) -> datetime:
return self.last_change
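# Illustrative sketch (not executed): the definition dict mirrors the shape probe
# scraper publishes for a metric; all values below are invented for illustration.
#
#   defn = {
#       "name": "example.metric",
#       "origin": "firefox_desktop",
#       "in-source": True,
#       "history": [
#           {
#               "dates": {"first": "2021-01-01T00:00:00", "last": "2021-02-01T00:00:00"},
#               "metadata": {"tags": ["Example"]},
#               "send_in_pings": ["metrics"],
#               "description": "An example metric.",
#           }
#       ],
#   }
#   metric = GleanMetric("example.metric", defn, ping_names=["metrics", "baseline"])
#   metric.get_first_added()  # -> datetime(2021, 1, 1, 0, 0)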
class GleanPing(GleanObject):
"""
Represents an individual Glean Ping, as defined by probe scraper
"""
def __init__(self, identifier: str, definition: dict):
self.identifier = identifier
self._set_definition(definition)
self.description = self.definition.get("description")
self.tags = self.definition["metadata"].get("tags", [])
def _set_definition(self, full_defn: dict):
        # sort history from latest to earliest
        self.definition_history = sorted(
            full_defn[self.HISTORY_KEY],
            key=lambda x: datetime.fromisoformat(x["dates"]["last"]),
            reverse=True,
        )
# The canonical definition for up-to-date schemas
self.definition = self.definition_history[0]
self.definition["name"] = full_defn[self.NAME_KEY]
self.definition["origin"] = full_defn[self.ORIGIN_KEY]
self.definition["date_first_seen"] = self.definition_history[-1]["dates"]["first"]
self.definition["in_source"] = full_defn[self.IN_SOURCE_KEY]
class GleanTag(GleanObject):
"""
Represents an individual Glean Tag, as defined by probe scraper
"""
def __init__(self, identifier: str, definition: dict):
self.identifier = identifier
self._set_definition(definition)
self.description = self.definition_history[0].get("description")
def _set_definition(self, full_defn: dict):
        # sort history from latest to earliest
        self.definition_history = sorted(
            full_defn[self.HISTORY_KEY],
            key=lambda x: datetime.fromisoformat(x["dates"]["last"]),
            reverse=True,
        )
# The canonical definition for up-to-date schemas
self.definition = self.definition_history[0]
self.definition["name"] = full_defn[self.NAME_KEY]
self.definition["date_first_seen"] = self.definition_history[-1]["dates"]["first"]
class GleanApp:
"""
Represents a Glean application, provides convenience methods for getting metrics and pings
"""
PROBE_INFO_BASE_URL = "https://probeinfo.telemetry.mozilla.org"
APPS_URL = PROBE_INFO_BASE_URL + "/v2/glean/app-listings"
LIBRARIES_URL = PROBE_INFO_BASE_URL + "/v2/glean/library-variants"
METRICS_URL_TEMPLATE = PROBE_INFO_BASE_URL + "/glean/{}/metrics"
PING_URL_TEMPLATE = PROBE_INFO_BASE_URL + "/glean/{}/pings"
TAGS_URL_TEMPLATE = PROBE_INFO_BASE_URL + "/glean/{}/tags"
DEPENDENCIES_URL_TEMPLATE = PROBE_INFO_BASE_URL + "/glean/{}/dependencies"
DEFAULT_DEPENDENCIES = ["glean"]
def __init__(self, app, **kwargs):
self.app = app
self.app_name = app["app_name"]
self.app_id = app["app_id"]
@staticmethod
def get_apps() -> List[GleanApp]:
"""
Get all non-library Glean repositories
"""
apps = _cache.get_json(GleanApp.APPS_URL)
return [GleanApp(app) for app in apps]
@staticmethod
def get_libraries() -> List[dict]:
return _cache.get_json(GleanApp.LIBRARIES_URL)
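    # Illustrative sketch (not executed): list the known applications and pick one
    # out by app_name. Assumes probeinfo.telemetry.mozilla.org is reachable and that
    # a "fenix" listing exists.
    #
    #   fenix = next(app for app in GleanApp.get_apps() if app.app_name == "fenix")
    #   fenix.app_id  # e.g. "org.mozilla.fenix"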
def get_dependencies(self):
        """
        Get the library dependencies for this application that are also known
        to the probe scraper repositories file.

        Dependencies are specified by dependency name, so we map them back to
        the corresponding repository (library variant) entries. Returns the
        default list of dependency names if none can be resolved.
        """
try:
dependencies = _cache.get_json(
self.DEPENDENCIES_URL_TEMPLATE.format(self.app["v1_name"])
)
        except requests.HTTPError:
            logger.info(f"For {self.app_id}, using default Glean dependencies")
            return self.DEFAULT_DEPENDENCIES
        dependency_library_names = list(dependencies.keys())
        libraries_by_dependency_name = {
            library["dependency_name"]: library for library in self.get_libraries()
        }
        found_libraries = [
            libraries_by_dependency_name[name]
            for name in dependency_library_names
            if name in libraries_by_dependency_name
        ]
        if not found_libraries:
            logger.info(f"For {self.app_id}, using default Glean dependencies")
            return self.DEFAULT_DEPENDENCIES
        logger.info(f"For {self.app_id}, found Glean dependencies: {found_libraries}")
        return found_libraries
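    # Illustrative sketch (not executed): the dependencies endpoint is keyed by
    # dependency name, and each name is mapped back to a library-variant entry that
    # carries the "v1_name" and "library_name" used by get_metrics() below. The
    # names and fields here show the expected shape only, not real fetched values.
    #
    #   dependencies = {"org.mozilla.components:service-glean": {...}}
    #   libraries_by_dependency_name = {
    #       "org.mozilla.components:service-glean": {
    #           "dependency_name": "org.mozilla.components:service-glean",
    #           "library_name": "glean-core",
    #           "v1_name": "glean",
    #       },
    #   }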
def get_metrics(self) -> List[GleanMetric]:
data = _cache.get_json(GleanApp.METRICS_URL_TEMPLATE.format(self.app["v1_name"]))
metrics = [
(key, {**metricdict, "origin": self.app["app_name"]})
for key, metricdict in data.items()
]
for dependency in self.get_dependencies():
if "v1_name" in dependency:
dependency_metrics = _cache.get_json(
GleanApp.METRICS_URL_TEMPLATE.format(dependency["v1_name"])
)
                # tag these dependency metrics with the library_name they came from
                metrics += [
                    (name, {**defn, "origin": dependency["library_name"]})
                    for name, defn in dependency_metrics.items()
                ]
ping_names = set(self._get_ping_data().keys())
processed = []
        # deduplicate metrics, keeping the definition with the most recent history entry
metric_map = {}
        for name, defn in metrics:
            if (
                name not in metric_map
                or metric_map[name][1]["history"][-1]["dates"]["last"]
                < defn["history"][-1]["dates"]["last"]
            ):
                metric_map[name] = (name, defn)
for _id, defn in metric_map.values():
metric = GleanMetric(_id, defn, ping_names=ping_names)
processed.append(metric)
return processed
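    # Illustrative sketch (not executed): filter an app's metrics down to the
    # distribution types declared in GLEAN_DISTRIBUTION_TYPES at the top of this
    # module; assumes each metric definition carries a "type" field.
    #
    #   app = GleanApp.get_apps()[0]
    #   distributions = [
    #       m for m in app.get_metrics()
    #       if m.definition["type"] in GLEAN_DISTRIBUTION_TYPES
    #   ]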
def _get_ping_data(self) -> dict:
        ping_data = {}
        for ping_name, ping_defn in _cache.get_json(
            GleanApp.PING_URL_TEMPLATE.format(self.app["v1_name"])
        ).items():
            _merge_latest_ping(
                ping_data, ping_name, {**ping_defn, "origin": self.app["app_name"]}
            )
        for dependency in self.get_dependencies():
            if "v1_name" in dependency:
                for ping_name, ping_defn in _cache.get_json(
                    GleanApp.PING_URL_TEMPLATE.format(dependency["v1_name"])
                ).items():
                    _merge_latest_ping(
                        ping_data, ping_name, {**ping_defn, "origin": dependency["library_name"]}
                    )
        return ping_data
def get_pings(self) -> List[GleanPing]:
return [
GleanPing(ping_name, ping_data)
for ping_name, ping_data in self._get_ping_data().items()
]
def get_tags(self) -> List[GleanTag]:
return [
GleanTag(tag_name, tag_data)
for tag_name, tag_data in _cache.get_json(
GleanApp.TAGS_URL_TEMPLATE.format(self.app["v1_name"])
).items()
]
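# A minimal smoke-test sketch of how these classes compose. It assumes network
# access to probeinfo.telemetry.mozilla.org and at least one published app
# listing; it is illustrative and not part of the ETL itself.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    first_app = GleanApp.get_apps()[0]
    print(f"App: {first_app.app_name} ({first_app.app_id})")
    print(f"Pings: {sorted(ping.identifier for ping in first_app.get_pings())}")
    print(f"Tags: {sorted(tag.identifier for tag in first_app.get_tags())}")
    metrics = first_app.get_metrics()
    print(f"Found {len(metrics)} metrics")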