sync/trypush.py
from __future__ import annotations
import os
import re
import shutil
import subprocess
import traceback
from collections import defaultdict
import taskcluster
import yaml
from . import base
from . import log
from . import tc
from . import tree
from .env import Environment
from .errors import AbortError, RetryableError
from .index import TaskGroupIndex, TryCommitIndex
from .load import get_syncs
from .lock import constructor, mut
from .projectutil import Mach
from .repos import cinnabar
from .tc import TaskGroupView
from typing import Any, Iterator, Mapping, MutableMapping, TYPE_CHECKING
from git.repo.base import Repo
if TYPE_CHECKING:
from sync.downstream import DownstreamSync
from sync.landing import LandingSync
from sync.lock import SyncLock
from sync.tc import TaskGroup
logger = log.get_logger(__name__)
env = Environment()
auth_tc = tc.TaskclusterClient()
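# Matches the try revision (40 hex characters) reported in the push output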
rev_re = re.compile("revision=(?P<rev>[0-9a-f]{40})")
class TryCommit:
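    """Context manager that creates a commit to push to try, and restores the
    worktree to its original state on exit."""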
def __init__(self,
git_gecko: Repo,
worktree: Repo,
tests_by_type: Mapping[str, list[str]] | None,
rebuild: int,
hacks: bool = True,
**kwargs: Any
) -> None:
self.git_gecko = git_gecko
self.worktree = worktree
self.tests_by_type = tests_by_type
self.rebuild = rebuild
self.hacks = hacks
self.try_rev = None
self.extra_args = kwargs
self.reset: str | None = None
def __enter__(self) -> TryCommit:
self.create()
return self
def __exit__(self, *args: Any, **kwargs: Any) -> None:
self.cleanup()
    def create(self) -> None:
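        """Prepare the commit to push; overridden by subclasses."""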
pass
def cleanup(self) -> None:
if self.reset is not None:
logger.debug("Resetting working tree to %s" % self.reset)
self.worktree.head.reset(self.reset, working_tree=True)
self.reset = None
def apply_hacks(self) -> None:
        # Forcibly exclude certain default jobs that are uninteresting,
        # e.g. spidermonkey jobs that take > 3 hours
logger.info("Removing spidermonkey jobs")
tc_config = "taskcluster/ci/config.yml"
working_dir = self.worktree.working_dir
assert working_dir is not None
path = os.path.join(working_dir, tc_config)
if os.path.exists(path):
with open(path) as f:
data = yaml.safe_load(f)
if "try" in data and "ridealong-builds" in data["try"]:
data["try"]["ridealong-builds"] = {}
with open(path, "w") as f:
yaml.dump(data, f)
self.worktree.index.add([tc_config])
def push(self) -> str:
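        """Push to try and return the try revision."""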
status, output = self._push()
return self.read_treeherder(status, output)
def _push(self) -> tuple[int, str]:
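        """Perform the actual push, returning a (status, output) tuple."""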
raise NotImplementedError
def read_treeherder(self, status: int, output: str) -> str:
msg = f"Failed to push to try:\n{output}"
try_rev: str | None = None
if status != 0:
logger.error(msg)
raise RetryableError(AbortError(msg))
rev_match = rev_re.search(output)
if not rev_match:
            logger.warning(f"No revision found in string:\n\n{output}\n")
            # Assume that the revision is HEAD. This happens in tests, where it's
            # harmless, but it would indicate a problem in production.
try:
try_rev = cinnabar(self.git_gecko).git2hg(self.worktree.head.commit.hexsha)
except ValueError:
pass
else:
try_rev = rev_match.group('rev')
if try_rev is None:
logger.error(msg)
raise AbortError(msg)
return try_rev
class TryFuzzyCommit(TryCommit):
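    """TryCommit implementation that schedules tasks via |mach try fuzzy|.
    Usage sketch, mirroring TryPush.create (argument values are illustrative):
        with TryFuzzyCommit(git_gecko, worktree, None, rebuild=0) as commit:
            try_rev = commit.push()
    """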
def __init__(self,
git_gecko: Repo,
worktree: Repo,
tests_by_type: Mapping[str, list[str]] | None,
rebuild: int,
hacks: bool = True,
**kwargs: Any
) -> None:
super().__init__(git_gecko, worktree, tests_by_type, rebuild,
hacks=hacks, **kwargs)
self.queries = self.extra_args.get("queries",
["web-platform-tests !macosx !shippable !asan !tsan"])
if isinstance(self.queries, str):
self.queries = [self.queries]
self.full = self.extra_args.get("full", False)
self.disable_target_task_filter = self.extra_args.get("disable_target_task_filter", False)
self.artifact = self.extra_args.get("artifact", True)
def create(self) -> None:
if self.hacks:
self.reset = self.worktree.head.commit.hexsha
self.apply_hacks()
# TODO add something useful to the commit message here since that will
# appear in email &c.
self.worktree.index.commit(message="Apply task hacks before running try")
def _push(self) -> tuple[int, str]:
self.worktree.git.reset("--hard")
working_dir = self.worktree.working_dir
assert working_dir is not None
mach = Mach(working_dir)
        # Gross hack to create an objdir until we figure out why this is
        # failing from here but not from the shell
try:
if not os.path.exists(os.path.join(working_dir,
"obj-x86_64-pc-linux-gnu")):
mach.python("-c", "")
except OSError:
pass
query_args = []
for query in self.queries:
query_args.extend(["-q", query])
logger.info("Pushing to try with fuzzy query: %s" % " ".join(query_args))
can_push_routes = b"--route " in mach.try_("fuzzy", "--help")
args = ["fuzzy"] + query_args
if self.rebuild:
args.append("--rebuild")
args.append(str(self.rebuild))
if self.full:
args.append("--full")
if self.disable_target_task_filter:
args.append("--disable-target-task-filter")
if can_push_routes:
args.append("--route=notify.pulse.wptsync.try-task.on-any")
if self.artifact:
args.append("--artifact")
else:
args.append("--no-artifact")
# --push-to-vcs is required to push directly to hgmo
args.append("--push-to-vcs")
if self.tests_by_type is not None:
paths = []
all_paths = set()
for values in self.tests_by_type.values():
for item in values:
if (item not in all_paths and
os.path.exists(os.path.join(working_dir,
item))):
paths.append(item)
all_paths.add(item)
max_tests = env.config["gecko"]["try"].get("max-tests")
if max_tests and len(paths) > max_tests:
logger.warning("Capping number of affected tests at %d" % max_tests)
paths = paths[:max_tests]
args.extend(paths)
try:
output = mach.try_(*args, stderr=subprocess.STDOUT)
return 0, output.decode("utf8", "replace")
        except subprocess.CalledProcessError as e:
            # Decode so both branches return str, as the signature promises
            return e.returncode, e.output.decode("utf8", "replace")
class TryPush(base.ProcessData):
"""A try push is represented by an annotated tag with a path like
try/<pr_id>/<id>
Where id is a number to indicate the Nth try push for this PR.
"""
obj_type = "try"
statuses = ("open", "complete", "infra-fail")
status_transitions = [("open", "complete"),
("complete", "open"), # For reopening "failed" landing try pushes
("infra-fail", "complete")]
@classmethod
@constructor(lambda args: (args["sync"].process_name.subtype,
args["sync"].process_name.obj_id))
def create(cls,
lock: SyncLock,
sync: DownstreamSync | LandingSync,
affected_tests: dict[str, list[str]] | None = None,
stability: bool = False,
hacks: bool = True,
try_cls: type = TryFuzzyCommit,
rebuild_count: int | None = None,
check_open: bool = True,
**kwargs: Any
) -> TryPush:
logger.info("Creating try push for PR %s" % sync.pr)
if check_open and not tree.is_open("try"):
logger.info("try is closed")
raise RetryableError(AbortError("Try is closed"))
# Ensure the required indexes exist
TaskGroupIndex.get_or_create(sync.git_gecko)
try_idx = TryCommitIndex.get_or_create(sync.git_gecko)
git_work = sync.gecko_worktree.get()
if rebuild_count is None:
            rebuild_count = 0 if not stability else env.config["gecko"]["try"]["stability_count"]
            if not isinstance(rebuild_count, int):
                logger.error("Could not read stability rebuild count from config; "
                             "using default of 5")
                rebuild_count = 5
with try_cls(sync.git_gecko, git_work, affected_tests, rebuild_count, hacks=hacks,
**kwargs) as c:
try_rev = c.push()
data = {
"try-rev": try_rev,
"stability": stability,
"gecko-head": sync.gecko_commits.head.sha1,
"wpt-head": sync.wpt_commits.head.sha1,
"status": "open",
"bug": sync.bug,
}
process_name = base.ProcessName.with_seq_id(sync.git_gecko,
cls.obj_type,
sync.sync_type,
str(getattr(sync, sync.obj_id)))
rv = super().create(lock, sync.git_gecko, process_name, data)
try_idx.insert(try_idx.make_key(try_rev), process_name)
with rv.as_mut(lock):
rv.created = taskcluster.fromNowJSON("0 days")
if sync.bug is not None:
env.bz.comment(sync.bug,
"Pushed to try%s %s" %
(" (stability)" if stability else "",
rv.treeherder_url))
return rv
@classmethod
    def load_all(cls, git_gecko: Repo) -> Iterator[TryPush]:
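        """Iterate over all known try pushes."""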
process_names = base.ProcessNameIndex(git_gecko).get("try")
for process_name in process_names:
yield cls(git_gecko, process_name)
@classmethod
    def for_commit(cls, git_gecko: Repo, sha1: str) -> TryPush | None:
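        """Look up the try push whose try commit is sha1, if any."""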
idx = TryCommitIndex(git_gecko)
process_name = idx.get(idx.make_key(sha1))
if process_name:
logger.info(f"Found try push {process_name!r} for rev {sha1}")
return cls(git_gecko, process_name)
logger.info(f"No try push for rev {sha1}")
@classmethod
    def for_taskgroup(cls, git_gecko: Repo, taskgroup_id: str) -> TryPush | None:
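        """Look up the try push for the given taskcluster task group, if any."""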
idx = TaskGroupIndex(git_gecko)
process_name = idx.get(idx.make_key(taskgroup_id))
if process_name:
return cls(git_gecko, process_name)
@property
def treeherder_url(self) -> str:
return "https://treeherder.mozilla.org/#/jobs?repo=try&revision=%s" % self.try_rev
@property
def created(self) -> Any | None:
return self.get("created")
@created.setter # type: ignore
@mut()
def created(self, value: str) -> None:
self["created"] = value
@property
def try_rev(self) -> str | None:
return self.get("try-rev")
@try_rev.setter # type: ignore
@mut()
def try_rev(self, value: str) -> None:
idx = TryCommitIndex(self.repo)
if self.try_rev is not None:
idx.delete(idx.make_key(self.try_rev), self.process_name)
self._data["try-rev"] = value
assert self.try_rev is not None
idx.insert(idx.make_key(self.try_rev), self.process_name)
@property
def taskgroup_id(self) -> str | None:
return self.get("taskgroup-id")
@taskgroup_id.setter # type: ignore
@mut()
def taskgroup_id(self, value: str) -> None:
self["taskgroup-id"] = value
idx = TaskGroupIndex(self.repo)
if value:
idx.insert(idx.make_key(value), self.process_name)
@property
def status(self) -> str:
return self.get("status")
@status.setter # type: ignore
@mut()
def status(self, value: str) -> None:
if value not in self.statuses:
raise ValueError("Unrecognised status %s" % value)
current = self.get("status")
if current == value:
return
if (current, value) not in self.status_transitions:
raise ValueError(f"Tried to change status from {current} to {value}")
self["status"] = value
@property
def wpt_head(self) -> str:
return self.get("wpt-head")
@property
    def gecko_head(self) -> str:
return self.get("gecko-head")
    def sync(self, git_gecko: Repo, git_wpt: Repo) -> DownstreamSync | LandingSync | None:
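        """Return the sync that this try push belongs to, preferring an open
        one if there are several."""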
process_name = self.process_name
syncs = get_syncs(git_gecko,
git_wpt,
process_name.subtype,
process_name.obj_id)
if len(syncs) == 0:
return None
if len(syncs) == 1:
return syncs.pop()
for item in syncs:
if item.status == "open":
return item
raise ValueError("Got multiple syncs and none were open")
@property
def stability(self) -> bool:
"""Is the current try push a stability test"""
return self["stability"]
@property
    def bug(self) -> int | None:
        bug = self.get("bug")
        return int(bug) if bug is not None else None
@property
def infra_fail(self) -> bool:
"""Does this push have infrastructure failures"""
if self.status == "infra-fail":
self.status = "complete" # type: ignore
self.infra_fail = True # type: ignore
return self.get("infra-fail", False)
@infra_fail.setter # type: ignore
@mut()
def infra_fail(self, value: bool) -> None:
"""Set the status of this push's infrastructure failure state"""
if value == self.get("infra-fail"):
return
self["infra-fail"] = value
if value:
self.notify_failed_builds()
def notify_failed_builds(self) -> None:
if self.taskgroup_id is None:
logger.error("No task group for try push %s" % self)
return
tasks = tc.TaskGroup(self.taskgroup_id).view()
failed = tasks.failed_builds()
if not failed:
logger.error("No failed builds to report for Try push: %s" % self)
return
bug = self.bug
if not bug:
logger.error("No associated bug found for Try push: %s" % self)
return
msg = "There were infrastructure failures for the Try push (%s):\n" % self.treeherder_url
        # Default the name to "" so a task with missing metadata can't break the join
        msg += "\n".join(task.get("task", {}).get("metadata", {}).get("name", "")
                         for task in failed.tasks)
env.bz.comment(bug, msg)
@property
def accept_failures(self) -> bool:
return self.get("accept-failures", False)
@accept_failures.setter # type: ignore
@mut()
def accept_failures(self, value: bool) -> None:
self["accept-failures"] = value
def tasks(self) -> TryPushTasks | None:
"""Get a list of all the taskcluster tasks for web-platform-tests
jobs associated with the current try push.
:return: List of tasks
"""
if self.taskgroup_id is None:
return None
task_id = tc.normalize_task_id(self.taskgroup_id)
if task_id != self.taskgroup_id:
self.taskgroup_id = task_id # type: ignore
tasks = tc.TaskGroup(self.taskgroup_id)
tasks.refresh()
return TryPushTasks(tasks)
def log_path(self) -> str:
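        """Return the local directory where logs for this try push are downloaded."""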
assert self.try_rev is not None
return os.path.join(env.config["root"], env.config["paths"]["try_logs"],
"try", self.try_rev)
@mut()
def download_logs(self, wpt_taskgroup: TaskGroupView | TryPushTasks,
first_only: bool = False) -> TaskGroupView:
"""Download all the wptreport logs for the current try push
:return: List of paths to logs
"""
# Allow passing either TryPushTasks or the actual TaskGroupView
if hasattr(wpt_taskgroup, "wpt_tasks"):
assert isinstance(wpt_taskgroup, TryPushTasks)
wpt_tasks = wpt_taskgroup.wpt_tasks
else:
assert isinstance(wpt_taskgroup, TaskGroupView)
wpt_tasks = wpt_taskgroup
exclude = set()
def included(t: dict[str, dict[str, Any]]) -> bool:
# if a name is on the excluded list, only download SUCCESS job logs
name = t.get("task", {}).get("metadata", {}).get("name")
state = t.get("status", {}).get("state")
output = name not in exclude or state == tc.SUCCESS
            # Add this name to the exclusion list, so that we don't download
            # the repeated-run logs in a stability try run
if first_only and name is not None and name not in exclude:
exclude.add(name)
return output
if self.try_rev is None:
if wpt_tasks:
logger.info("Got try push with no rev; setting it from a task")
try_rev = (next(iter(wpt_tasks))
.get("task", {})
.get("payload", {})
.get("env", {})
.get("GECKO_HEAD_REV"))
if try_rev:
self.try_rev = try_rev # type: ignore
else:
raise ValueError("Unknown try rev for %s" % self.process_name)
include_tasks = wpt_tasks.filter(included)
logger.info("Downloading logs for try revision %s" % self.try_rev)
file_names = ["wptreport.json"]
include_tasks.download_logs(self.log_path(), file_names)
return include_tasks
@mut()
    def cleanup_logs(self) -> None:
        logger.info("Removing downloaded logs for try push %s" % self.process_name)
try:
shutil.rmtree(self.log_path())
except Exception:
logger.warning("Failed to remove logs %s:%s" %
(self.log_path(), traceback.format_exc()))
@mut()
    def delete(self) -> None:
        super().delete()
        for (idx_cls, data) in [(TaskGroupIndex, self.taskgroup_id),
                                (TryCommitIndex, self.try_rev)]:
            if data is not None:
                idx = idx_cls(self.repo)
                key = idx.make_key(data)
                # The stored value is the process name, so that's what must be
                # passed to delete (matching the insert calls elsewhere)
                idx.delete(key, self.process_name)
class TryPushTasks:
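    # Number of retriggers requested for each failing job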
_retrigger_count = 6
# min rate of job success to proceed with metadata update
_min_success = 0.7
def __init__(self, tasks: TaskGroup) -> None:
"""Wrapper object that implements sync-specific business logic on top of a
list of tasks"""
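        # Only consider tasks from the web-platform-tests suite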
self.wpt_tasks = tasks.view(tc.is_suite_fn("web-platform-tests"))
def __len__(self) -> int:
return len(self.wpt_tasks)
def complete(self, allow_unscheduled: bool = False) -> bool:
return self.wpt_tasks.is_complete(allow_unscheduled)
    def validate(self) -> bool:
err = None
if not len(self.wpt_tasks):
err = ("No wpt tests found. Check decision task %s" %
self.wpt_tasks.taskgroup.taskgroup_id)
else:
exception_tasks = self.wpt_tasks.filter(tc.is_status_fn(tc.EXCEPTION))
if float(len(exception_tasks)) / len(self.wpt_tasks) > (1 - self._min_success):
err = ("Too many exceptions found among wpt tests. "
"Check decision task %s" % self.wpt_tasks.taskgroup.taskgroup_id)
if err:
logger.error(err)
return False
return True
def retrigger_failures(self, count: int = _retrigger_count) -> int:
task_states = self.wpt_states()
def is_failure(task_data: dict) -> bool:
states = task_data["states"]
return states[tc.FAIL] > 0 or states[tc.EXCEPTION] > 0
def is_excluded(name: str) -> bool:
return "-aarch64" in name
failures = [data["task_id"] for name, data in task_states.items()
if is_failure(data) and not is_excluded(name)]
retriggered_count = 0
for task_id in failures:
jobs = auth_tc.retrigger(task_id, count=count)
if jobs:
retriggered_count += len(jobs)
return retriggered_count
def wpt_states(self) -> Mapping[str, Any]:
        # e.g. {"test-linux32-stylo-disabled/opt-web-platform-tests-e10s-6": {
        #          "task_id": "abc123",
        #          "states": {
        #              "completed": 5,
        #              "failed": 1
        #          }
        #      }}
by_name = self.wpt_tasks.by_name()
        task_states: MutableMapping[str, Any] = defaultdict(
            lambda: defaultdict(lambda: defaultdict(int)))
for name, tasks in by_name.items():
for task in tasks:
task_id = task.get("status", {}).get("taskId")
state = task.get("status", {}).get("state")
task_states[name]["states"][state] += 1
# need one task_id per job group for retriggering
task_states[name]["task_id"] = task_id
return task_states
def failed_builds(self) -> TaskGroupView:
"""Return the builds that failed"""
return self.wpt_tasks.failed_builds()
    def successful_builds(self) -> TaskGroupView:
"""Return the builds that were successful"""
builds = self.wpt_tasks.filter(tc.is_build)
return builds.filter(tc.is_status_fn({tc.SUCCESS}))
def retriggered_wpt_states(self) -> Mapping[str, Mapping[str, Any]]:
# some retrigger requests may have failed, and we try to ignore
# manual/automatic retriggers made outside of wptsync
threshold = max(1, self._retrigger_count / 2)
task_counts = self.wpt_states()
return {name: data for name, data in task_counts.items()
if sum(data["states"].values()) > threshold}
def success(self) -> bool:
"""Check if all the wpt tasks in a try push ended with a successful status"""
wpt_tasks = self.wpt_tasks
if wpt_tasks:
return all(task.get("status", {}).get("state") == tc.SUCCESS for task in wpt_tasks)
return False
    def has_failures(self) -> bool:
"""Check if any of the wpt tasks in a try push ended with a failure status"""
wpt_tasks = self.wpt_tasks
if wpt_tasks:
return any(task.get("status", {}).get("state") == tc.FAIL for task in wpt_tasks)
return False
def has_completed_tests(self) -> bool:
"""Check if all the wpt tasks in a try push completed"""
wpt_tasks = self.wpt_tasks.filter(tc.is_test)
if wpt_tasks:
return any(task.get("status", {}).get("state") in (tc.SUCCESS, tc.FAIL)
for task in wpt_tasks)
return False
    def success_rate(self) -> float:
        wpt_tasks = self.wpt_tasks
        if not wpt_tasks:
            return 0.0
        success = wpt_tasks.filter(tc.is_status_fn(tc.SUCCESS))
        return len(success) / len(wpt_tasks)
def failure_limit_exceeded(self, target_rate: float = _min_success) -> bool:
return self.success_rate() < target_rate