scripts/metric_reporter/reporter/coverage_reporter.py (252 lines of code) (raw):

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Module for reporting test suite coverage results."""

from datetime import datetime
from typing import Any, Sequence

from google.api_core.exceptions import GoogleAPIError
from google.cloud.bigquery import ArrayQueryParameter, Client, QueryJobConfig, ScalarQueryParameter

from scripts.metric_reporter.constants import DATETIME_FORMAT
from scripts.metric_reporter.parser.coverage_json_parser import (
    CoverageJson,
    LlvmCovReport,
    LlvmCovTotals,
    PytestReport,
    PytestTotals,
)
from scripts.metric_reporter.reporter.base_reporter import (
    BaseReporter,
    ReporterResultBase,
    ReporterError,
)


class CoverageReporterResult(ReporterResultBase):
    """Represents the coverage of a test suite run."""

    repository: str
    workflow: str
    test_suite: str
    # llvm-cov doesn't have a timestamp as part of their report, so timestamp and date may not be
    # available if CircleCI can't be used to fill in the gap
    date: str | None = None
    timestamp: str | None = None
    job: int
    line_count: int | None
    line_covered: int | None
    line_not_covered: int | None
    line_excluded: int | None = None  # pytest only
    line_percent: float | None
    function_count: int | None = None  # llvm-cov only
    function_covered: int | None = None  # llvm-cov only
    function_not_covered: int | None = None  # llvm-cov only
    function_percent: float | None = None  # llvm-cov only
    branch_count: int | None
    branch_covered: int | None
    branch_not_covered: int | None
    branch_percent: float | None

    def dict_with_fieldnames(self) -> dict[str, Any]:
        """Convert the coverage result to a dictionary with field names.

        The keys are the human-readable column names used when the result is inserted as a
        table row.

        Returns:
            dict[str, Any]: Dictionary representation of the coverage result.
        """
        return {
            "Repository": self.repository,
            "Workflow": self.workflow,
            "Test Suite": self.test_suite,
            "Date": self.date,
            "Timestamp": self.timestamp,
            "Job Number": self.job,
            "Line Count": self.line_count,
            "Line Covered": self.line_covered,
            "Line Not Covered": self.line_not_covered,
            "Line Excluded": self.line_excluded,
            "Line Percent": self.line_percent,
            "Function Count": self.function_count,
            "Function Covered": self.function_covered,
            "Function Not Covered": self.function_not_covered,
            "Function Percent": self.function_percent,
            "Branch Count": self.branch_count,
            "Branch Covered": self.branch_covered,
            "Branch Not Covered": self.branch_not_covered,
            "Branch Percent": self.branch_percent,
        }


class CoverageReporter(BaseReporter):
    """Handles the reporting of coverage results."""

    def __init__(
        self,
        repository: str,
        workflow: str,
        test_suite: str,
        coverage_artifact_list: list[CoverageJson] | None,
    ) -> None:
        """Initialize the reporter with the coverage data.

        Args:
            repository (str): The repository associated to the test suite.
            workflow (str): The workflow associated to the test suite.
            test_suite (str): The test suite name.
            coverage_artifact_list (list[CoverageJson] | None): The coverage report data
                (llvm-cov or pytest reports) from test suites. ``None`` yields no results.

        Raises:
            ReporterError: If an artifact is of an unknown type or an llvm-cov report does not
                contain exactly one 'data' item.
        """
        super().__init__()
        self.repository = repository
        self.workflow = workflow
        self.test_suite = test_suite
        self.results: Sequence[CoverageReporterResult] = self._parse_results(
            coverage_artifact_list
        )

    def update_table(self, client: Client, project_id: str, dataset_name: str) -> None:
        """Update the BigQuery table with new results.

        Args:
            client (Client): The BigQuery client to interact with BigQuery.
            project_id (str): The BigQuery project ID.
            dataset_name (str): The BigQuery dataset name.

        Raises:
            ReporterError: If querying the table or inserting the rows fails.
        """
        table_id = f"{project_id}.{dataset_name}.{self._normalize_name(self.repository)}_coverage"

        if not self.results:
            self.logger.warning(
                f"There are no results for {self.repository}/{self.workflow}/{self.test_suite} to "
                f"add to {table_id}."
            )
            return

        last_update: datetime | None = self._get_last_update(client, table_id)

        # If no 'last_update' insert all results, else insert results that occur after the last
        # update timestamp.
        # NOTE: once the table holds a timestamp for this suite, results without a timestamp
        # are skipped because they cannot be compared against 'last_update'.
        new_results: Sequence[CoverageReporterResult] = (
            self.results
            if not last_update
            else [
                r
                for r in self.results
                if r.timestamp and datetime.strptime(r.timestamp, DATETIME_FORMAT) > last_update
            ]
        )

        if not new_results:
            self.logger.warning(
                f"There are no new results for {self.repository}/{self.workflow}/{self.test_suite} "
                f"to add to {table_id}."
            )
            return

        self._insert_rows(client, table_id, new_results)

    def _build_query_error(self, error: Exception, query: str, table_id: str) -> ReporterError:
        """Log a failed query and build the ReporterError describing it.

        Args:
            error (Exception): The GoogleAPIError, TypeError or ValueError raised by the query.
            query (str): The query text that was being executed.
            table_id (str): The fully qualified ID of the table that was queried.

        Returns:
            ReporterError: The error to raise (chained from ``error``) by the caller.
        """
        if isinstance(error, GoogleAPIError):
            error_msg = f"Error executing query: {query}"
        elif isinstance(error, TypeError):
            error_msg = f"The query, {query}, has an invalid format or type"
        else:
            error_msg = f"The table name {table_id} is invalid"
        self.logger.error(error_msg, exc_info=error)
        return ReporterError(error_msg)

    def _check_rows_exist(
        self, client: Client, table_id: str, results: Sequence[CoverageReporterResult]
    ) -> bool:
        """Check whether any of the given results' jobs are already stored in the table.

        Args:
            client (Client): The BigQuery client to interact with BigQuery.
            table_id (str): The fully qualified ID of the table to query.
            results (Sequence[CoverageReporterResult]): The results about to be inserted.

        Returns:
            bool: True if at least one row exists for this repository, workflow and test suite
                with one of the results' job numbers.

        Raises:
            ReporterError: If the query fails.
        """
        # Only the table name is interpolated; all values are bound as query parameters.
        query = f"""
            SELECT 1
            FROM `{table_id}`
            WHERE Repository = @repository
              AND Workflow = @workflow
              AND `Test Suite` = @test_suite
              AND `Job Number` IN UNNEST(@job_numbers)
            LIMIT 1
        """  # nosec
        jobs: list[int] = [result.job for result in results]
        query_parameters = [
            ScalarQueryParameter("repository", "STRING", self.repository),
            ScalarQueryParameter("workflow", "STRING", self.workflow),
            ScalarQueryParameter("test_suite", "STRING", self.test_suite),
            ArrayQueryParameter("job_numbers", "INT64", jobs),
        ]
        job_config = QueryJobConfig(query_parameters=query_parameters)
        try:
            query_job = client.query(query, job_config=job_config)
            return any(query_job.result())
        except (GoogleAPIError, TypeError, ValueError) as error:
            raise self._build_query_error(error, query, table_id) from error

    def _get_last_update(self, client: Client, table_id: str) -> datetime | None:
        """Fetch the most recent timestamp stored in the table for this test suite.

        Args:
            client (Client): The BigQuery client to interact with BigQuery.
            table_id (str): The fully qualified ID of the table to query.

        Returns:
            datetime | None: The latest stored timestamp for this repository, workflow and test
                suite, or None if the query returns no timestamp.

        Raises:
            ReporterError: If the query fails.
        """
        # Only the table name and the format constant are interpolated; all values are bound
        # as query parameters.
        query = f"""
            SELECT FORMAT_TIMESTAMP('{DATETIME_FORMAT}', MAX(`Timestamp`)) as last_update
            FROM `{table_id}`
            WHERE Repository = @repository
              AND Workflow = @workflow
              AND `Test Suite` = @test_suite
        """  # nosec
        query_parameters = [
            ScalarQueryParameter("repository", "STRING", self.repository),
            ScalarQueryParameter("workflow", "STRING", self.workflow),
            ScalarQueryParameter("test_suite", "STRING", self.test_suite),
        ]
        job_config = QueryJobConfig(query_parameters=query_parameters)
        try:
            query_job = client.query(query, job_config=job_config)
            result = query_job.result()
            # Only the first row is inspected; the aggregate carries the answer.
            for row in result:
                last_update: str | None = row["last_update"]
                return datetime.strptime(last_update, DATETIME_FORMAT) if last_update else None
        except (GoogleAPIError, TypeError, ValueError) as error:
            raise self._build_query_error(error, query, table_id) from error
        return None

    def _insert_rows(
        self, client: Client, table_id: str, results: Sequence[CoverageReporterResult]
    ) -> None:
        """Insert the results into the table unless any of them is already present.

        Args:
            client (Client): The BigQuery client to interact with BigQuery.
            table_id (str): The fully qualified ID of the table to insert into.
            results (Sequence[CoverageReporterResult]): The results to insert.

        Raises:
            ReporterError: If the existence check fails, the insert call raises, or BigQuery
                reports row-level insert errors.
        """
        results_exist: bool = self._check_rows_exist(client, table_id, results)
        if results_exist:
            # All-or-nothing: a single already-stored job aborts the whole batch.
            self.logger.warning(
                "Detected one or more results from "
                f"{self.repository}/{self.workflow}/{self.test_suite} already exist in table "
                f"{table_id}. Aborting insert."
            )
            return

        try:
            json_rows: list[dict[str, Any]] = [
                result.dict_with_fieldnames() for result in results
            ]
            errors = client.insert_rows_json(table_id, json_rows)
        except (GoogleAPIError, TypeError, ValueError) as error:
            # API failures are wrapped like in the query helpers so callers only need to
            # handle ReporterError.
            if isinstance(error, GoogleAPIError):
                error_msg = f"Error inserting rows into {table_id}"
            elif isinstance(error, TypeError):
                error_msg = f"data is an improper format for insertion in {table_id}"
            else:
                error_msg = f"The table name {table_id} is invalid"
            self.logger.error(error_msg, exc_info=error)
            raise ReporterError(error_msg) from error

        if errors:
            client_error_msg: str = (
                "Failed to insert rows from "
                f"{self.repository}/{self.workflow}/{self.test_suite} into {table_id}: {errors}"
            )
            self.logger.error(client_error_msg)
            raise ReporterError(client_error_msg)

        self.logger.info(
            f"Inserted {len(results)} results from "
            f"{self.repository}/{self.workflow}/{self.test_suite} into {table_id}."
        )

    def _parse_results(
        self, coverage_artifact_list: list[CoverageJson] | None
    ) -> Sequence[CoverageReporterResult]:
        """Convert the coverage artifacts into reporter results.

        Args:
            coverage_artifact_list (list[CoverageJson] | None): The coverage reports to parse.

        Returns:
            Sequence[CoverageReporterResult]: One result per artifact, in input order; empty if
                ``coverage_artifact_list`` is None.

        Raises:
            ReporterError: If an artifact is neither an LlvmCovReport nor a PytestReport.
        """
        if coverage_artifact_list is None:
            return []

        results: list[CoverageReporterResult] = []
        for artifact in coverage_artifact_list:
            if isinstance(artifact, LlvmCovReport):
                results.append(self._parse_llvm_cov_report(artifact))
            elif isinstance(artifact, PytestReport):
                results.append(self._parse_pytest_report(artifact))
            else:
                raise ReporterError(f"Unknown coverage type: {type(artifact)}")
        return results

    def _parse_llvm_cov_report(self, llvm_cov_report: LlvmCovReport) -> CoverageReporterResult:
        """Build a result from the totals of an llvm-cov report.

        Args:
            llvm_cov_report (LlvmCovReport): The llvm-cov report to convert.

        Returns:
            CoverageReporterResult: The line, function and branch coverage of the report.

        Raises:
            ReporterError: If the report does not contain exactly one 'data' item.
        """
        if len(llvm_cov_report.data) != 1:
            raise ReporterError(
                f"The coverage report for {self.repository}-{self.workflow}-{self.test_suite} "
                "has an unexpected number of items in 'data'."
            )

        totals: LlvmCovTotals = llvm_cov_report.data[0].totals
        timestamp = llvm_cov_report.job_timestamp
        return CoverageReporterResult(
            repository=self.repository,
            workflow=self.workflow,
            test_suite=self.test_suite,
            timestamp=timestamp,
            date=self._extract_date(timestamp) if timestamp else None,
            job=llvm_cov_report.job_number,
            line_count=totals.lines.count,
            line_covered=totals.lines.covered,
            # llvm-cov totals are read as count/covered here; the uncovered amount is derived.
            line_not_covered=totals.lines.count - totals.lines.covered,
            line_percent=totals.lines.percent,
            function_count=totals.functions.count,
            function_covered=totals.functions.covered,
            function_not_covered=totals.functions.count - totals.functions.covered,
            function_percent=totals.functions.percent,
            branch_count=totals.branches.count,
            branch_covered=totals.branches.covered,
            branch_not_covered=totals.branches.count - totals.branches.covered,
            branch_percent=totals.branches.percent,
        )

    def _parse_pytest_report(self, pytest_report: PytestReport) -> CoverageReporterResult:
        """Build a result from the totals of a pytest coverage report.

        Args:
            pytest_report (PytestReport): The pytest coverage report to convert.

        Returns:
            CoverageReporterResult: The line and branch coverage of the report. Function
                coverage fields keep their defaults.
        """
        totals: PytestTotals = pytest_report.totals
        timestamp = pytest_report.job_timestamp
        return CoverageReporterResult(
            repository=self.repository,
            workflow=self.workflow,
            test_suite=self.test_suite,
            timestamp=timestamp,
            date=self._extract_date(timestamp) if timestamp else None,
            job=pytest_report.job_number,
            line_count=totals.num_statements,
            line_covered=totals.covered_lines,
            line_not_covered=totals.missing_lines,
            line_excluded=totals.excluded_lines,
            line_percent=totals.percent_covered,
            branch_count=totals.num_branches,
            branch_covered=totals.covered_branches,
            branch_not_covered=totals.missing_branches,
            # The branch percentage is computed from the counts; guard against a report
            # without branches to avoid dividing by zero.
            branch_percent=(
                (totals.covered_branches / totals.num_branches) * 100
                if totals.num_branches
                else 0.0
            ),
        )