sync/tc.py

from __future__ import annotations

import os
import shutil
import time
import traceback
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Iterator

import newrelic.agent
import requests
import slugid
import taskcluster

from . import log
from .env import Environment
from .errors import RetryableError
from .threadexecutor import ThreadExecutor

Task = Dict[str, Dict[str, Any]]

TASKCLUSTER_ROOT_URL = "https://firefox-ci-tc.services.mozilla.com"
QUEUE_BASE = TASKCLUSTER_ROOT_URL + "/api/queue/v1/"
INDEX_BASE = TASKCLUSTER_ROOT_URL + "/api/index/v1/"
_DATE_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"
TREEHERDER_BASE = "https://treeherder.mozilla.org/"

SUCCESS = "completed"
FAIL = "failed"
EXCEPTION = "exception"
UNSCHEDULED = "unscheduled"
RUNNING = "running"
PENDING = "pending"

logger = log.get_logger(__name__)
env = Environment()


class TaskclusterClient:
    def __init__(self):
        self._queue = None

    @property
    def queue(self):
        # Only used for retriggers, which always use the new URL
        if not self._queue:
            self._queue = taskcluster.Queue({
                "credentials": {
                    "clientId": env.config["taskcluster"]["client_id"],
                    "accessToken": env.config["taskcluster"]["token"],
                },
                "rootUrl": TASKCLUSTER_ROOT_URL,
            })
        return self._queue

    def retrigger(self, task_id, count=1, retries=5):
        logger.info("Retriggering task %s" % task_id)
        payload = self.queue.task(task_id)
        now = taskcluster.fromNow("0 days")
        created = datetime.strptime(payload["created"], _DATE_FMT)
        deadline = datetime.strptime(payload["deadline"], _DATE_FMT)
        expiration = datetime.strptime(payload["expires"], _DATE_FMT)
        to_dead = deadline - created
        to_expire = expiration - created
        # Shift the deadline and expiry windows so they are relative to now,
        # preserving their original durations
        payload["deadline"] = taskcluster.stringDate(
            taskcluster.fromNow("%d days %d seconds" % (to_dead.days, to_dead.seconds),
                                now)
        )
        payload["expires"] = taskcluster.stringDate(
            taskcluster.fromNow("%d days %d seconds" % (to_expire.days, to_expire.seconds),
                                now)
        )
        payload["created"] = taskcluster.stringDate(now)
        payload["retries"] = 0

        rv = []
        while count > 0:
            new_id = slugid.nice()
            r = retries
            while r > 0:
                try:
                    rv.append(self.queue.createTask(new_id, payload))
                    break
                except Exception:
                    r -= 1
                    # format_exc takes no exception argument in Python 3
                    logger.warning(traceback.format_exc())
            count -= 1
        return rv or None
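

# Illustrative use of TaskclusterClient.retrigger (a minimal sketch; the task
# id is hypothetical, and valid credentials must be present in env.config):
#
#     client = TaskclusterClient()
#     results = client.retrigger("fZ42tpqeTFqhJzLcghq1YQ", count=2)
#     if results is None:
#         logger.warning("All retrigger attempts failed")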


def normalize_task_id(task_id: str) -> str:
    # For some reason, pulse doesn't get the real task ID, but some
    # alternate encoding of it that doesn't work anywhere else. So we
    # have to first convert to the canonical form.
    task_id = task_id.split("/", 1)[0]
    try:
        task_uuid = uuid.UUID(task_id)
    except ValueError:
        # This is probably already in the canonical form
        return task_id
    return slugid.encode(task_uuid)


def parse_job_name(job_name):
    if job_name.startswith("test-"):
        job_name = job_name[len("test-"):]
    if "web-platform-tests" in job_name:
        job_name = job_name[:job_name.index("web-platform-tests")]
    job_name = job_name.rstrip("-")
    job_name = job_name.replace("/", "-")
    return job_name


def result_from_run(run):
    result_map = {"completed": "success",
                  "failed": "fail"}
    state = run.get("state")
    if state in result_map:
        return result_map[state]
    if state == "exception":
        if run.get("reasonResolved") == "canceled":
            return "canceled"
        if run.get("reasonResolved") == "superseded":
            return "superseded"
        return "exception"
    return "unknown"


class TaskGroup:
    def __init__(self, taskgroup_id: str, tasks: list[Task] | None = None) -> None:
        self.taskgroup_id = taskgroup_id
        self._tasks = tasks

    @property
    def tasks(self) -> list[Task]:
        if self._tasks:
            return self._tasks
        list_url = QUEUE_BASE + "task-group/" + self.taskgroup_id + "/list"

        # Page through the task-group listing using the continuation token
        r = requests.get(list_url, params={"limit": 200})
        reply = r.json()
        tasks = reply["tasks"]
        while "continuationToken" in reply:
            r = requests.get(list_url, params={
                "limit": 200,
                "continuationToken": reply["continuationToken"],
            })
            reply = r.json()
            tasks += reply["tasks"]
        self._tasks = tasks
        return self._tasks

    def refresh(self) -> list[Task]:
        self._tasks = None
        return self.tasks

    def tasks_by_id(self) -> dict[str, Task]:
        return {item["status"]["taskId"]: item for item in self.tasks}

    def view(self, filter_fn: Callable[[Task], bool] | None = None) -> TaskGroupView:
        return TaskGroupView(self, filter_fn)
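

# Sketch of fetching a task group (illustrative; the taskgroup id is
# hypothetical, and the queries hit the live Taskcluster API):
#
#     group = TaskGroup("ZBsMvweGSUqBhTCt2YVOkw")
#     tests = group.view(is_test)  # only tasks tagged with kind == "test"
#     print(len(tests), "test tasks of", len(group.tasks), "total")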


class TaskGroupView:
    def __init__(self, taskgroup: TaskGroup,
                 filter_fn: Callable[[Task], bool] | None) -> None:
        self.taskgroup = taskgroup
        self.filter_fn: Callable[[Task], bool] = (filter_fn
                                                  if filter_fn is not None
                                                  else lambda x: bool(x))
        self._tasks: list[Task] | None = None

    def __bool__(self):
        return bool(self.tasks)

    def __len__(self) -> int:
        return len(self.tasks)

    def __iter__(self) -> Iterator[Task]:
        yield from self.tasks

    @property
    def tasks(self) -> list[Task]:
        if self._tasks:
            return self._tasks
        self._tasks = [item for item in self.taskgroup.tasks
                       if self.filter_fn(item)]
        assert self._tasks is not None
        return self._tasks

    def refresh(self):
        self._tasks = None
        self.taskgroup.refresh()
        return self.tasks

    def incomplete_tasks(self, allow_unscheduled: bool = False) -> Iterator[Task]:
        tasks_by_id = self.taskgroup.tasks_by_id()
        for task in self.tasks:
            if task_is_incomplete(task, tasks_by_id, allow_unscheduled):
                yield task

    def failed_builds(self) -> TaskGroupView:
        """Return the builds that failed"""
        builds = self.filter(is_build)
        return builds.filter(is_status_fn({FAIL, EXCEPTION}))

    def filter(self, filter_fn: Callable[[Task], bool]) -> TaskGroupView:
        def combined_filter(task: Task) -> bool:
            return self.filter_fn(task) and filter_fn(task)
        return self.taskgroup.view(combined_filter)

    def is_complete(self, allow_unscheduled: bool = False) -> bool:
        return not any(self.incomplete_tasks(allow_unscheduled))

    def by_name(self) -> dict[str, list[Task]]:
        rv = defaultdict(list)
        for task in self.tasks:
            name = task.get("task", {}).get("metadata", {}).get("name")
            if name:
                rv[name].append(task)
        return rv

    @newrelic.agent.function_trace()
    def download_logs(self,
                      destination: str,
                      file_names: list[str],
                      retry: int = 5) -> None:
        if not os.path.exists(destination):
            os.makedirs(destination)
        if not file_names:
            return

        logger.info("Downloading logs to %s" % destination)
        t0 = time.time()
        executor = ThreadExecutor(8,
                                  work_fn=get_task_artifacts,
                                  init_fn=start_session)
        errors = executor.run([((), {"destination": destination,
                                     "task": item,
                                     "file_names": file_names,
                                     "retry": retry})
                               for item in self.tasks])
        # TODO: not sure if we can avoid tolerating some errors here, but
        # there is probably some sign of badness less than all the downloads
        # erroring
        for error in errors:
            logger.warning("".join(traceback.format_exception(type(error),
                                                              error,
                                                              error.__traceback__)))
        if len(errors) == len(self.tasks):
            raise RetryableError("Downloading logs all failed")
        logger.info("Downloading logs took %s" % (time.time() - t0))


def start_session():
    return {"session": requests.Session()}


def get_task_artifacts(destination: str,
                       task: Task,
                       file_names: list[str],
                       session: requests.Session | None,
                       retry: int):
    status = task.get("status", {})
    if not status.get("runs"):
        logger.debug("No runs for task %s" % status["taskId"])
        return

    artifacts_base_url = QUEUE_BASE + "task/%s/artifacts" % status["taskId"]
    if session is None:
        session = requests.Session()
    try:
        artifacts = fetch_json(artifacts_base_url, session=session)
    except requests.HTTPError as e:
        logger.warning(str(e))
        # Without the artifact listing there is nothing to download
        return

    artifact_urls = ["{}/{}".format(artifacts_base_url, item["name"])
                     for item in artifacts["artifacts"]
                     if any(item["name"].endswith("/" + file_name)
                            for file_name in file_names)]

    run = status["runs"][-1]
    if "_log_paths" not in run:
        run["_log_paths"] = {}
    for url in artifact_urls:
        params = {"task": status["taskId"],
                  "file_name": url.rsplit("/", 1)[1]}
        log_name = "{task}_{file_name}".format(**params)
        success = False
        logger.debug(f"Trying to download {url}")
        log_path = os.path.abspath(os.path.join(destination, log_name))
        if not os.path.exists(log_path):
            success = download(url, log_path, retry, session=session)
        else:
            success = True
        if not success:
            logger.warning(f"Failed to download log from {url}")
        run["_log_paths"][params["file_name"]] = log_path
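

# After download_logs runs, each task's final run carries a "_log_paths"
# mapping from artifact file name to the local path it was saved under.
# A sketch of consuming it (the directory and file name are examples only):
#
#     view.download_logs("/tmp/logs", ["wptreport.json"])
#     for task in view:
#         paths = task["status"]["runs"][-1].get("_log_paths", {})
#         report_path = paths.get("wptreport.json")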


def task_is_incomplete(task: Task,
                       tasks_by_id: dict[str, Task],
                       allow_unscheduled: bool) -> bool:
    status = task.get("status", {}).get("state", PENDING)
    if status in (PENDING, RUNNING):
        return True
    elif status == UNSCHEDULED:
        if not allow_unscheduled:
            return True
        # If the task is unscheduled, we may regard it as complete if
        # all dependencies are complete
        # TODO: is there a race condition here where dependencies can be
        # complete and successful but this task has not yet been scheduled?
        # A task can depend on its image; it's OK to ignore this for our
        # purposes
        image = task.get("task", {}).get("payload", {}).get("image", {}).get("taskId")
        dependencies = [item for item in task.get("task", {}).get("dependencies", [])
                        if item != image]
        if not dependencies:
            return True
        # Not sure how to handle a case where a dependency doesn't exist,
        # so ignore it
        return any(task_is_incomplete(tasks_by_id[dependent_id],
                                      tasks_by_id,
                                      allow_unscheduled)
                   for dependent_id in dependencies
                   if dependent_id in tasks_by_id)
    return False


def is_suite(suite: str, task: Task) -> bool:
    t = task.get("task", {}).get("extra", {}).get("suite", {})
    if isinstance(t, dict):
        t = t.get("name", "")
    return t.startswith(suite)


def is_suite_fn(suite: str) -> Callable:
    return lambda x: is_suite(suite, x)


def check_tag(task: Task, tag: str) -> bool:
    tags = task.get("task", {}).get("tags")
    if tags:
        return tags.get("kind") == tag
    return False


def is_test(task: Task) -> bool:
    return check_tag(task, "test")


def is_build(task: Task) -> bool:
    return check_tag(task, "build")


def is_status(statuses: set[str] | str, task: Task) -> bool:
    state: str | None = task.get("status", {}).get("state")
    return state is not None and state in statuses


def is_status_fn(statuses: set[str] | str) -> Callable:
    if isinstance(statuses, str):
        statuses = {statuses}
    return lambda x: is_status(statuses, x)


def lookup_index(index_name: str) -> str | None:
    if index_name is None:
        return None
    idx_url = INDEX_BASE + "task/" + index_name
    resp = requests.get(idx_url)
    resp.raise_for_status()
    idx = resp.json()
    task_id = idx.get("taskId")
    if task_id:
        return task_id
    logger.warning("Task not found from index: {}\n{}".format(index_name,
                                                              idx.get("message", "")))
    return task_id


def lookup_treeherder(project: str, revision: str) -> str | None:
    push_data = fetch_json(TREEHERDER_BASE + f"api/project/{project}/push/",
                           params={"revision": revision})
    pushes = push_data.get("results", [])
    push_id = pushes[0].get("id") if pushes else None
    if push_id is None:
        return None

    jobs_data = fetch_json(TREEHERDER_BASE + "api/jobs/",
                           {"push_id": push_id})
    property_names = jobs_data["job_property_names"]
    idx_name = property_names.index("job_type_name")
    idx_task = property_names.index("task_id")
    decision_tasks = [item for item in jobs_data["results"]
                      if item[idx_name] == "Gecko Decision Task"]
    if not decision_tasks:
        return None
    return decision_tasks[-1][idx_task]


def get_task(task_id: str) -> dict[str, Any] | None:
    if task_id is None:
        return None
    task_url = QUEUE_BASE + "task/" + task_id
    r = requests.get(task_url)
    task = r.json()
    if task.get("taskGroupId"):
        return task
    logger.warning("Task {} not found: {}".format(task_id, task.get("message", "")))
    return None


def get_task_status(task_id: str) -> dict[str, Any] | None:
    if task_id is None:
        return None
    status_url = f"{QUEUE_BASE}task/{task_id}/status"
    r = requests.get(status_url)
    status = r.json()
    if status.get("status"):
        return status["status"]
    logger.warning("Task {} not found: {}".format(task_id, status.get("message", "")))
    return None
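

# The predicates above compose via TaskGroupView.filter. For example, a view
# of completed web-platform-tests tasks (a sketch; suite naming follows
# is_suite's prefix match):
#
#     wpt_done = (group.view(is_suite_fn("web-platform-tests"))
#                 .filter(is_status_fn(SUCCESS)))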


def download(log_url: str,
             log_path: str,
             retry: int,
             session: requests.Session | None = None) -> bool:
    if session is None:
        session = requests.Session()
    while retry > 0:
        try:
            logger.debug("Downloading from %s" % log_url)
            t0 = time.time()
            resp = session.get(log_url, stream=True)
            if resp.status_code < 200 or resp.status_code >= 300:
                logger.warning("Download failed with status %s" % resp.status_code)
                retry -= 1
                continue
            # Write to a temporary path and rename so a partial download
            # never appears at the final location
            tmp_path = log_path + ".tmp"
            with open(tmp_path, "wb") as f:
                resp.raw.decode_content = True
                shutil.copyfileobj(resp.raw, f)
            os.rename(tmp_path, log_path)
            logger.debug("Download took %s" % (time.time() - t0))
            return True
        except Exception:
            logger.warning(traceback.format_exc())
            retry -= 1
    return False


def fetch_json(url: str,
               params: dict[str, str] | None = None,
               session: requests.Session | None = None) -> dict[str, Any] | list[Any]:
    if session is None:
        session = requests.Session()
    t0 = time.time()
    logger.debug("Getting json from %s" % url)
    headers = {
        "Accept": "application/json",
        "User-Agent": "wpt-sync",
    }
    resp = session.get(url=url, params=params, headers=headers, timeout=30)
    resp.raise_for_status()
    logger.debug("Getting json took %s" % (time.time() - t0))
    return resp.json()


def get_taskgroup_id(project: str, revision: str) -> tuple[str, str, list[dict[str, Any]]]:
    idx = f"gecko.v2.{project}.revision.{revision}.firefox.decision"
    try:
        task_id = lookup_index(idx)
    except requests.HTTPError:
        # Fall back to Treeherder if the index lookup fails
        task_id = lookup_treeherder(project, revision)

    if task_id is None:
        raise ValueError("Failed to look up task id from index %s" % idx)

    status = get_task_status(task_id)
    if status is None:
        raise ValueError("Failed to look up status for task %s" % task_id)
    state = status["state"]
    runs = status["runs"]

    return (task_id, state, runs)


def cleanup():
    base_path = os.path.join(env.config["root"], env.config["paths"]["try_logs"])
    for repo_dir in os.listdir(base_path):
        repo_path = os.path.join(base_path, repo_dir)
        if not os.path.isdir(repo_path):
            continue
        for rev_dir in os.listdir(repo_path):
            rev_path = os.path.join(repo_path, rev_dir)
            if not os.path.isdir(rev_path):
                continue
            now = datetime.now()
            # Remove data that hasn't been touched in three days
            if (datetime.fromtimestamp(os.stat(rev_path).st_mtime) <
                    now - timedelta(days=3)):
                logger.info("Removing downloaded logs without recent activity %s" % rev_path)
                shutil.rmtree(rev_path)
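

# End-to-end sketch of resolving a push to its task group (the project and
# revision values are illustrative). In Gecko CI the decision task's id
# typically doubles as the taskgroup id of the tasks it schedules:
#
#     task_id, state, runs = get_taskgroup_id("try", "abcdef123456")
#     if state == SUCCESS:
#         tasks = TaskGroup(normalize_task_id(task_id)).tasks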