# treeherder/perf/auto_perf_sheriffing/backfill_reports.py
import logging
from datetime import datetime, timedelta
from itertools import groupby, zip_longest
import simplejson as json
from django.db.models import F, Q, QuerySet
from treeherder.perf.exceptions import MissingRecordsError
from treeherder.perf.models import (
BackfillRecord,
BackfillReport,
PerformanceAlert,
PerformanceAlertSummary,
PerformanceDatum,
)
from treeherder.utils import default_serializer


class AlertsPicker:
"""
    Encapsulates the algorithm for selecting the most relevant alerts
    from a tuple of alerts.
    Regressions are considered the most important, followed by improvements.
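
    Usage sketch (hypothetical values):
        picker = AlertsPicker(
            max_alerts=5,
            max_improvements=2,
            platforms_of_interest=('windows10', 'linux', 'osx'),
        )
        top_alerts = picker.extract_important_alerts(tuple(untriaged_alerts))
    where untriaged_alerts is any iterable of PerformanceAlert objects.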
"""
def __init__(
self, max_alerts: int, max_improvements: int, platforms_of_interest: tuple[str, ...]
):
"""
:param max_alerts: the maximum number of selected alerts
        :param max_improvements: the maximum number of selected alerts when
            handling improvements only
        :param platforms_of_interest: platform names, in decreasing order of importance
            For the selected platforms, use the following names:
                Windows 10: 'windows10'
                Windows 7: 'windows7'
                Linux: 'linux'
                OS X: 'osx'
                Android: 'android'
        Note:
            Names that are too specific can fail to match the database data,
            causing alerts to be skipped; names that are too generic can alter
            the intended ordering of the alerts.
"""
if max_alerts <= 0 or max_improvements <= 0:
raise ValueError("Use positive values.")
if len(platforms_of_interest) == 0:
raise ValueError("Provide at least one platform name.")
self.max_alerts = max_alerts
self.max_improvements = max_improvements
self.ordered_platforms_of_interest = platforms_of_interest
def extract_important_alerts(self, alerts: tuple[PerformanceAlert, ...]):
        if any(not isinstance(alert, PerformanceAlert) for alert in alerts):
            raise ValueError("All provided alerts must be PerformanceAlert objects.")
relevant_alerts = self._extract_by_relevant_platforms(alerts)
alerts_with_distinct_jobs = self._ensure_distinct_jobs(relevant_alerts)
sorted_alerts = self._multi_criterion_sort(alerts_with_distinct_jobs)
return self._ensure_alerts_variety(sorted_alerts)
def _ensure_alerts_variety(self, sorted_alerts: list[PerformanceAlert]):
"""
The alerts container must be sorted before being passed to this function.
The returned list must contain regressions and (if present) improvements.
"""
regressions_only = all(alert.is_regression for alert in sorted_alerts)
improvements_only = all(not alert.is_regression for alert in sorted_alerts)
if regressions_only or improvements_only:
resulted_alerts_list = self._ensure_platform_variety(sorted_alerts)
else: # mixed alert types
regressions = [alert for alert in sorted_alerts if alert.is_regression]
improvements = [alert for alert in sorted_alerts if not alert.is_regression]
            if len(regressions) > 1:
                regressions = self._ensure_platform_variety(regressions)
            if len(regressions) >= self.max_alerts:
                # the regressions alone fill the quota, so swap the last
                # surviving slot for the most important improvement
                regressions[self.max_alerts - 1] = improvements[0]
            else:
                regressions.append(improvements[0])
            resulted_alerts_list = regressions
return resulted_alerts_list[
: self.max_improvements if improvements_only else self.max_alerts
]
def _ensure_distinct_jobs(self, alerts: list[PerformanceAlert]) -> list[PerformanceAlert]:
def initial_culprit_job(alert):
return alert.initial_culprit_job
def parent_or_sibling_from(
alert_group: list[PerformanceAlert],
) -> PerformanceAlert | None:
if len(alert_group) == 0:
return None
            for alert in alert_group:
                if alert.series_signature.parent_signature is None:
                    # this signature has no parent, i.e. it is the parent itself
                    return alert
            return alert_group[0]  # no parent found; fall back to the first sibling
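        # NOTE: itertools.groupby only groups *consecutive* items, so alerts
        # sharing an initial culprit job are assumed to be adjacent in the input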
alert_groups = []
for _, alert_group in groupby(alerts, initial_culprit_job):
alert_groups.append(list(alert_group))
alerts = [parent_or_sibling_from(group) for group in alert_groups]
return list(filter(None, alerts))
def _ensure_platform_variety(
self, sorted_all_alerts: list[PerformanceAlert]
) -> list[PerformanceAlert]:
"""
Note: Ensure that the sorted_all_alerts container has only
platforms of interest (example: 'windows10', 'windows7', 'linux', 'osx', 'android').
Please filter sorted_all_alerts list with filter_alerts(alerts) before calling this function.
:param sorted_all_alerts: alerts sorted by platform name
"""
platform_grouped_alerts = []
for platform in self.ordered_platforms_of_interest:
specific_platform_alerts = [
alert
for alert in sorted_all_alerts
if platform in alert.series_signature.platform.platform
]
            if specific_platform_alerts:
platform_grouped_alerts.append(specific_platform_alerts)
platform_picked = []
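        # round-robin across the platform groups: take each platform's best
        # alert in order of importance, then each platform's second best, etc.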
for alert_group in zip_longest(*platform_grouped_alerts):
platform_picked.extend(alert_group)
platform_picked = [alert for alert in platform_picked if alert is not None]
return platform_picked
def _os_relevance(self, alert_platform: str):
"""
One of the sorting criteria.
:param alert_platform: the name of the current alert's platform
:return: int value of platform's relevance
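        Example: with platforms_of_interest ('windows10', 'linux', 'osx'),
        a 'windows10' alert scores 3, 'linux' scores 2 and 'osx' scores 1
        (higher means more relevant).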
"""
        for platform_of_interest in self.ordered_platforms_of_interest:
            if platform_of_interest in alert_platform:
                platforms = self.ordered_platforms_of_interest
                return len(platforms) - platforms.index(platform_of_interest)
        raise ValueError(f"Unknown platform: {alert_platform}")
def _noise_profile_is_ok(self, noise_profile: str):
"""
One of the sorting criteria.
:param noise_profile: the noise profile of the current alert
:return: boolean value
"""
        return noise_profile == PerformanceAlert.OK
def _has_relevant_platform(self, alert: PerformanceAlert):
"""
Filter criteria based on platform name.
"""
alert_platform_name = alert.series_signature.platform.platform
return any(
platform_of_interest in alert_platform_name
for platform_of_interest in self.ordered_platforms_of_interest
)
def _extract_by_relevant_platforms(self, alerts):
return list(filter(self._has_relevant_platform, alerts))
def _multi_criterion_sort(self, relevant_alerts):
sorted_alerts = sorted(
relevant_alerts,
            # sort criteria, in decreasing priority; reverse=True puts
            # the most relevant alerts first
            key=lambda alert: (
                alert.is_regression,  # regressions precede improvements
                self._noise_profile_is_ok(alert.noise_profile),  # stable noise profile first
                alert.amount_pct,  # larger magnitude first
                self._os_relevance(alert.series_signature.platform.platform),  # platform importance
            ),
reverse=True,
)
return sorted_alerts


class IdentifyAlertRetriggerables:
def __init__(self, max_data_points: int, time_interval: timedelta, logger=None):
if max_data_points < 1:
raise ValueError("Cannot set range width less than 1")
if max_data_points % 2 == 0:
raise ValueError("Must provide odd range width")
if not isinstance(time_interval, timedelta):
raise TypeError("Must provide time interval as timedelta")
self._range_width = max_data_points
self._time_interval = time_interval
self.log = logger or logging.getLogger(self.__class__.__name__)
def __call__(self, alert: PerformanceAlert) -> list[dict]:
"""
        Identify the data points surrounding the alert's push that are
        candidates for retriggering/backfilling.
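
        Usage sketch (hypothetical values):
            fetch_context = IdentifyAlertRetriggerables(
                max_data_points=5, time_interval=timedelta(days=1)
            )
            data_points = fetch_context(alert)
        Each returned dict carries the fields selected in
        _fetch_suspect_data_points: value, job_id, perf_datum_id,
        push_id, push_timestamp and push__revision.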
"""
        # data points within time_interval around the alert's push
        annotated_data_points = self._fetch_suspect_data_points(alert)
flattened_data_points = self._one_data_point_per_push(annotated_data_points)
alert_index = self._find_push_id_index(alert.summary.push_id, flattened_data_points)
retrigger_window = self.__compute_window_slices(alert_index)
data_points_to_retrigger = flattened_data_points[retrigger_window]
self._glance_over_retrigger_range(data_points_to_retrigger)
return data_points_to_retrigger
def min_timestamp(self, alert_push_time: datetime) -> datetime:
return alert_push_time - self._time_interval
def max_timestamp(self, alert_push_time: datetime) -> datetime:
return alert_push_time + self._time_interval
def _fetch_suspect_data_points(self, alert: PerformanceAlert) -> QuerySet:
startday = self.min_timestamp(alert.summary.push.time)
endday = self.max_timestamp(alert.summary.push.time)
data = PerformanceDatum.objects.select_related("push").filter(
repository_id=alert.series_signature.repository_id, # leverage compound index
signature_id=alert.series_signature_id,
push_timestamp__gt=startday,
push_timestamp__lt=endday,
)
annotated_data_points = (
data
            # the resulting JSONs are more self-explanatory
            # with perf_datum_id instead of id
.extra(select={"perf_datum_id": "performance_datum.id"})
.values(
"value", "job_id", "perf_datum_id", "push_id", "push_timestamp", "push__revision"
)
.order_by("push_timestamp")
)
return annotated_data_points
def _one_data_point_per_push(self, annotated_data_points: QuerySet) -> list[dict]:
seen_push_ids = set()
seen_add = seen_push_ids.add
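        # set.add returns None (falsy), so the `or` below both records each
        # push_id and keeps only the first data point seen per push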
return [
data_point
for data_point in annotated_data_points
if not (data_point["push_id"] in seen_push_ids or seen_add(data_point["push_id"]))
]
def _find_push_id_index(self, push_id: int, flattened_data_points: list[dict]) -> int:
for index, data_point in enumerate(flattened_data_points):
if data_point["push_id"] == push_id:
return index
raise LookupError(f"Could not find push id {push_id}")
def __compute_window_slices(self, center_index: int) -> slice:
side = self._range_width // 2
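        # e.g. with _range_width == 5 and center_index == 7, side == 2 and
        # the result is slice(5, 10): two data points on each side of the push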
left_margin = max(center_index - side, 0) # cannot have negative start slice
right_margin = center_index + side + 1
return slice(left_margin, right_margin)
def _glance_over_retrigger_range(self, data_points_to_retrigger: list[dict]):
retrigger_range = len(data_points_to_retrigger)
if retrigger_range < self._range_width:
self.log.warning(
f"Found small backfill range (of size {retrigger_range} instead of {self._range_width})"
)


class BackfillReportMaintainer:
def __init__(
self,
alerts_picker: AlertsPicker,
backfill_context_fetcher: IdentifyAlertRetriggerables,
logger=None,
):
"""
Acquire/instantiate data used for finding alerts.
"""
self.alerts_picker = alerts_picker
self.fetch_backfill_context = backfill_context_fetcher
self.log = logger or logging.getLogger(self.__class__.__name__)
def provide_updated_reports(
self, since: datetime, frameworks: list[str], repositories: list[str]
) -> list[BackfillReport]:
alert_summaries = self.__fetch_summaries_to_retrigger(since, frameworks, repositories)
return self.compile_reports_for(alert_summaries)
def compile_reports_for(self, summaries_to_retrigger: QuerySet) -> list[BackfillReport]:
reports = []
for summary in summaries_to_retrigger:
important_alerts = self._pick_important_alerts(summary)
            if not important_alerts and self._doesnt_have_report(summary):
                # don't create blank reports, but do update existing ones
                continue
try:
alert_context_map = self._associate_retrigger_context(important_alerts)
except MissingRecordsError as ex:
self.log.warning(f"Failed to compute report for alert summary {summary}. {ex}")
continue
backfill_report, created = BackfillReport.objects.get_or_create(summary_id=summary.id)
# only provide new records if the report is not frozen
if not backfill_report.frozen and (created or backfill_report.is_outdated):
backfill_report.expel_records() # associated records are outdated & irrelevant
self._provide_records(backfill_report, alert_context_map)
reports.append(backfill_report)
return reports
def _pick_important_alerts(
self, from_summary: PerformanceAlertSummary
) -> list[PerformanceAlert]:
return self.alerts_picker.extract_important_alerts(
from_summary.alerts.filter(status=PerformanceAlert.UNTRIAGED)
)
def _provide_records(self, backfill_report: BackfillReport, alert_context_map: list[tuple]):
for alert, retrigger_context in alert_context_map:
BackfillRecord.objects.create(
alert=alert,
report=backfill_report,
context=json.dumps(retrigger_context, default=default_serializer),
)
def __fetch_summaries_to_retrigger(
self, since: datetime, frameworks: list[str], repositories: list[str]
) -> QuerySet:
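        # pick summaries that either have no report yet (and changed since
        # `since`) or have been updated more recently than their report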
no_reports_yet = Q(last_updated__gte=since, backfill_report__isnull=True)
with_outdated_reports = Q(last_updated__gt=F("backfill_report__last_updated"))
filters = no_reports_yet | with_outdated_reports
if frameworks:
filters = filters & Q(framework__name__in=frameworks)
if repositories:
filters = filters & Q(repository__name__in=repositories)
return (
PerformanceAlertSummary.objects.prefetch_related("backfill_report")
.select_related("framework", "repository")
.filter(filters)
)
def _associate_retrigger_context(self, important_alerts: list[PerformanceAlert]) -> list[tuple]:
retrigger_map = []
incomplete_mapping = False
for alert in important_alerts:
try:
data_points = self.fetch_backfill_context(alert)
except LookupError as ex:
incomplete_mapping = True
self.log.debug(
f"Couldn't identify retrigger context for alert {alert}. (Exception: {ex})"
)
continue
retrigger_map.append((alert, data_points))
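        # a partial mapping would produce a misleading report, so fail loudly
        # if any alert is missing its retrigger context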
if incomplete_mapping:
expected = len(important_alerts)
missing = expected - len(retrigger_map)
raise MissingRecordsError(f"{missing} out of {expected} records are missing!")
return retrigger_map
def _doesnt_have_report(self, summary):
return not hasattr(summary, "backfill_report")
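

# Usage sketch (hypothetical wiring and values; the real entry point lives
# elsewhere in the auto sheriffing code):
#
#   picker = AlertsPicker(
#       max_alerts=5,
#       max_improvements=2,
#       platforms_of_interest=("windows10", "linux", "osx", "android"),
#   )
#   context_fetcher = IdentifyAlertRetriggerables(
#       max_data_points=5, time_interval=timedelta(days=1)
#   )
#   maintainer = BackfillReportMaintainer(picker, context_fetcher)
#   reports = maintainer.provide_updated_reports(
#       since=datetime.now() - timedelta(days=1),
#       frameworks=["raptor"],
#       repositories=["autoland"],
#   )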