mozci/console/commands/push.py
# -*- coding: utf-8 -*-
import collections
import csv
import datetime
import fnmatch
import itertools
import json
import os
import re
import traceback
from inspect import signature
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode
import arrow
import taskcluster
from cleo.commands.command import Command
from cleo.exceptions import CleoNoSuchOptionError
from cleo.helpers import argument, option
from loguru import logger
from tabulate import tabulate
from taskcluster.exceptions import TaskclusterRestFailure
from mozci import config
from mozci.errors import PushNotFound, SourcesNotFound, TaskNotFound
from mozci.push import (
MAX_DEPTH,
Push,
PushStatus,
Regressions,
ToRetriggerOrBackfill,
make_push_objects,
)
from mozci.task import Task, TestTask, is_autoclassifiable
from mozci.util.defs import INTERMITTENT_CLASSES
from mozci.util.hgmo import HgRev
from mozci.util.taskcluster import (
COMMUNITY_TASKCLUSTER_ROOT_URL,
get_taskcluster_options,
notify_email,
notify_matrix,
)
EMAIL_CLASSIFY_EVAL = """
# classify-eval report generated on {today}
The report contains statistics about pushes that were classified by Mozci.
## Statistics for the {total} pushes that were evaluated
{error_line}
{stats}
"""
EMAIL_PUSH_EVOLUTION = """
# Push {push.id} evolved from {previous} to {current}
Rev: [{push.rev}](https://treeherder.mozilla.org/jobs?repo={branch}&revision={push.rev})\n
Author: {push.author}\n
Time: {date}
## Real failures
- {real_failures}
"""
TWO_INTS_TUPLE_REGEXP = r"^\((\d+), ?(\d+)\)$"
class PushTasksCommand(Command):
name = "push tasks"
description = "List the tasks that ran on a push"
arguments = [
argument("rev", description="Head revision of the push."),
argument(
"branch", description="Branch the push belongs to (e.g autoland, try, etc)."
),
]
def handle(self):
push = Push(self.argument("rev"), self.argument("branch"))
table = []
for task in sorted(push.tasks, key=lambda t: t.label):
table.append([task.label, task.result or "running"])
self.line(tabulate(table, headers=["Label", "Result"]))
def classify_commands_pushes(
branch: str, from_date: str, to_date: str, rev: str
) -> List[Push]:
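    """
    Resolve the pushes to classify from the CLI options: either a single push
    identified by --rev, or a range of pushes between --from-date and --to-date
    (dates given as yyyy-mm-dd or as human expressions parsed by arrow).
    """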
if not (bool(rev) ^ bool(from_date or to_date)):
raise Exception(
"You must either provide a single push revision with --rev or define at least --from-date option to classify a range of pushes (note: --to-date will default to current time if not given)."
)
if rev:
pushes = [Push(rev, branch)]
else:
if not from_date:
raise Exception(
"You must provide at least --from-date to classify a range of pushes (note: --to-date will default to current time if not given)."
)
now = datetime.datetime.now()
if not to_date:
to_date = datetime.datetime.strftime(now, "%Y-%m-%d")
arrow_now = arrow.get(now)
try:
datetime.datetime.strptime(from_date, "%Y-%m-%d")
except ValueError:
try:
from_date = arrow_now.dehumanize(from_date).format("YYYY-MM-DD")
except ValueError:
raise Exception(
'Provided --from-date should be a date in yyyy-mm-dd format or a human expression like "1 days ago".'
)
try:
datetime.datetime.strptime(to_date, "%Y-%m-%d")
except ValueError:
try:
to_date = arrow_now.dehumanize(to_date).format("YYYY-MM-DD")
except ValueError:
raise Exception(
'Provided --to-date should be a date in yyyy-mm-dd format or a human expression like "1 days ago".'
)
pushes = make_push_objects(from_date=from_date, to_date=to_date, branch=branch)
return pushes
def check_type(parameter_type, hint, value):
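    """
    Coerce a string value coming from a CLI option or the config secret into the
    type annotated on the corresponding Push.classify parameter (bool, two-int
    tuple, or any directly callable type), raising a readable error on failure.
    """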
try:
if parameter_type == bool:
parameter = value not in [False, 0, "0", "False", "false", "f"]
elif parameter_type == Optional[Tuple[int, int]]:
match = re.match(TWO_INTS_TUPLE_REGEXP, value)
if not match or len(match.groups()) != 2:
raise ValueError
parameter = tuple([int(number) for number in match.groups()])
else:
parameter = parameter_type(value)
except ValueError:
raise Exception(
f"Provided {hint} should be a {parameter_type.__name__ if hasattr(parameter_type, '__name__') else parameter_type}."
)
return parameter
def retrieve_classify_parameters(options):
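    """
    Build the keyword arguments for Push.classify by inspecting its signature and
    reading each parameter from either the matching CLI option or the config
    secret (providing both for the same parameter is an error).
    """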
default_parameters = []
for name, parameter in signature(Push.classify).parameters.items():
if name != "self":
default_parameters.append(parameter)
classify_parameters = {}
for parameter in default_parameters:
parameter_name = parameter.name
parameter_type = parameter.annotation
option_name = parameter_name.replace("_", "-")
try:
option = options(option_name)
except CleoNoSuchOptionError:
option = None
if config.get(parameter_name) is not None and option is not None:
raise Exception(
f"You should provide either --{option_name} CLI option OR '{parameter_name}' in the config secret not both."
)
if config.get(parameter_name) is not None:
classify_parameters[parameter_name] = check_type(
parameter_type,
f"'{parameter_name}' in the config secret",
config[parameter_name],
)
elif option is not None:
classify_parameters[parameter_name] = check_type(
parameter_type, f"--{option_name}", option
)
return classify_parameters
class ClassifyCommand(Command):
name = "push classify"
description = "Display the classification for a given push (or a range of pushes) as GOOD, BAD or UNKNOWN"
arguments = [
argument(
"branch",
description="Branch the push belongs to (e.g autoland, try, etc).",
optional=True,
default="autoland",
)
]
options = [
option("rev", description="Head revision of the push.", flag=False),
option(
"from-date",
description='Lower bound of the push range (as a date in yyyy-mm-dd format or a human expression like "1 days ago").',
flag=False,
),
option(
"to-date",
description='Upper bound of the push range (as a date in yyyy-mm-dd format or a human expression like "1 days ago"), defaults to now.',
flag=False,
),
option(
"intermittent-confidence-threshold",
description="Medium confidence threshold used to classify the regressions.",
flag=False,
),
option(
"real-confidence-threshold",
description="High confidence threshold used to classify the regressions.",
flag=False,
),
option(
"use-possible-regressions",
description="Use possible regressions while classifying the regressions.",
default=None,
flag=False,
),
option(
"unknown-from-regressions",
description="Unknown from regressions while classifying the regressions.",
default=None,
flag=False,
),
option(
"consider-children-pushes-configs",
description="Consider children pushes configs while classifying the regressions.",
default=None,
flag=False,
),
option(
"cross-config-counts",
description="Cross-config counts used to classify the regressions.",
flag=False,
),
option(
"consistent-failures-counts",
description="Consistent failures counts used to classify the regressions.",
flag=False,
),
option(
"output",
description="Path towards a directory to save a JSON file containing classification and regressions details in.",
flag=False,
),
option(
"show-intermittents",
description="If set, print tasks that should be marked as intermittent.",
flag=True,
),
option(
"environment",
description="Environment in which the analysis is running (testing, production, ...).",
flag=False,
default="testing",
),
option(
"retrigger-limit",
description="Number of failing groups (missing information) to be retriggered.",
flag=False,
default=3,
),
option(
"backfill-limit",
description="Number of failing groups (missing information) to be backfilled.",
flag=False,
default=3,
),
]
def handle(self) -> None:
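        """
        Classify each selected push, retrigger/backfill failing groups that lack
        information, optionally dump a JSON report per push, and notify by
        email/Matrix when the classification evolves.
        """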
self.branch = self.argument("branch")
pushes = classify_commands_pushes(
self.branch,
self.option("from-date"),
self.option("to-date"),
self.option("rev"),
)
classify_parameters = retrieve_classify_parameters(self.option)
output = self.option("output")
if output and not os.path.isdir(output):
os.makedirs(output)
self.line(
"<comment>Provided --output pointed to a inexistent directory that is now created.</comment>"
)
retriggerable_backfillable_patterns = config.get(
"retriggerable-backfillable-task-names", []
)
try:
retrigger_limit = int(self.option("retrigger-limit"))
except ValueError:
raise Exception("Provided --retrigger-limit should be an int.")
try:
backfill_limit = int(self.option("backfill-limit"))
except ValueError:
raise Exception("Provided --backfill-limit should be an int.")
for push in pushes:
try:
classification, regressions, to_retrigger_or_backfill = push.classify(
**classify_parameters
)
self.backfill_and_retrigger_failures(
push,
retriggerable_backfillable_patterns,
classify_parameters,
retrigger_limit,
backfill_limit,
to_retrigger_or_backfill,
)
self.line(
f"Push associated with the head revision {push.rev} on "
f"the branch {self.branch} is classified as {classification.name}"
)
except Exception as e:
self.line(
f"<error>Couldn't classify push {push.push_uuid}: {e}.</error>"
)
# Print the error stacktrace in red
self.line(f"<error>{traceback.format_exc()}</error>")
continue
if self.option("show-intermittents"):
self.line("-" * 50)
self.line(
"Printing tasks that should be marked as intermittent failures:"
)
for task in regressions.intermittent:
self.line(task)
self.line("-" * 50)
if output:
def _serialize_regressions(regressions):
return {
group: [
{
"task_id": task.id,
"label": task.label,
"autoclassify": is_autoclassifiable(task),
"tests": [
test_name
for group_failures in task.failure_types.values()
for test_name, _ in group_failures
],
}
for task in failing_tasks
]
for group, failing_tasks in regressions.items()
}
to_save = {
"push": {
"id": push.push_uuid,
"classification": classification.name,
},
"failures": {
"real": _serialize_regressions(regressions.real),
"intermittent": _serialize_regressions(
regressions.intermittent
),
"unknown": _serialize_regressions(regressions.unknown),
},
}
filename = f"{output}/classify_output_{self.branch}_{push.rev}.json"
with open(filename, "w") as file:
json.dump(to_save, file, indent=2)
self.line(
f"Classification and regressions details for push {push.push_uuid} were saved in {filename} JSON file"
)
# Send a notification when some emails are declared in the config
emails = config.get("emails", {}).get("classifications")
matrix_room = config.get("matrix-room-id")
if emails or matrix_room:
# Load previous classification from taskcluster
try:
previous = push.get_existing_classification(
self.option("environment")
)
except SourcesNotFound:
# We still want to send a notification if the current one is bad
previous = None
self.send_notifications(
emails, matrix_room, push, previous, classification, regressions
)
def retrigger_failures(
self, push, groups, count, allowed_patterns, retrigger_limit
):
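        """
        Confirm (retrigger) one failing task per group, limited to retrigger_limit
        groups and to tasks whose label matches one of the allowed patterns.
        """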
groups_with_failures = {}
for name, failing_tasks in groups.items():
filtered_failing_tasks = [
task
for task in failing_tasks
if any(
fnmatch.fnmatch(task.label, pattern) for pattern in allowed_patterns
)
]
if filtered_failing_tasks:
assert all(
any(
not result.ok and result.group == name
for result in task.results
)
for task in filtered_failing_tasks
), f"Some failing tasks on the group {name} (to be retriggered) didn't really fail"
groups_with_failures[name] = filtered_failing_tasks
if not groups_with_failures:
return
for failing_tasks in itertools.islice(
groups_with_failures.values(), 0, retrigger_limit
):
# If there is more than one task failing in this group, we should retrigger only one of them
failing_tasks[0].confirm(push)
def backfill_and_retrigger_failures(
self,
push: Push,
allowed_patterns: List[str],
classify_parameters: Dict[str, Any],
retrigger_limit: int,
backfill_limit: int,
to_retrigger_or_backfill: ToRetriggerOrBackfill,
) -> None:
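        """
        Trigger extra data collection for groups lacking information: retrigger
        candidate real and intermittent failures, then backfill the groups flagged
        for backfilling, honoring the allowed label patterns and the
        retrigger/backfill limits.
        """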
# Retrigger real failures
# TODO: Potential real failures might be coming from children pushes too, but we should instead
# only retrigger tasks on the push where they were defined (https://github.com/mozilla/mozci/issues/796).
self.line("Retriggering failures to ensure they are real")
self.retrigger_failures(
push,
to_retrigger_or_backfill.real_retrigger,
classify_parameters.get("consistent_failures_counts", (2, 3))[1],
allowed_patterns,
retrigger_limit,
)
# Retrigger intermittent failures
self.line("Retriggering failures to ensure they are intermittent")
self.retrigger_failures(
push,
to_retrigger_or_backfill.intermittent_retrigger,
classify_parameters.get("consistent_failures_counts", (2, 3))[0],
allowed_patterns,
retrigger_limit,
)
# Backfill some failures
self.line("Backfilling failures to ensure they are caused by this push")
groups_to_backfill = {
name: failing_tasks
for name, failing_tasks in to_retrigger_or_backfill.backfill.items()
if failing_tasks
}
for failing_tasks in itertools.islice(
groups_to_backfill.values(), 0, backfill_limit
):
for t in failing_tasks:
if t.label and any(
fnmatch.fnmatch(t.label, pattern) for pattern in allowed_patterns
):
t.backfill(push)
def send_notifications(
self,
emails: Optional[List[str]],
matrix_room: Optional[str],
push: Push,
previous: Optional[PushStatus],
current: PushStatus,
regressions: Regressions,
) -> None:
"""
Send an email and/or a matrix notification when:
- there is no previous classification and the new classification is BAD;
- the previous classification was GOOD or UNKNOWN and the new classification is BAD;
- or the previous classification was BAD and the new classification is GOOD or UNKNOWN.
"""
def _get_task_url(task: TestTask):
"""Helper to build a treeherder link for a task"""
assert task.id is not None
params = {
"repo": self.branch,
"revision": push.rev,
"selectedTaskRun": f"{task.id}-0",
}
return f"https://treeherder.mozilla.org/#/jobs?{urlencode(params)}"
def _get_group_url(group: str):
"""Helper to build a treeherder link for a group"""
params = {"repo": self.branch, "tochange": push.rev, "test_paths": group}
return f"https://treeherder.mozilla.org/#/jobs?{urlencode(params)}"
def _list_tasks(tasks):
"""Helper to build a list of all tasks in a group, with their treeherder url"""
if not tasks:
return "No tasks available"
return "Tasks:\n - " + "\n - ".join(
[f"[{task.label}]({_get_task_url(task)})" for task in tasks]
)
if (
previous in (None, PushStatus.GOOD, PushStatus.UNKNOWN)
and current == PushStatus.BAD
) or (
previous == PushStatus.BAD
and current in (PushStatus.GOOD, PushStatus.UNKNOWN)
):
formatted_date = datetime.datetime.fromtimestamp(
push.date, tz=datetime.timezone.utc
).strftime("%H:%M:%S")
email_content = EMAIL_PUSH_EVOLUTION.format(
previous=previous.name if previous else "no classification",
current=current.name,
push=push,
date=formatted_date,
branch=self.branch,
real_failures="\n- ".join(
[
f"Group [{group}]({_get_group_url(group)}) - {_list_tasks(tasks)}"
for group, tasks in regressions.real.items()
]
),
)
if emails:
notify_email(
emails=emails,
subject=f"Push status evolution {push.id} {push.rev[:8]}",
content=email_content,
)
if matrix_room:
notify_matrix(
room=matrix_room,
body=email_content,
)
def prepare_for_analysis(push):
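    """
    Temporarily neutralize outside help before running the classification: drop
    retriggers/backfills, pretend the push (and its parents/children) was never
    backed out or finalized, and clear sheriff classifications. Returns everything
    needed to restore the original state afterwards.
    """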
removed_tasks: Dict[str, List[Task]] = {}
backedoutby: Dict[str, str] = {}
old_classifications: Dict[str, Dict[str, Dict[str, str]]] = {}
all_pushes = set(
[push]
+ [parent for parent in push._iterate_parents(max_depth=MAX_DEPTH)]
+ [child for child in push._iterate_children(max_depth=MAX_DEPTH)]
)
for p in all_pushes:
# Ignore retriggers and backfills on current push/its parents/its children.
removed_tasks[p.id] = [
task for task in p.tasks if task.is_backfill or task.is_retrigger
]
p.tasks = [task for task in p.tasks if task not in removed_tasks[p.id]]
# Pretend push was not backed out.
backedoutby[p.id] = p.backedoutby
p.backedoutby = None
# Pretend push was not finalized yet.
p._date = datetime.datetime.now().timestamp()
# Pretend no tasks were classified to run the model without any outside help.
old_classifications[p.id] = {}
for task in p.tasks:
old_classifications[p.id][task.id] = {
"classification": task.classification,
"note": task.classification_note,
}
task.classification = "not classified"
task.classification_note = None
return all_pushes, removed_tasks, backedoutby, old_classifications
def retrieve_sheriff_reals(pushes_group_summaries, push):
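    """
    Collect the groups that sheriffs effectively classified as real failures
    ('fixed by commit' on likely regressions of this push), caching group
    summaries per push along the way.
    """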
# Compare real failures that were predicted by mozci with the ones classified by Sheriffs
sheriff_reals = set()
# Get likely regressions of this push
likely_regressions = push.get_likely_regressions("group", True)
# Only consider groups that were classified as "fixed by commit" to exclude likely regressions mozci found via heuristics.
max_depth = None if push.backedout or push.bustage_fixed_by else MAX_DEPTH
for other in push._iterate_children(max_depth=max_depth):
if other.push_uuid not in pushes_group_summaries:
pushes_group_summaries[other.push_uuid] = other.group_summaries
for name, group in pushes_group_summaries[other.push_uuid].items():
classifications = set([c for c, _ in group.classifications])
if classifications == {"fixed by commit"} and name in likely_regressions:
sheriff_reals.add(name)
return pushes_group_summaries, sheriff_reals
def retrieve_sheriff_intermittents(pushes_group_summaries, push):
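    """
    Collect the groups on this push whose sheriff classifications are all
    intermittent-like, caching group summaries per push along the way.
    """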
if push.push_uuid not in pushes_group_summaries:
pushes_group_summaries[push.push_uuid] = push.group_summaries
# Compare intermittent failures that were predicted by mozci with the ones classified by Sheriffs
sheriff_intermittents = set()
for name, group in pushes_group_summaries[push.push_uuid].items():
classifications = set([c for c, _ in group.classifications])
if classifications <= set(INTERMITTENT_CLASSES):
sheriff_intermittents.add(name)
return pushes_group_summaries, sheriff_intermittents
def parse_and_log_details(
group_summaries,
sheriff_groups,
expected,
push=None,
failures=None,
predicted_groups=None,
ignore_pending_conflicting=False,
state="",
suffix="",
):
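    """
    Compare the groups Mozci predicted for a given state (real/intermittent) with
    the sheriff-classified ones, returning per-state counters (total/correct/
    wrong/missed, plus pending/conflicting unless ignored) and the lines to print.
    """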
to_print = []
if failures and not predicted_groups:
predicted_groups = failures[push][state].keys() if failures.get(push) else []
total = len(predicted_groups)
if not total:
if sheriff_groups:
to_print.append(
f"{len(sheriff_groups)} groups were classified as {state} by Sheriffs and missed by Mozci, missed groups:"
)
to_print.append(" - " + "\n - ".join(sheriff_groups))
output = {
f"total{suffix}": 0,
f"correct{suffix}": 0,
f"wrong{suffix}": 0,
f"missed{suffix}": len(sheriff_groups),
}
if not ignore_pending_conflicting:
output[f"pending{suffix}"] = 0
output[f"conflicting{suffix}"] = 0
return output, to_print
conflicting = []
differing = []
pending = []
for group in predicted_groups:
classifications_set = set(
task.classification for task in group_summaries[group].tasks if task.failed
)
if len(classifications_set) == 0:
continue
if (
len(classifications_set - {"not classified", "new failure not classified"})
== 0
):
pending.append(group)
elif len(classifications_set) != 1:
conflicting.append(group)
elif classifications_set.isdisjoint(expected):
differing.append(group)
missed = []
for group in sheriff_groups:
if group not in predicted_groups:
missed.append(group)
correct = total - len(differing)
if not ignore_pending_conflicting:
correct -= len(pending) + len(conflicting)
to_print.append(
f"{correct} out of {total} {state} groups were also classified as {state} by Sheriffs."
)
if differing:
to_print.append(
f"{len(differing)} out of {total} {state} groups weren't classified as {state} by Sheriffs, differing groups:"
)
to_print.append(" - " + "\n - ".join(differing))
if not ignore_pending_conflicting:
if pending:
to_print.append(
f"{len(pending)} out of {total} {state} groups are waiting to be classified by Sheriffs."
)
if conflicting:
to_print.append(
f"{len(conflicting)} out of {total} {state} groups have conflicting classifications applied by Sheriffs, inconsistent groups:"
)
to_print.append(" - " + "\n - ".join(conflicting))
if missed:
to_print.append(
f"{len(missed)} groups were classified as {state} by Sheriffs and missed (or classified as unknown) by Mozci, missed groups:"
)
to_print.append(" - " + "\n - ".join(missed))
output = {
f"total{suffix}": total,
f"correct{suffix}": correct,
f"wrong{suffix}": len(differing),
f"missed{suffix}": len(missed),
}
if not ignore_pending_conflicting:
output[f"pending{suffix}"] = len(pending)
output[f"conflicting{suffix}"] = len(conflicting)
return output, to_print
def check_ever_classified_as_cause(push, iterate_on):
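    """
    Return True as soon as one of the push's candidate regressions (iterating on
    labels or groups, up to the backout/bustage fix) was classified by sheriffs as
    caused by this push.
    """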
ever_classified_as_cause = False
for (
other,
_,
candidate_regressions,
classified_as_cause,
) in push._iterate_failures(iterate_on):
if push.backedoutby in other.revs or push.bustage_fixed_by in other.revs:
return ever_classified_as_cause
ever_classified_as_cause = any(
result is True
for name in candidate_regressions.keys()
for result in classified_as_cause[name]
)
if ever_classified_as_cause:
return ever_classified_as_cause
class ClassifyEvalCommand(Command):
name = "push classify-eval"
description = "Evaluate the classification results for a given push (or a range of pushes) by comparing them with reality"
arguments = [
argument(
"branch",
description="Branch the push belongs to (e.g autoland, try, etc).",
optional=True,
default="autoland",
)
]
options = [
option("rev", description="Head revision of the push.", flag=False),
option(
"from-date",
description='Lower bound of the push range (as a date in yyyy-mm-dd format or a human expression like "1 days ago").',
flag=False,
),
option(
"to-date",
description='Upper bound of the push range (as a date in yyyy-mm-dd format or a human expression like "1 days ago"), defaults to now.',
flag=False,
),
option(
"intermittent-confidence-threshold",
description="Medium confidence threshold used to classify the regressions.",
flag=False,
),
option(
"real-confidence-threshold",
description="High confidence threshold used to classify the regressions.",
flag=False,
),
option(
"use-possible-regressions",
description="Use possible regressions while classifying the regressions.",
default=None,
flag=False,
),
option(
"unknown-from-regressions",
description="Unknown from regressions while classifying the regressions.",
default=None,
flag=False,
),
option(
"consider-children-pushes-configs",
description="Consider children pushes configs while classifying the regressions.",
default=None,
flag=False,
),
option(
"cross-config-counts",
description="Cross-config counts used to classify the regressions.",
flag=False,
),
option(
"consistent-failures-counts",
description="Consistent failures counts used to classify the regressions.",
flag=False,
),
option(
"recalculate",
description="If set, recalculate the classification instead of fetching an artifact.",
flag=True,
),
option(
"output",
description="Path towards a path to save a CSV file with classification states for various pushes.",
flag=False,
),
option(
"send-email",
description="If set, also send the evaluation report by email instead of just logging it.",
flag=True,
),
option(
"detailed-classifications",
description="If set, compare real/intermittent group classifications with Sheriff's ones.",
flag=True,
),
option(
"environment",
description="Environment in which the analysis is running (testing, production, ...).",
flag=False,
default="testing",
),
]
def handle(self) -> None:
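        """
        Evaluate classifications for the selected pushes, either recalculated
        locally or fetched from existing Taskcluster artifacts, compare them with
        sheriff annotations, emit warnings and statistics, and optionally send an
        email report or write a CSV file.
        """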
branch = self.argument("branch")
self.line("<comment>Loading pushes...</comment>")
self.pushes = classify_commands_pushes(
branch,
self.option("from-date"),
self.option("to-date"),
self.option("rev"),
)
option_names = [
name.replace("_", "-")
for name, _ in signature(Push.classify).parameters.items()
if name != "self"
]
if self.option("recalculate"):
classify_parameters = retrieve_classify_parameters(self.option)
elif any(self.option(name) for name in option_names):
self.line(
f"<error>--recalculate isn't set, you shouldn't provide --{', --'.join(option_names)} CLI options.</error>"
)
return
# Progress bar will display time stats & messages
progress = self.progress_bar(len(self.pushes))
progress.set_format(
" %current%/%max% [%bar%] %percent:3s%% %elapsed:6s% %message%"
)
# Setup specific route prefix for existing tasks, according to environment
environment = self.option("environment")
route_prefix = (
"project.mozci.classification"
if environment == "production"
else f"project.mozci.{environment}.classification"
)
self.errors = {}
self.classifications = {}
self.failures = {}
for push in self.pushes:
if self.option("recalculate"):
progress.set_message(f"Calc. {branch} {push.id}")
(
all_pushes,
removed_tasks,
backedoutby,
old_classifications,
) = prepare_for_analysis(push)
try:
self.classifications[push], regressions, _ = push.classify(
**classify_parameters
)
self.failures[push] = {
"real": regressions.real,
"intermittent": regressions.intermittent,
"unknown": regressions.unknown,
}
except Exception as e:
self.line(
f"<error>Classification failed on {branch} {push.rev}: {e}</error>"
)
self.errors[push] = e
for p in all_pushes:
# Once the Mozci algorithm has run, restore Sheriffs classifications to be able to properly compare failures classifications.
for task in p.tasks:
task.classification = old_classifications[p.id][task.id][
"classification"
]
task.classification_note = old_classifications[p.id][task.id][
"note"
]
# Restore backout information.
p.backedoutby = backedoutby[p.id]
# And also restore tasks marked as a backfill or a retrigger.
p.tasks = p.tasks + removed_tasks[p.id]
else:
progress.set_message(f"Fetch {branch} {push.id}")
try:
index = f"{route_prefix}.{branch}.revision.{push.rev}"
task = Task.create(
index=index, root_url=COMMUNITY_TASKCLUSTER_ROOT_URL
)
artifact = task.get_artifact(
"public/classification.json",
root_url=COMMUNITY_TASKCLUSTER_ROOT_URL,
)
self.classifications[push] = PushStatus[
artifact["push"]["classification"]
]
self.failures[push] = artifact["failures"]
except TaskNotFound as e:
self.line(
f"<comment>Taskcluster task missing for {branch} {push.rev}</comment>"
)
self.errors[push] = e
except Exception as e:
self.line(
f"<error>Fetch failed on {branch} {push.rev}: {e}</error>"
)
self.errors[push] = e
warnings = []
# Warn about pushes that are backed-out and where all failures on the push itself and its children are marked as intermittent
if push.backedout or push.bustage_fixed_by:
ever_classified_as_cause = check_ever_classified_as_cause(push, "label")
if not ever_classified_as_cause:
ever_classified_as_cause = check_ever_classified_as_cause(
push, "group"
)
if not ever_classified_as_cause:
warnings.append(
{
"message": f"Push {push.branch}/{push.rev} was backedout and all of its failures and the ones of its children were marked as intermittent or marked as caused by another push.",
"type": "error",
"notify": config.get("warnings", {}).get(
"ever_classified_as_cause", False
),
}
)
for task in push.tasks:
if task.classification != "fixed by commit":
continue
# Warn if there is a classification that references a revision that does not exist
fix_hgmo = HgRev.create(
task.classification_note[:12], branch=push.branch
)
try:
fix_hgmo.changesets
except PushNotFound:
warnings.append(
{
"message": f"Task {task.id} on push {push.branch}/{push.rev} contains a classification that references a non-existent revision: {task.classification_note}.",
"type": "error",
"notify": config.get("warnings", {}).get(
"non_existent_fix", False
),
}
)
continue
if fix_hgmo.pushid <= push.id:
warnings.append(
{
"message": f"Task {task.label} on push {push.branch}/{push.rev} is classified as fixed by {task.classification_note}, which is older than the push itself.",
"type": "error",
"notify": config.get("warnings", {}).get(
"fix_older_than_push", False
),
}
)
continue
# Warn when a failure is classified as fixed by a backout of a push that is newer than the failure itself
all_backedouts = set(
backedout
for backedouts in fix_hgmo.backouts.values()
for backedout in backedouts
)
all_bustagefixed = set()
for child in push._iterate_children():
if child.rev == fix_hgmo.node:
break
for bug in child.bugs:
if bug in fix_hgmo.bugs_without_backouts:
all_bustagefixed.add(child.rev)
all_fixed = all_backedouts | all_bustagefixed
if len(all_fixed) > 0 and all(
HgRev.create(backedout, branch=push.branch).pushid > push.id
for backedout in all_fixed
):
warnings.append(
{
"message": f"Task {task.label} on push {push.branch}/{push.rev} is classified as fixed by a backout/bustage fix ({fix_hgmo.node}) of pushes ({all_fixed}) that come after the failure itself.",
"type": "error",
"notify": config.get("warnings", {}).get(
"backout_of_newer_pushes", False
),
}
)
# Warn when there are inconsistent classifications for a given group
if push.backedout or push.bustage_fixed_by:
group_classifications: dict[
str, dict[tuple[str, str], set[str]]
] = collections.defaultdict(lambda: collections.defaultdict(set))
for other in push._iterate_children():
if (
push.backedoutby in other.revs
or push.bustage_fixed_by in other.revs
):
break
for name, summary in other.group_summaries.items():
for classification in summary.classifications:
group_classifications[name][classification].add(other.rev)
for name, classification_to_revs in group_classifications.items():
if len(classification_to_revs) > 1:
inconsistent_list = [
f" - {classification} in pushes {', '.join(revs)}"
for classification, revs in classification_to_revs.items()
]
inconsistent = "\n" + ",\n".join(inconsistent_list)
warnings.append(
{
"message": f"Group {name} has inconsistent classifications: {inconsistent}.",
"type": "comment",
"notify": config.get("warnings", {}).get(
"inconsistent", False
),
}
)
# Output all warnings and also send them to the Matrix room if defined
matrix_room = config.get("matrix-room-id")
for warning in warnings:
warn_type = warning["type"]
warn_message = warning["message"]
do_notify = warning["notify"]
self.line(f"<{warn_type}>{warn_message}</{warn_type}>")
if matrix_room and do_notify:
notify_matrix(room=matrix_room, body=warn_message)
if not matrix_room and warnings:
self.line(
"<comment>Some warning notifications should have been sent but no matrix room was provided in the secret.</comment>"
)
# Advance the overall progress bar
progress.advance()
# Conclude the progress bar
progress.finish()
print("\n")
error_line = ""
if self.errors:
if self.option("recalculate"):
error_line = "Failed to recalculate classification"
else:
error_line = "Failed to fetch classification"
error_line += f" for {len(self.errors)} out of {len(self.pushes)} pushes."
if not self.option("recalculate") and not self.option("send-email"):
error_line += " Use the '--recalculate' option if you want to generate them yourself."
self.line(f"<error>{error_line}</error>")
stats = [
self.log_pushes(PushStatus.BAD, False),
self.log_pushes(PushStatus.BAD, True),
self.log_pushes(PushStatus.GOOD, False),
self.log_pushes(PushStatus.GOOD, True),
self.log_pushes(PushStatus.UNKNOWN, False),
self.log_pushes(PushStatus.UNKNOWN, True),
]
if self.option("detailed-classifications"):
self.line("\n")
pushes_group_summaries = {}
            # real_stats and intermittent_stats are rebound (never mutated) below,
            # so sharing this zeroed template between them is safe.
            real_stats = intermittent_stats = {
"total": 0,
"correct": 0,
"wrong": 0,
"pending": 0,
"conflicting": 0,
"missed": 0,
}
for push in self.pushes:
self.line(
f"<comment>Printing detailed classifications comparison for push {push.branch}/{push.rev}</comment>"
)
if push.push_uuid not in pushes_group_summaries:
pushes_group_summaries[push.push_uuid] = push.group_summaries
# Compare real failures that were predicted by mozci with the ones classified by Sheriffs
try:
pushes_group_summaries, sheriff_reals = retrieve_sheriff_reals(
pushes_group_summaries, push
)
except Exception:
self.line(
"<error>Failed to retrieve Sheriff classifications for the real failures of this push.</error>"
)
try:
push_real_stats, to_print = parse_and_log_details(
pushes_group_summaries[push.push_uuid],
sheriff_reals,
{"fixed by commit"},
push=push,
failures=self.failures,
state="real",
)
for line in to_print:
self.line(line)
real_stats = {
key: value + push_real_stats[key]
for key, value in real_stats.items()
}
except Exception:
self.line(
"<error>Failed to compare true and predicted real failures of this push.</error>"
)
# Compare intermittent failures that were predicted by mozci with the ones classified by Sheriffs
try:
(
pushes_group_summaries,
sheriff_intermittents,
) = retrieve_sheriff_intermittents(pushes_group_summaries, push)
except Exception:
self.line(
"<error>Failed to retrieve Sheriff classifications for the intermittent failures of this push.</error>"
)
try:
push_intermittent_stats, to_print = parse_and_log_details(
pushes_group_summaries[push.push_uuid],
sheriff_intermittents,
set(INTERMITTENT_CLASSES),
push=push,
failures=self.failures,
state="intermittent",
)
for line in to_print:
self.line(line)
intermittent_stats = {
key: value + push_intermittent_stats[key]
for key, value in intermittent_stats.items()
}
except Exception:
self.line(
"<error>Failed to compare true and predicted intermittent failures of this push.</error>"
)
self.line(
f"\n<comment>Printing overall detailed classifications comparison for {len(self.pushes)} pushes</comment>"
)
detailed_stats = [
f"{real_stats['correct']} out of {real_stats['total']} failures were correctly classified as real ('fixed by commit' by Sheriffs).",
f"{real_stats['wrong']} out of {real_stats['total']} failures were wrongly classified as real ('intermittent' by Sheriffs).",
f"{real_stats['pending']} out of {real_stats['total']} failures classified as real are waiting to be classified by Sheriffs.",
f"{real_stats['conflicting']} out of {real_stats['total']} failures classified as real have conflicting classifications applied by Sheriffs.",
f"{real_stats['missed']} real failures were missed or classified as unknown by Mozci.",
f"{intermittent_stats['correct']} out of {intermittent_stats['total']} failures were correctly classified as intermittent ('intermittent' by Sheriffs).",
f"{intermittent_stats['wrong']} out of {intermittent_stats['total']} failures were wrongly classified as intermittent ('fixed by commit' by Sheriffs).",
f"{intermittent_stats['pending']} out of {intermittent_stats['total']} failures classified as intermittent are waiting to be classified by Sheriffs.",
f"{intermittent_stats['conflicting']} out of {intermittent_stats['total']} failures classified as intermittent have conflicting classifications applied by Sheriffs.",
f"{intermittent_stats['missed']} intermittent failures were missed or classified as unknown by Mozci.",
]
for line in detailed_stats:
self.line(line)
stats += detailed_stats
if self.option("send-email"):
self.send_emails(len(self.pushes), stats, error_line)
output = self.option("output")
if output:
# Build stats for CSV
with open(output, "w") as csvfile:
writer = csv.DictWriter(
csvfile,
fieldnames=[
"revision",
"date",
"classification",
"backedout",
"error_type",
"error_message",
],
)
writer.writeheader()
writer.writerows([self.build_stats(push) for push in self.pushes])
self.line(
f"<info>Written stats for {len(self.pushes)} pushes in {output}</info>"
)
def build_stats(self, push):
"""
Build a dict with statistics relevant for a push
"""
classification = self.classifications.get(push)
error = self.errors.get(push)
return {
"revision": push.rev,
"date": push.date,
"classification": classification or "error",
"backedout": push.backedout if classification else "",
"error_type": error.__class__.__name__ if error else "",
"error_message": str(error) if error else "",
}
def log_pushes(self, status, backedout):
"""
Display stats for all pushes in a given classification state + backout combination
"""
assert isinstance(status, PushStatus)
assert isinstance(backedout, bool)
nb = len(
[
push
for push in self.pushes
if self.classifications.get(push) == status
and push.backedout == backedout
]
)
verb = "were" if backedout else "weren't"
line = f"{nb} out of {len(self.pushes)} pushes {verb} backed-out by a sheriff and were classified as {status.name}."
self.line(line)
return line
def send_emails(self, total, stats, error_line):
today = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d")
stats = "\n".join([f"- {stat}" for stat in stats])
environment = self.option("environment")
notify_email(
emails=config.get("emails", {}).get("monitoring"),
subject=f"{environment} classify-eval report generated the {today}",
content=EMAIL_CLASSIFY_EVAL.format(
today=today,
total=total,
error_line=f"**{error_line}**" if error_line else "",
stats=stats,
),
)
class ClassifyPerfCommand(Command):
name = "push perf"
description = (
"Generate a CSV file with performance stats for all classification tasks"
)
arguments = [
argument(
"branch",
description="Branch the push belongs to (e.g autoland, try, etc).",
optional=True,
default="autoland",
)
]
options = [
option(
"environment",
description="Environment in which the analysis is running (testing, production, ...).",
flag=False,
default="testing",
),
option(
"output",
description="Output CSV file path.",
flag=False,
default="perfs.csv",
),
]
REGEX_ROUTE = re.compile(
r"^index.project.mozci.classification.([\w\-]+).(revision|push).(\w+)$"
)
def handle(self) -> None:
environment = self.option("environment")
output = self.option("output")
# Aggregate stats for completed tasks processed by the hook
stats = [
self.parse_task_status(task_status)
for group_id in self.list_groups_from_hook(
"project-mozci", f"decision-task-{environment}"
)
for task_status in self.list_classification_tasks(group_id)
]
# Dump stats as CSV file
with open(output, "w") as csvfile:
writer = csv.DictWriter(
csvfile,
fieldnames=[
"branch",
"push",
"revision",
"task_id",
"created",
"time_taken",
],
)
writer.writeheader()
writer.writerows(stats)
self.line(f"<info>Written stats for {len(stats)} tasks in {output}</info>")
def parse_routes(self, routes):
"""Find revision from task routes"""
def _match(route):
res = self.REGEX_ROUTE.search(route)
if res:
return res.groups()
# Extract branch+name+value from the routes
# and get 3 separated lists to check those values
branches, keys, values = zip(*filter(None, map(_match, routes)))
# We should only have one branch
branches = set(branches)
assert len(branches) == 1, f"Multiple branches detected: {branches}"
# Output single branch, revision and push id
data = dict(zip(keys, values))
assert "revision" in data, "Missing revision route"
assert "push" in data, "Missing push route"
return branches.pop(), data["revision"], int(data["push"])
def parse_task_status(self, task_status):
"""Extract identification and time spent for each classification task"""
def date(x):
return datetime.datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%fZ")
out = {
"task_id": task_status["status"]["taskId"],
"created": task_status["task"]["created"],
"time_taken": sum(
(date(run["resolved"]) - date(run["started"])).total_seconds()
for run in task_status["status"]["runs"]
if run["state"] == "completed"
),
}
out["branch"], out["revision"], out["push"] = self.parse_routes(
task_status["task"]["routes"]
)
return out
def list_groups_from_hook(self, group_id, hook_id):
"""List all decision tasks from the specified hook"""
hooks = taskcluster.Hooks(get_taskcluster_options())
fires = hooks.listLastFires(group_id, hook_id).get("lastFires", [])
# Setup CLI progress bar
progress = self.progress_bar(len(fires))
progress.set_format("verbose")
        # Provide the decision task ID, as it is the same value as the group ID
for fire in fires:
yield fire["taskId"]
progress.advance()
# Cleanup progress bar
progress.finish()
def list_classification_tasks(self, group_id):
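        """
        Yield completed classification tasks from a task group, paginating through
        the Taskcluster queue and caching the results once the whole group is
        completed (the decision task itself is skipped).
        """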
# Check cache first
cache_key = f"perf/task_group/{group_id}"
tasks = config.cache.get(cache_key, [])
if not tasks:
queue = taskcluster.Queue(get_taskcluster_options())
token = False
try:
# Support pagination using continuation token
while token is not None:
query = {"continuationToken": token} if token else {}
results = queue.listTaskGroup(group_id, query=query)
tasks += results.get("tasks")
token = results.get("continuationToken")
except TaskclusterRestFailure as e:
# Skip expired task groups
if e.status_code == 404:
return
raise
else:
logger.debug("From cache", cache_key)
for task_status in tasks:
task_id = task_status["status"]["taskId"]
# Skip decision task
if task_id == group_id:
continue
# Only provide completed tasks
if task_status["status"]["state"] != "completed":
logger.debug(f"Skip not completed task {task_id}")
continue
yield task_status
# Cache all tasks if all completed
if all(t["status"]["state"] == "completed" for t in tasks):
config.cache.add(cache_key, tasks, int(config["cache"]["retention"]))