in mozci/push.py [0:0]
def tasks(self):
    """All tasks that ran on the push, including retriggers and backfills.

    The result is stored in ``config.cache`` under ``"{push_uuid}/tasks"``.
    When the push is finalized and a cached value exists, that value is
    returned directly. Otherwise the task list is fetched again, merged
    with the cached tasks that already reached a final state, annotated
    with classifications (when available), and the results of every
    ``TestTask`` not already completed in the cache are retrieved
    concurrently on ``Push.THREAD_POOL_EXECUTOR``.

    Tasks for which ``SourcesNotFound`` is raised while retrieving results
    are reported in a single warning and are still returned. Any other
    exception raised while retrieving results propagates to the caller.

    Returns:
        list: A list of `Task` objects, restricted to tasks whose tier is
        unset/falsy or not greater than ``config.tier``. An empty list is
        returned (and nothing is cached) when the push task data is
        missing.
    """
    cache_key = f"{self.push_uuid}/tasks"
    cached_tasks = config.cache.get(cache_key)

    # Push is supposedly finalized, if a cache exists we can return it as
    # tasks should've finished running.
    if self.is_finalized and cached_tasks is not None:
        return cached_tasks

    logger.debug(f"Retrieving all tasks and groups which run on {self.rev}...")

    logger.debug(f"Gathering data about tasks that run on {self.rev}...")
    # Gather data about tasks that ran on a given push.
    try:
        # TODO: Skip tasks with `retry` as result
        tasks = data.handler.get("push_tasks", branch=self.branch, rev=self.rev)
    except MissingDataError:
        return []

    # Update gathered tasks with data retrieved from the cache. Only cached
    # tasks in a final state are trusted; the others are taken fresh.
    completed_cached_tasks = {}
    if cached_tasks:
        completed_cached_tasks = {
            t.id: vars(t) for t in cached_tasks if t.state in TASK_FINAL_STATES
        }
        tasks = [{**t, **completed_cached_tasks.get(t["id"], {})} for t in tasks]

    logger.debug(f"Gathering task classifications for {self.rev}...")
    # Gather task classifications. Missing classifications are not an error:
    # the tasks are simply left without them.
    try:
        classifications = data.handler.get(
            "push_tasks_classifications", branch=self.branch, rev=self.rev
        )
    except MissingDataError:
        pass
    else:
        for task in tasks:
            if task["id"] in classifications:
                task.update(classifications[task["id"]])

    tasks = [Task.create(**task) for task in tasks]

    # Gather group data.
    logger.debug(f"Gathering test groups for {self.rev}...")
    futures = [
        Push.THREAD_POOL_EXECUTOR.submit(task.retrieve_results, self)
        for task in tasks
        # No need to gather group data for a completed task that was already cached
        if task.id not in completed_cached_tasks and isinstance(task, TestTask)
    ]

    # Drain every future. A SourcesNotFound on one task must not stop us from
    # waiting for the others, otherwise the push would be cached and returned
    # while retrievals are still in flight and their failures would never be
    # reported. Any other exception still propagates as soon as it is seen.
    failed = set()
    for future in concurrent.futures.as_completed(futures):
        try:
            future.result()
        except SourcesNotFound as e:
            task = e.context["task"]
            failed.add(f"{task.id} - {task.label}")

    if failed:
        failed_str = "  \n".join(sorted(failed))
        logger.warning(
            f"Failed to get test groups from the following tasks:\n  {failed_str}"
        )

    logger.debug(f"Retrieved all tasks and groups which run on {self.rev}.")

    # Skip tier tasks greater than the tier passed in config
    tasks = [task for task in tasks if not task.tier or task.tier <= config.tier]

    # cachy's put() overwrites the value in the cache; add() would only add if its empty
    config.cache.put(
        cache_key,
        tasks,
        config["cache"]["retention"],
    )
    logger.debug(f"Cached all tasks and groups which run on {self.rev}.")

    return tasks