bot/code_coverage_bot/artifacts.py (139 lines of code) (raw):

# -*- coding: utf-8 -*-
import collections
import concurrent.futures
import fnmatch
import itertools
import os
import time

import structlog

from code_coverage_bot import taskcluster
from code_coverage_bot.utils import ThreadPoolExecutorResult

logger = structlog.get_logger(__name__)


Artifact = collections.namedtuple("Artifact", "path, task_id, platform, suite, chunk")


SUITES_TO_IGNORE = [
    "awsy",
    "talos",
]  # Ignore awsy and talos as they aren't actually suites of tests.

STATUS_VALUE = {"exception": 1, "failed": 2, "completed": 3}


class ArtifactsHandler(object):
    def __init__(self, test_tasks, parent_dir="ccov-artifacts", task_name_filter="*"):
        self.test_tasks = test_tasks
        self.parent_dir = parent_dir
        self.task_name_filter = task_name_filter
        self.artifacts = []

    def generate_path(self, platform, chunk, artifact):
        file_name = "%s_%s_%s" % (platform, chunk, os.path.basename(artifact["name"]))
        return os.path.join(self.parent_dir, file_name)

    def get_chunks(self, platform):
        return set(
            artifact.chunk
            for artifact in self.artifacts
            if artifact.platform == platform
        )

    def get_combinations(self):
        # Add the full report
        out = collections.defaultdict(list)
        out[("all", "all")] = [artifact.path for artifact in self.artifacts]

        # Group by suite first
        suites = itertools.groupby(
            sorted(self.artifacts, key=lambda a: a.suite), lambda a: a.suite
        )
        for suite, artifacts in suites:
            artifacts = list(artifacts)

            # List all available platforms
            platforms = {a.platform for a in artifacts}
            platforms.add("all")

            # And list all possible permutations with suite + platform
            out[("all", suite)] += [artifact.path for artifact in artifacts]
            for platform in platforms:
                if platform != "all":
                    out[(platform, "all")] += [
                        artifact.path
                        for artifact in artifacts
                        if artifact.platform == platform
                    ]
                out[(platform, suite)] = [
                    artifact.path
                    for artifact in artifacts
                    if platform == "all" or artifact.platform == platform
                ]

        return out

    def get(self, platform=None, suite=None, chunk=None):
        if suite is not None and chunk is not None:
            raise Exception("suite and chunk can't both have a value")

        # Filter artifacts according to platform, suite and chunk.
        filtered_files = []
        for artifact in self.artifacts:
            if platform is not None and artifact.platform != platform:
                continue

            if suite is not None and artifact.suite != suite:
                continue

            if chunk is not None and artifact.chunk != chunk:
                continue

            filtered_files.append(artifact.path)

        return filtered_files

    def download(self, test_task):
        suite = taskcluster.get_suite(test_task["task"])
        chunk_name = taskcluster.get_chunk(test_task["task"])
        platform_name = taskcluster.get_platform(test_task["task"])
        test_task_id = test_task["status"]["taskId"]

        for artifact in taskcluster.get_task_artifacts(test_task_id):
            if not any(
                n in artifact["name"]
                for n in ["code-coverage-grcov.zip", "code-coverage-jsvm.zip"]
            ):
                continue

            artifact_path = self.generate_path(platform_name, chunk_name, artifact)
            taskcluster.download_artifact(artifact_path, test_task_id, artifact["name"])
            logger.info("%s artifact downloaded" % artifact_path)

            self.artifacts.append(
                Artifact(artifact_path, test_task_id, platform_name, suite, chunk_name)
            )

    def is_filtered_task(self, task):
        """
        Apply name filter from CLI args on task name
        """
        assert isinstance(task, dict)
        name = task["task"]["metadata"]["name"]

        if not fnmatch.fnmatch(name, self.task_name_filter):
            logger.debug("Filtered task", name=name)
            return True

        return False

    def download_all(self) -> None:
        os.makedirs(self.parent_dir, exist_ok=True)

        logger.info("Downloading artifacts from {} tasks".format(len(self.test_tasks)))

        for test_task in self.test_tasks:
            status = test_task["status"]["state"]
            task_id = test_task["status"]["taskId"]
            if status in taskcluster.FINISHED_STATUSES:
                continue

            while True:
                # refresh the status information
                task_status = taskcluster.get_task_status(task_id)
                status = task_status["status"]["state"]

                assert (
                    status in taskcluster.ALL_STATUSES
                ), "State '{}' not recognized".format(status)

                if status in taskcluster.FINISHED_STATUSES:
                    # Update the task status, as we will use it to compare statuses later.
                    test_task["status"]["state"] = status
                    break

                logger.info(f"Waiting for task {task_id} to finish...")
                time.sleep(60)

        # Choose best tasks to download (e.g. 'completed' is better than 'failed')
        download_tasks = {}
        for test_task in self.test_tasks:
            status = test_task["status"]["state"]
            assert (
                status in taskcluster.FINISHED_STATUSES
            ), "State '{}' not recognized".format(status)

            chunk_name = taskcluster.get_chunk(test_task["task"])
            platform_name = taskcluster.get_platform(test_task["task"])

            if any(to_ignore in chunk_name for to_ignore in SUITES_TO_IGNORE):
                continue

            if (chunk_name, platform_name) not in download_tasks:
                # If the chunk hasn't been downloaded before, this is obviously the best task
                # to download it from.
                download_tasks[(chunk_name, platform_name)] = test_task
            else:
                # Otherwise, compare the status of this task with the previously selected task.
                prev_task = download_tasks[(chunk_name, platform_name)]

                if STATUS_VALUE[status] > STATUS_VALUE[prev_task["status"]["state"]]:
                    download_tasks[(chunk_name, platform_name)] = test_task

        with ThreadPoolExecutorResult() as executor:
            futures = [
                executor.submit(self.download, test_task)
                for test_task in download_tasks.values()
            ]
            for future in concurrent.futures.as_completed(futures):
                exc = future.exception()
                if exc is not None:
                    logger.error("Exception while downloading artifacts", exception=exc)
                    for f in futures:
                        f.cancel()

        logger.info("Code coverage artifacts downloaded")
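# --- Illustrative usage sketch (added for clarity; not part of the original module) ---
# A minimal, offline example of how ArtifactsHandler groups artifact paths by
# (platform, suite) via get_combinations() and filters them via get(). The
# Artifact tuples below are hand-built stand-ins for what download() would
# normally append after fetching real Taskcluster artifacts; the paths and
# task ids are hypothetical.
if __name__ == "__main__":
    handler = ArtifactsHandler(test_tasks=[])
    handler.artifacts = [
        Artifact(
            "ccov-artifacts/linux_mochitest-1_code-coverage-grcov.zip",
            "task-linux",
            "linux",
            "mochitest",
            "mochitest-1",
        ),
        Artifact(
            "ccov-artifacts/windows_xpcshell-1_code-coverage-jsvm.zip",
            "task-windows",
            "windows",
            "xpcshell",
            "xpcshell-1",
        ),
    ]

    # Prints combinations such as ('all', 'all'), ('all', 'mochitest'),
    # ('linux', 'mochitest'), ('windows', 'xpcshell'), each with its paths.
    for (platform, suite), paths in sorted(handler.get_combinations().items()):
        print(platform, suite, paths)

    # Narrow the selection by platform and suite (suite and chunk are exclusive).
    print(handler.get(platform="linux", suite="mochitest"))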