# probe_scraper/transform_probes.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional
from .scrapers.git_scraper import Commit
DATES_KEY = "dates"
COMMITS_KEY = "git-commits"
HISTORY_KEY = "history"
NAME_KEY = "name"
TYPE_KEY = "type"
REFLOG_KEY = "reflog-index"
IN_SOURCE_KEY = "in-source"
SOURCE_URL_KEY = "source_url"
def is_test_probe(probe_type, name):
if probe_type == "histogram":
# These are test-only probes and never sent out.
return name.startswith("TELEMETRY_TEST_")
elif probe_type in ["scalar", "event"]:
return name.startswith("telemetry.test.")
return False
def get_from_nested_dict(dictionary, path, default=None):
keys = path.split("/")
for k in keys[:-1]:
dictionary = dictionary[k]
return dictionary.get(keys[-1], default)
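
# Illustrative usage of get_from_nested_dict (hypothetical probe dict):
# the path is "/"-separated, and only the leaf lookup falls back to the
# default -- intermediate keys must exist or a KeyError is raised.
#
#   probe = {"details": {"kind": "boolean"}}
#   get_from_nested_dict(probe, "details/kind")    # -> "boolean"
#   get_from_nested_dict(probe, "details/low", 0)  # -> 0
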
def get_probe_id(probe_type, name):
return probe_type + "/" + name
def probes_equal(probe1, probe2):
props = [
# Common.
"cpp_guard",
"optout",
"notification_emails",
# Histograms & scalars.
"details/keyed",
"details/kind",
# Histograms.
"details/n_buckets",
"details/n_values",
"details/low",
"details/high",
"details/record_in_processes",
"details/labels",
# Events.
"details/methods",
"details/objects",
"details/extra_keys",
]
for prop in props:
if get_from_nested_dict(probe1, prop) != get_from_nested_dict(probe2, prop):
return False
return True
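
# Illustrative sketch of probes_equal (hypothetical probe dicts): only the
# properties listed above are compared, so definitions differing in other
# fields (e.g. "description") still compare equal.
#
#   a = {"optout": True, "details": {"kind": "boolean"}}
#   b = {"optout": True, "details": {"kind": "boolean"}, "description": "x"}
#   c = {"optout": False, "details": {"kind": "boolean"}}
#   probes_equal(a, b)  # -> True
#   probes_equal(a, c)  # -> False ("optout" is a compared property)
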
def extract_node_data(
node_id, channel, probe_type, probe_data, result_data, version, break_by_channel
):
"""Extract the probe data and group it by channel.
    :param node_id: the revision the probe data comes from.
:param channel: the channel the probe was found in.
:param probe_type: the probe type (e.g. 'histogram').
:param probe_data: the probe data, with the following form:
{
node_id: {
histogram: {
name: ...,
...
},
scalar: {
...
},
},
...
}
    :param result_data: the dictionary the processed probe data is added to.
           Extracted probe data will be added to result_data in the form:
{
channel: {
probe_id: {
type: 'histogram',
name: 'some-name',
history: {
channel: [
{
optout: True,
...
revisions: {first: ..., last: ...},
versions: {first: ..., last: ...}
},
...
]
}
}
}
}
    :param version: a human-readable version string.
:param break_by_channel: True if probe data for different channels needs to be
stored separately, False otherwise. If True, probe data will be saved
to result_data[channel] instead of just result_data.
"""
for name, probe in probe_data.items():
        # Telemetry's test probes are never submitted to the servers.
if is_test_probe(probe_type, name):
continue
storage = result_data
if break_by_channel:
if channel not in result_data:
result_data[channel] = {}
storage = result_data[channel]
probe_id = get_probe_id(probe_type, name)
if probe_id in storage and channel in storage[probe_id][HISTORY_KEY]:
            # If the probe state didn't change from the previously processed
            # (newer) revision, extend that history entry back to this
            # revision and continue.
previous = storage[probe_id][HISTORY_KEY][channel][-1]
if probes_equal(previous, probe):
previous["revisions"]["first"] = node_id
previous["versions"]["first"] = version
continue
if probe_id not in storage:
storage[probe_id] = {
TYPE_KEY: probe_type,
NAME_KEY: name,
HISTORY_KEY: {channel: []},
}
if channel not in storage[probe_id][HISTORY_KEY]:
storage[probe_id][HISTORY_KEY][channel] = []
probe = copy.deepcopy(probe)
probe["revisions"] = {
"first": node_id,
"last": node_id,
}
probe["versions"] = {
"first": version,
"last": version,
}
storage[probe_id][HISTORY_KEY][channel].append(probe)
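
# Illustrative sketch of extract_node_data (hypothetical data): recording a
# single histogram found in one nightly revision. Afterwards,
# result_data["nightly"]["histogram/GC_MS"] holds a one-entry history whose
# revisions and versions both have first == last.
#
#   result_data = {}
#   extract_node_data(
#       "abcdef0", "nightly", "histogram",
#       {"GC_MS": {"optout": True, "details": {"kind": "exponential"}}},
#       result_data, "70", break_by_channel=True,
#   )
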
def sorted_node_lists_by_channel(node_data):
channels = defaultdict(list)
for channel, nodes in node_data.items():
for node_id, data in nodes.items():
channels[channel].append(
{
"node_id": node_id,
"version": data["version"],
}
)
for channel, data in channels.items():
channels[channel] = sorted(data, key=lambda n: int(n["version"]), reverse=True)
return channels
def sorted_node_lists_by_date(node_data, revision_dates):
    # Note: get_date closes over the `channel` loop variable from the loop
    # below; it is only called inside the per-channel sort at the end of
    # this function.
    def get_date(revision):
        return revision_dates[channel][revision]["date"]
channels = defaultdict(list)
for channel, nodes in node_data.items():
for node_id, data in nodes.items():
channels[channel].append(
{
"node_id": node_id,
"version": data["version"],
}
)
for channel, data in channels.items():
channels[channel] = sorted(
data, key=lambda x: get_date(x["node_id"]), reverse=True
)
return channels
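
# Illustrative sketch (hypothetical data): both sorters return, per channel, a
# newest-first list of {"node_id", "version"} entries; the by-date variant
# expects revision dates keyed as revision_dates[channel][node_id]["date"].
#
#   node_data = {"release": {"r1": {"version": "69"}, "r2": {"version": "70"}}}
#   sorted_node_lists_by_channel(node_data)
#   # -> {"release": [{"node_id": "r2", "version": "70"},
#   #                 {"node_id": "r1", "version": "69"}]}
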
def transform(probe_data, node_data, break_by_channel, revision_dates=None):
"""Transform the probe data into the final format.
:param probe_data: the preprocessed probe data.
    :param node_data: the per-channel revision metadata (node ids and versions).
:param break_by_channel: True if we want the probe output grouped by
release channel.
    :param revision_dates: (optional) a dictionary of channel revisions and
           their publish dates, used to sort the revisions.
"""
if revision_dates is None:
channels = sorted_node_lists_by_channel(node_data)
else:
channels = sorted_node_lists_by_date(node_data, revision_dates)
result_data = {}
for channel, channel_data in channels.items():
print("\n" + channel + " - transforming probe data:")
for entry in channel_data:
node_id = entry["node_id"]
readable_version = str(entry["version"])
print(" from: " + str({"node": node_id, "version": readable_version}))
for probe_type, probes in probe_data[channel][node_id].items():
# Group the probes by the release channel, if requested
extract_node_data(
node_id,
channel,
probe_type,
probes,
result_data,
readable_version,
break_by_channel,
)
return result_data
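
# Illustrative sketch of transform (hypothetical data, reusing the probe_data
# shape from the extract_node_data docstring):
#
#   node_data = {"release": {"r1": {"version": "69"}, "r2": {"version": "70"}}}
#   histories = transform(probe_data, node_data, break_by_channel=True)
#   # histories["release"][<probe_id>]["history"]["release"] is a newest-first
#   # list of definitions annotated with revision and version ranges.
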
def get_minimum_date(probe_data, revision_data, revision_dates):
probe_histories = transform(
probe_data, revision_data, break_by_channel=True, revision_dates=revision_dates
)
min_dates = defaultdict(lambda: defaultdict(str))
for channel, probes in probe_histories.items():
for probe_id, entry in probes.items():
dates = []
for history in entry["history"][channel]:
revision = history["revisions"]["first"]
dates.append(revision_dates[channel][revision]["date"])
min_dates[probe_id][channel] = min(dates)
return min_dates
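
# Illustrative sketch of get_minimum_date (hypothetical data, reusing the
# shapes above): for each probe and channel, this yields the earliest publish
# date among the "first" revisions of its history entries.
#
#   revision_dates = {"release": {"r1": {"date": "2019-01-01"},
#                                 "r2": {"date": "2019-06-01"}}}
#   min_dates = get_minimum_date(probe_data, revision_data, revision_dates)
#   # min_dates[<probe_id>]["release"] -> "2019-01-01"
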
def make_item_defn(definition, commit: Commit, new_source_url: Optional[str] = None):
if COMMITS_KEY not in definition:
# This is the first time we've seen this definition
definition[COMMITS_KEY] = {"first": commit.hash, "last": commit.hash}
definition[DATES_KEY] = {
"first": commit.pretty_timestamp,
"last": commit.pretty_timestamp,
}
definition[REFLOG_KEY] = {
"first": commit.reflog_index,
"last": commit.reflog_index,
}
else:
        # We've seen this definition before; update the `last` commit and source url.
last_dt = datetime.fromisoformat(definition[DATES_KEY]["last"])
last_timestamp = last_dt.replace(tzinfo=timezone.utc).timestamp()
last_reflog = definition[REFLOG_KEY]["last"]
# use negative last_reflog to match commit.sort_key()
if commit.is_head or (last_timestamp, -last_reflog) < commit.sort_key():
definition[COMMITS_KEY]["last"] = commit.hash
definition[DATES_KEY]["last"] = commit.pretty_timestamp
definition[REFLOG_KEY]["last"] = commit.reflog_index
# only update source url when the last commit changed
if new_source_url:
definition[SOURCE_URL_KEY] = new_source_url
return definition
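
# Illustrative sketch of make_item_defn (first_commit and newer_commit are
# hypothetical Commit instances): the first call stamps first == last for
# commits, dates, and reflog indexes; later calls that sort after the stored
# "last" (per Commit.sort_key(), or when the commit is HEAD) advance only the
# "last" markers and, if provided, the source url.
#
#   defn = make_item_defn({}, first_commit)  # first == last
#   defn = make_item_defn(defn, newer_commit, "https://example.com/src")
#   # -> the "last" commit/date/reflog fields and source_url are updated
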
def tags_equal(def1, def2):
return def1["description"] == def2["description"]
def metrics_equal(def1, def2):
return all(
(
def1.get(label) == def2.get(label)
for label in {
"bugs",
"data_reviews",
"data_sensitivity",
"description",
"disabled",
"expires",
"labeled",
"labels",
"lifetime",
"metadata",
"notification_emails",
"send_in_pings",
"time_unit",
"type",
"version",
"extra_keys",
}
)
)
def ping_equal(def1, def2):
# Test all keys except the ones the probe-scraper adds
ignored_keys = {DATES_KEY, COMMITS_KEY, HISTORY_KEY, REFLOG_KEY, SOURCE_URL_KEY}
all_keys = set(def1.keys()).union(def2.keys()).difference(ignored_keys)
return all((def1.get(label) == def2.get(label) for label in all_keys))
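
# Illustrative sketch of ping_equal (hypothetical definitions): the keys the
# probe-scraper adds for bookkeeping are ignored, so definitions differing
# only in commit/date metadata still compare equal.
#
#   d1 = {"include_client_id": True, "git-commits": {"first": "a", "last": "a"}}
#   d2 = {"include_client_id": True, "git-commits": {"first": "b", "last": "b"}}
#   ping_equal(d1, d2)  # -> True
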
def tag_constructor(defn, tag):
return {NAME_KEY: tag, HISTORY_KEY: [defn], IN_SOURCE_KEY: False}
def metric_constructor(defn, metric):
return {
TYPE_KEY: defn[TYPE_KEY],
NAME_KEY: metric,
HISTORY_KEY: [defn],
IN_SOURCE_KEY: False,
}
def ping_constructor(defn, ping):
return {NAME_KEY: ping, HISTORY_KEY: [defn], IN_SOURCE_KEY: False}
def update_or_add_item(
repo_items: Dict[str, dict],
commit: Commit,
item: str,
definition: dict,
equal_fn: Callable[[Any, Any], bool],
type_ctor: Callable[[dict, str], dict],
):
# If we've seen this item before, check previous definitions
if item in repo_items:
prev_defns = repo_items[item][HISTORY_KEY]
for i, prev_defn in sorted(
enumerate(prev_defns),
key=lambda e: datetime.fromisoformat(e[1][DATES_KEY]["last"]),
):
# If equal to a previous commit, update date and commit on existing definition
if equal_fn(definition, prev_defn):
new_defn = make_item_defn(
prev_defn, commit, definition.get(SOURCE_URL_KEY)
)
repo_items[item][HISTORY_KEY][i] = new_defn
break
        # Otherwise, append the changed definition for the existing item
else:
new_defn = make_item_defn(definition, commit)
repo_items[item][HISTORY_KEY] = prev_defns + [new_defn]
# In rare cases the type can change.
# We always pick the latest one.
if TYPE_KEY in definition:
repo_items[item][TYPE_KEY] = definition[TYPE_KEY]
# We haven't seen this item before, add it
else:
defn = make_item_defn(definition, commit)
repo_items[item] = type_ctor(defn, item)
return repo_items
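
# Illustrative sketch of update_or_add_item (c1-c3 are hypothetical Commit
# instances; defn_v1 and defn_v2 are metric definitions that include a "type"
# key and differ in some compared field): re-seeing an unchanged definition
# only advances the matching history entry's "last" markers, while a changed
# definition appends a new history entry.
#
#   items = update_or_add_item({}, c1, "metric.a", defn_v1,
#                              metrics_equal, metric_constructor)
#   items = update_or_add_item(items, c2, "metric.a", defn_v1,
#                              metrics_equal, metric_constructor)  # history: 1
#   items = update_or_add_item(items, c3, "metric.a", defn_v2,
#                              metrics_equal, metric_constructor)  # history: 2
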
def transform_by_hash(
data: Dict[str, Dict[Commit, Dict[str, dict]]],
equal_fn: Callable[[Any, Any], bool],
type_ctor: Callable[[dict, str], dict],
update_result: Optional[dict] = None,
):
"""
:param data - of the form
<repo_name>: {
<Commit>: {
<item-name>: {
...
},
},
...
}
Outputs deduplicated data of the form
<repo_name>: {
<name>: {
"type": <type>,
"name": <name>,
"history": [
{
"bugs": [<bug#>, ...],
...other info (from metrics.yaml or pings.yaml)...,
"git-commits": {
"first": <hash>,
"last": <hash>
},
"dates": {
"first": <datetime>,
"last": <datetime>
}
},
]
}
}
"""
result = {} if update_result is None else update_result
for repo_name, commits in data.items():
repo_items = result.get(repo_name, {})
        # iterate through commits, sorted by Commit.sort_key()
        sorted_commits = sorted(commits.items(), key=lambda item: item[0].sort_key())
for commit, items in sorted_commits:
for item, definition in items.items():
repo_items = update_or_add_item(
repo_items,
commit,
item,
definition,
equal_fn,
type_ctor,
)
if commit.is_head:
                # if this commit is HEAD (the most recent), use it to mark
                # whether items are "in-source" (i.e. still present in the
                # source code rather than removed)
for item in repo_items:
repo_items[item][IN_SOURCE_KEY] = item in items
result[repo_name] = repo_items
return result
def transform_tags_by_hash(
tag_data: Dict[str, Dict[Commit, Dict[str, dict]]],
update_result: Optional[dict] = None,
):
return transform_by_hash(tag_data, tags_equal, tag_constructor, update_result)
def transform_metrics_by_hash(
metric_data: Dict[str, Dict[Commit, Dict[str, dict]]],
update_result: Optional[dict] = None,
):
return transform_by_hash(
metric_data, metrics_equal, metric_constructor, update_result
)
def transform_pings_by_hash(
ping_data: Dict[str, Dict[Commit, Dict[str, dict]]],
update_result: Optional[dict] = None,
):
return transform_by_hash(ping_data, ping_equal, ping_constructor, update_result)
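
# Illustrative sketch of the transform_*_by_hash entry points (commit1/commit2
# are hypothetical git_scraper.Commit instances, defn1 a metrics.yaml-style
# definition dict):
#
#   metric_data = {"my-repo": {commit1: {"metric.a": defn1},
#                              commit2: {"metric.a": defn1}}}
#   out = transform_metrics_by_hash(metric_data)
#   # out["my-repo"]["metric.a"]["history"] holds a single entry spanning both
#   # commits; "in-source" reflects presence in the HEAD commit.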