bot/code_coverage_bot/chunk_mapping.py

# -*- coding: utf-8 -*-
import concurrent.futures
import os
import sqlite3
import tarfile
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

import requests
import structlog

from code_coverage_bot import grcov
from code_coverage_bot import taskcluster

logger = structlog.get_logger(__name__)

ACTIVEDATA_QUERY_URL = "http://activedata.allizom.org/query"

PLATFORMS = ["linux", "windows"]
IGNORED_SUITE_PREFIXES = ["awsy", "talos", "test-coverage", "test-coverage-wpt"]
# TODO: Calculate this dynamically when https://github.com/klahnakoski/ActiveData-ETL/issues/40 is fixed.
TEST_COVERAGE_SUITES = [
    "reftest",
    "web-platform",
    "mochitest",
    "xpcshell",
    "jsreftest",
    "crashtest",
]


# Retrieve the suites that ran with code coverage for the given revision from ActiveData.
def get_suites(revision):
    r = requests.post(
        ACTIVEDATA_QUERY_URL,
        json={
            "from": "unittest",
            "where": {
                "and": [
                    {"eq": {"repo.branch.name": "mozilla-central"}},
                    {"eq": {"repo.changeset.id12": revision[:12]}},
                    {"regexp": {"run.key": ".*-ccov.*/.*"}},
                ]
            },
            "limit": 500000,
            "groupby": ["run.suite.fullname"],
        },
    )

    suites_data = r.json()["data"]

    return [e[0] for e in suites_data]


# Retrieve chunk -> tests mapping from ActiveData.
def get_tests_chunks(revision, platform, suite):
    r = requests.post(
        ACTIVEDATA_QUERY_URL,
        json={
            "from": "unittest",
            "where": {
                "and": [
                    {"eq": {"repo.branch.name": "mozilla-central"}},
                    {"eq": {"repo.changeset.id12": revision[:12]}},
                    {"eq": {"run.suite.fullname": suite}},
                    {"regexp": {"run.key": f".*-{platform}.*-ccov.*/.*"}},
                ]
            },
            "limit": 50000,
            "select": ["result.test", "run.key"],
        },
    )

    return r.json()["data"]


# Batch (element, count) pairs into groups: a new group starts whenever the
# running total of counts crosses a multiple of 20000.
def group_by_20k(data):
    groups = defaultdict(list)
    total_count = 0

    for elem, count in data:
        total_count += count
        groups[total_count // 20000].append(elem)

    return groups.values()
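

# Worked example for group_by_20k above, with illustrative data: the bucket
# index is the running total divided by 20000, so elements land in a new
# group each time the cumulative count crosses a 20k boundary.
#
#     >>> list(group_by_20k([("a", 15000), ("b", 15000), ("c", 1000)]))
#     [['a'], ['b', 'c']]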


# Retrieve the suites that have per-test coverage data from the past week,
# with the number of distinct tests per suite.
def get_test_coverage_suites():
    r = requests.post(
        ACTIVEDATA_QUERY_URL,
        json={
            "from": "coverage",
            "where": {
                "and": [
                    {"eq": {"repo.branch.name": "mozilla-central"}},
                    {"gte": {"repo.push.date": {"date": "today-week"}}},
                    {"gt": {"source.file.total_covered": 0}},
                    {"exists": "test.name"},
                ]
            },
            "limit": 50000,
            "select": {"aggregate": "cardinality", "value": "test.name"},
            "groupby": ["test.suite"],
        },
    )

    return r.json()["data"]


# Retrieve the tests in the given suites that have per-test coverage data,
# with the number of distinct source files each test covers.
def get_test_coverage_tests(suites):
    r = requests.post(
        ACTIVEDATA_QUERY_URL,
        json={
            "from": "coverage",
            "where": {
                "and": [
                    {"eq": {"repo.branch.name": "mozilla-central"}},
                    {"gte": {"repo.push.date": {"date": "today-week"}}},
                    {"gt": {"source.file.total_covered": 0}},
                    {"exists": "test.name"},
                    {"in": {"test.suite": suites}},
                ]
            },
            "limit": 50000,
            "select": {"aggregate": "cardinality", "value": "source.file.name"},
            "groupby": ["test.name"],
        },
    )

    return r.json()["data"]


# Retrieve the (source file, test) pairs for the given tests.
def get_test_coverage_files(tests):
    r = requests.post(
        ACTIVEDATA_QUERY_URL,
        json={
            "from": "coverage",
            "where": {
                "and": [
                    {"eq": {"repo.branch.name": "mozilla-central"}},
                    {"gte": {"repo.push.date": {"date": "today-week"}}},
                    {"gt": {"source.file.total_covered": 0}},
                    {"exists": "test.name"},
                    {"in": {"test.name": tests}},
                ]
            },
            "limit": 50000,
            "select": ["source.file.name", "test.name"],
        },
    )

    return r.json()["data"]


def is_chunk_only_suite(suite):
    # Ignore test-coverage, test-coverage-wpt, awsy and talos.
    if any(suite.startswith(prefix) for prefix in IGNORED_SUITE_PREFIXES):
        return False

    # Ignore suites supported by test-coverage.
    if any(
        test_coverage_suite in suite for test_coverage_suite in TEST_COVERAGE_SUITES
    ):
        return False

    return True


def _inner_generate(
    repo_dir, revision, artifactsHandler, per_test_cursor, per_chunk_cursor, executor
):
    per_test_cursor.execute(
        "CREATE TABLE file_to_chunk (path text, platform text, chunk text)"
    )
    per_test_cursor.execute(
        "CREATE TABLE chunk_to_test (platform text, chunk text, path text)"
    )
    per_test_cursor.execute("CREATE TABLE file_to_test (source text, test text)")

    per_chunk_cursor.execute(
        "CREATE TABLE file_to_chunk (path text, platform text, chunk text)"
    )
    per_chunk_cursor.execute(
        "CREATE TABLE chunk_to_test (platform text, chunk text, path text)"
    )

    logger.info("Populating file_to_test table.")
    test_coverage_suites = get_test_coverage_suites()
    logger.info("Found {} test suites.".format(len(test_coverage_suites)))
    for suites in group_by_20k(test_coverage_suites):
        test_coverage_tests = get_test_coverage_tests(suites)
        for tests in group_by_20k(test_coverage_tests):
            tests_files_data = get_test_coverage_files(tests)

            source_names = tests_files_data["source.file.name"]
            test_iter = enumerate(tests_files_data["test.name"])
            source_test_iter = ((source_names[i], test) for i, test in test_iter)

            per_test_cursor.executemany(
                "INSERT INTO file_to_test VALUES (?,?)", source_test_iter
            )

    futures = {}
    for platform in PLATFORMS:
        logger.info("Reading chunk coverage artifacts for {}.".format(platform))
        for chunk in artifactsHandler.get_chunks(platform):
            assert chunk.strip() != "", "chunk can not be an empty string"

            artifacts = artifactsHandler.get(platform=platform, chunk=chunk)

            assert len(artifacts) > 0, "There should be at least one artifact"

            future = executor.submit(grcov.files_list, artifacts, source_dir=repo_dir)
            futures[future] = (platform, chunk)

        logger.info("Populating chunk_to_test table for {}.".format(platform))
        for suite in get_suites(revision):
            tests_data = get_tests_chunks(revision, platform, suite)
            if len(tests_data) == 0:
                logger.warning(
                    "No tests found for platform {} and suite {}.".format(
                        platform, suite
                    )
                )
                continue

            logger.info(
                "Adding tests for platform {} and suite {}".format(platform, suite)
            )

            task_names = tests_data["run.key"]

            def chunk_test_iter():
                test_iter = enumerate(tests_data["result.test"])
                return (
                    (platform, taskcluster.name_to_chunk(task_names[i]), test)
                    for i, test in test_iter
                )

            if is_chunk_only_suite(suite):
                per_test_cursor.executemany(
                    "INSERT INTO chunk_to_test VALUES (?,?,?)", chunk_test_iter()
                )

            per_chunk_cursor.executemany(
                "INSERT INTO chunk_to_test VALUES (?,?,?)", chunk_test_iter()
            )

    logger.info("Populating file_to_chunk table.")
    for future in concurrent.futures.as_completed(futures):
        (platform, chunk) = futures[future]
        files = future.result()

        suite = taskcluster.chunk_to_suite(chunk)
        if is_chunk_only_suite(suite):
            per_test_cursor.executemany(
                "INSERT INTO file_to_chunk VALUES (?,?,?)",
                ((f, platform, chunk) for f in files),
            )

        per_chunk_cursor.executemany(
            "INSERT INTO file_to_chunk VALUES (?,?,?)",
            ((f, platform, chunk) for f in files),
        )
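

# Resulting schema, as created by _inner_generate above:
#
#   file_to_chunk (path, platform, chunk)  -- chunks whose coverage touches a source file
#   chunk_to_test (platform, chunk, path)  -- tests that run inside a chunk
#   file_to_test  (source, test)           -- per-test database only: tests covering a source file
#
# A consumer can then look up which chunks to schedule for a changed file,
# e.g. (the file path below is purely illustrative):
#
#     cursor.execute(
#         "SELECT platform, chunk FROM file_to_chunk WHERE path = ?",
#         ("dom/base/nsDocument.cpp",),
#     )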


def generate(repo_dir, revision, artifactsHandler, out_dir="."):
    logger.info("Generating chunk mapping...")

    # TODO: Change chunk_mapping to test_mapping, but the name should be synced in mozilla-central
    # in the coverage selector!
    per_test_sqlite_file = os.path.join(out_dir, "chunk_mapping.sqlite")
    per_test_tarxz_file = os.path.join(out_dir, "chunk_mapping.tar.xz")
    per_chunk_sqlite_file = os.path.join(out_dir, "per_chunk_mapping.sqlite")
    per_chunk_tarxz_file = os.path.join(out_dir, "per_chunk_mapping.tar.xz")

    logger.info("Creating tables.")
    with sqlite3.connect(per_test_sqlite_file) as per_test_conn:
        per_test_cursor = per_test_conn.cursor()

        with sqlite3.connect(per_chunk_sqlite_file) as per_chunk_conn:
            per_chunk_cursor = per_chunk_conn.cursor()

            with ThreadPoolExecutor(max_workers=4) as executor:
                _inner_generate(
                    repo_dir,
                    revision,
                    artifactsHandler,
                    per_test_cursor,
                    per_chunk_cursor,
                    executor,
                )

    logger.info(
        "Writing the per-test mapping archive at {}.".format(per_test_tarxz_file)
    )
    with tarfile.open(per_test_tarxz_file, "w:xz") as tar:
        tar.add(per_test_sqlite_file, os.path.basename(per_test_sqlite_file))

    logger.info(
        "Writing the per-chunk mapping archive at {}.".format(per_chunk_tarxz_file)
    )
    with tarfile.open(per_chunk_tarxz_file, "w:xz") as tar:
        tar.add(per_chunk_sqlite_file, os.path.basename(per_chunk_sqlite_file))
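

# Minimal usage sketch. `generate` only needs `artifactsHandler` to expose the
# two methods this module calls: get_chunks(platform) and
# get(platform=..., chunk=...), the latter returning grcov artifact paths that
# grcov.files_list can read. The stub below is hypothetical, for illustration
# only; the bot normally passes its real artifacts handler.
#
#     class FakeArtifactsHandler:
#         def get_chunks(self, platform):
#             return {"mochitest-1"}
#
#         def get(self, platform=None, chunk=None):
#             return ["ccov-artifacts/{}_{}_grcov.zip".format(platform, chunk)]
#
#     generate("/path/to/mozilla-central", "abcdef123456", FakeArtifactsHandler())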