bot/code_coverage_bot/hooks/base.py (135 lines of code) (raw):

# -*- coding: utf-8 -*- # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import concurrent.futures import os from datetime import datetime from datetime import timedelta import hglib import structlog from code_coverage_bot import config from code_coverage_bot import grcov from code_coverage_bot import taskcluster from code_coverage_bot.artifacts import ArtifactsHandler from code_coverage_bot.taskcluster import taskcluster_config from code_coverage_bot.utils import ThreadPoolExecutorResult logger = structlog.get_logger(__name__) class Hook(object): def __init__( self, repository, revision, task_name_filter, cache_root, working_dir, required_platforms=[], ): os.makedirs(working_dir, exist_ok=True) self.artifacts_dir = os.path.join(working_dir, "ccov-artifacts") self.reports_dir = os.path.join(working_dir, "ccov-reports") logger.info( "Local storage initialized.", artifacts=self.artifacts_dir, reports=self.reports_dir, ) self.repository = repository self.revision = revision assert ( self.revision is not None and self.repository is not None ), "Missing repo/revision" logger.info( "Mercurial setup", repository=self.repository, revision=self.revision ) if cache_root is not None: assert os.path.isdir(cache_root), f"Cache root {cache_root} is not a dir." self.repo_dir = os.path.join(cache_root, self.branch) # Load coverage tasks for all platforms decision_task_id = taskcluster.get_decision_task(self.branch, self.revision) assert decision_task_id is not None, "The decision task couldn't be found" group = taskcluster.get_task_details(decision_task_id)["taskGroupId"] test_tasks = [ task for task in taskcluster.get_tasks_in_group(group) if taskcluster.is_coverage_task(task["task"]) ] # Check the required platforms are present platforms = set( taskcluster.get_platform(test_task["task"]) for test_task in test_tasks ) for platform in required_platforms: assert platform in platforms, f"{platform} missing in the task group." self.artifactsHandler = ArtifactsHandler( test_tasks, self.artifacts_dir, task_name_filter ) @property def branch(self): return self.repository[len(config.HG_BASE) :] def clone_repository(self): cmd = hglib.util.cmdbuilder( "robustcheckout", self.repository, self.repo_dir, purge=True, sharebase="hg-shared", upstream="https://hg.mozilla.org/mozilla-unified", revision=self.revision, networkattempts=7, ) cmd.insert(0, hglib.HGPATH) proc = hglib.util.popen(cmd) out, err = proc.communicate() if proc.returncode: raise hglib.error.CommandError(cmd, proc.returncode, out, err) logger.info("{} cloned".format(self.repository)) def retrieve_source_and_artifacts(self): with ThreadPoolExecutorResult(max_workers=2) as executor: futures = [] # Thread 1 - Download coverage artifacts. futures.append(executor.submit(self.artifactsHandler.download_all)) # Thread 2 - Clone repository. futures.append(executor.submit(self.clone_repository)) for future in concurrent.futures.as_completed(futures): exc = future.exception() if exc is not None: logger.error( "Exception while downloading coverage artifacts or cloning repository", exception=exc, ) from traceback import format_exception logger.error(format_exception(exc, exc, exc.__traceback__)) os._exit(1) def build_reports(self, only=None): """ Build all the possible covdir reports using current artifacts """ os.makedirs(self.reports_dir, exist_ok=True) reports = {} for ( (platform, suite), artifacts, ) in self.artifactsHandler.get_combinations().items(): if only is not None and (platform, suite) not in only: continue # Generate covdir report for that suite & platform logger.info( "Building covdir suite report", suite=suite, platform=platform, artifacts=len(artifacts), ) output = grcov.report( artifacts, source_dir=self.repo_dir, out_format="covdir" ) # Write output on FS path = os.path.join(self.reports_dir, f"{platform}.{suite}.json") with open(path, "wb") as f: f.write(output) reports[(platform, suite)] = path return reports def index_task(self, namespaces, ttl=180): """ Index current task on Taskcluster Index TTL is expressed in days """ assert isinstance(ttl, int) and ttl > 0 task_id = os.environ.get("TASK_ID") if task_id is None: logger.warning("Skipping Taskcluster indexation, no task id found.") return index_service = taskcluster_config.get_service("index") for namespace in namespaces: index_service.insertTask( namespace, { "taskId": task_id, "rank": 0, "data": {}, "expires": (datetime.utcnow() + timedelta(ttl)).strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ), }, )