bot/code_coverage_bot/commit_coverage.py

# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Generate a per-commit coverage summary from the overall coverage reports
stored in Google Cloud Storage, and publish it as commit_coverage.json.zst."""

import concurrent.futures
import io
import json
import os
import threading
import time

import hglib
import structlog
import zstandard
from tqdm import tqdm

from code_coverage_bot import hgmo
from code_coverage_bot.phabricator import PhabricatorUploader
from code_coverage_bot.secrets import secrets
from code_coverage_bot.utils import ThreadPoolExecutorResult
from code_coverage_tools.gcp import DEFAULT_FILTER
from code_coverage_tools.gcp import download_report
from code_coverage_tools.gcp import get_bucket
from code_coverage_tools.gcp import get_name
from code_coverage_tools.gcp import list_reports

logger = structlog.get_logger(__name__)

# One hglib server per worker thread; the global list lets us close them all
# once the thread pool is done.
hg_servers = list()
hg_servers_lock = threading.Lock()
thread_local = threading.local()


def _init_thread(repo_dir: str) -> None:
    hg_server = hglib.open(repo_dir)
    thread_local.hg = hg_server
    with hg_servers_lock:
        hg_servers.append(hg_server)


def generate(server_address: str, repo_dir: str, out_dir: str = ".") -> None:
    start_time = time.monotonic()

    commit_coverage_path = os.path.join(out_dir, "commit_coverage.json.zst")

    assert (
        secrets[secrets.GOOGLE_CLOUD_STORAGE] is not None
    ), "Missing GOOGLE_CLOUD_STORAGE secret"
    bucket = get_bucket(secrets[secrets.GOOGLE_CLOUD_STORAGE])

    # Resume from the previously uploaded state, if there is one.
    blob = bucket.blob("commit_coverage.json.zst")
    if blob.exists():
        dctx = zstandard.ZstdDecompressor()
        commit_coverage = json.loads(
            dctx.decompress(blob.download_as_bytes(raw_download=True))
        )
    else:
        commit_coverage = {}

    # Compressor reused for both the GCS upload and the on-disk artifact.
    cctx = zstandard.ZstdCompressor(threads=-1)

    def _upload():
        blob = bucket.blob("commit_coverage.json.zst")
        blob.upload_from_string(
            cctx.compress(json.dumps(commit_coverage).encode("ascii"))
        )
        blob.content_type = "application/json"
        blob.content_encoding = "zstd"
        blob.patch()

    # We are only interested in "overall" coverage, not platform- or
    # suite-specific coverage.
    changesets_to_analyze = [
        changeset
        for changeset, platform, suite in list_reports(bucket, "mozilla-central")
        if platform == DEFAULT_FILTER and suite == DEFAULT_FILTER
    ]

    # Skip already analyzed changesets.
    changesets_to_analyze = [
        changeset
        for changeset in changesets_to_analyze
        if changeset not in commit_coverage
    ]

    # Use the local hg server to generate the coverage mapping, as it is faster
    # and correct.
    def analyze_changeset(changeset_to_analyze: str) -> None:
        report_name = get_name(
            "mozilla-central", changeset_to_analyze, DEFAULT_FILTER, DEFAULT_FILTER
        )
        assert download_report(
            os.path.join(out_dir, "ccov-reports"), bucket, report_name
        )
        with open(
            os.path.join(out_dir, "ccov-reports", f"{report_name}.json"), "r"
        ) as f:
            report = json.load(f)

        phabricatorUploader = PhabricatorUploader(
            repo_dir, changeset_to_analyze, warnings_enabled=False
        )

        # Use the hg.mozilla.org server to get the automation relevant
        # changesets, since this information is broken in our local repo
        # (which is mozilla-unified).
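        # The automation relevance data covers every changeset in the same
        # push, so the loop below records coverage for each landed commit, not
        # only the changeset the report was generated for.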
        with hgmo.HGMO(server_address=server_address) as hgmo_remote_server:
            changesets = hgmo_remote_server.get_automation_relevance_changesets(
                changeset_to_analyze
            )

        results = phabricatorUploader.generate(thread_local.hg, report, changesets)

        for changeset in changesets:
            # Look up the changeset coverage from the Phabricator uploader.
            coverage = results.get(changeset["node"])
            if coverage is None:
                logger.info("No coverage found", changeset=changeset)
                commit_coverage[changeset["node"]] = None
                continue

            commit_coverage[changeset["node"]] = {
                "added": sum(c["lines_added"] for c in coverage["paths"].values()),
                "covered": sum(c["lines_covered"] for c in coverage["paths"].values()),
                "unknown": sum(c["lines_unknown"] for c in coverage["paths"].values()),
            }

    # Mirrors ThreadPoolExecutor's default max_workers, for logging purposes.
    max_workers = min(32, (os.cpu_count() or 1) + 4)
    logger.info(
        f"Analyzing {len(changesets_to_analyze)} changesets with {max_workers} workers"
    )
    with ThreadPoolExecutorResult(
        initializer=_init_thread, initargs=(repo_dir,)
    ) as executor:
        # Map each future back to its changeset, so failures are attributed to
        # the right changeset (as_completed yields futures in completion order,
        # not submission order).
        future_to_changeset = {
            executor.submit(analyze_changeset, changeset): changeset
            for changeset in changesets_to_analyze
        }
        for future in tqdm(
            concurrent.futures.as_completed(future_to_changeset),
            total=len(future_to_changeset),
        ):
            changeset = future_to_changeset[future]
            exc = future.exception()
            if exc is not None:
                logger.error(f"Exception {exc} while analyzing {changeset}")

            # Upload partial results every ten minutes, so progress survives if
            # the task dies midway.
            if time.monotonic() - start_time >= 600:
                _upload()
                start_time = time.monotonic()

    while len(hg_servers) > 0:
        hg_server = hg_servers.pop()
        hg_server.close()

    _upload()

    # Also write the summary to disk, streaming the zstd-compressed JSON.
    with open(commit_coverage_path, "wb") as zf:
        with cctx.stream_writer(zf) as compressor:
            with io.TextIOWrapper(compressor, encoding="ascii") as f:
                json.dump(commit_coverage, f)
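
# Example invocation below: a minimal sketch, not part of the original module.
# The production entry point lives elsewhere in the bot; the server address,
# clone path, and output directory are illustrative assumptions, and the
# GOOGLE_CLOUD_STORAGE secret must be configured for this to actually run.
if __name__ == "__main__":
    generate(
        server_address="https://hg.mozilla.org",  # assumed HGMO server address
        repo_dir="/path/to/mozilla-unified",  # assumed local clone opened by hglib
        out_dir=".",
    )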