scripts/metric_reporter/reporter/suite_reporter.py (299 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""Module for reporting test suite results from CircleCI metadata."""
from datetime import datetime
from enum import Enum
from typing import Any, Sequence
from google.api_core.exceptions import GoogleAPIError
from google.cloud.bigquery import ArrayQueryParameter, Client, QueryJobConfig, ScalarQueryParameter
from pydantic import BaseModel
from scripts.metric_reporter.constants import DATETIME_FORMAT
from scripts.metric_reporter.parser.junit_xml_parser import (
JestJUnitXmlTestSuites,
JUnitXmlJobTestSuites,
MochaJUnitXmlTestSuites,
NextestJUnitXmlTestSuites,
PlaywrightJUnitXmlTestSuites,
PytestJUnitXmlTestSuites,
TapJUnitXmlTestSuites,
)
from scripts.metric_reporter.reporter.base_reporter import (
BaseReporter,
ReporterError,
ReporterResultBase,
)
SUCCESS_RESULTS = {"success", "system-out"}
FAILURE_RESULT = "failure"
SKIPPED_RESULT = "skipped"
CANCELED_JOB_STATUS = "canceled"
RUNNING_JOB_STATUS = "running"
class Status(Enum):
"""Overall status of the test suite."""
SUCCESS = "success"
FAILED = "failed"
class SuiteReporterResult(ReporterResultBase):
"""Represents the results of a test suite run."""
repository: str
workflow: str
test_suite: str
timestamp: str
date: str
job: int
@property
def status(self) -> Status:
"""Test Suite status"""
if self.failure > 0:
return Status.FAILED
return Status.SUCCESS
# The summation of all test run times in seconds. Parallelization is not taken into
# consideration.
# Not supported by TAP
run_time: float = 0
# Equal to the longest run_time in seconds when tests are run in parallel. We know tests are run
# in parallel if we have multiple reports for a repository/workflow/test_suite.
# Not supported by TAP
execution_time: float | None = None
success: int = 0
failure: int = 0
skipped: int = 0
# An annotation available in Playwright only. Subset of 'skipped'.
fixme: int = 0
# The number of tests that were the result of a re-execution. It is possible that the same test
# is re-executed more than once. Playwright only.
retry: int = 0
@property
def total(self) -> int:
"""Calculate the total number of tests."""
return self.success + self.failure + self.skipped
@property
def success_rate(self) -> float | None:
"""Calculate the success rate of the test suite."""
return self._calculate_rate(self.success, self.total)
@property
def failure_rate(self) -> float | None:
"""Calculate the failure rate of the test suite."""
return self._calculate_rate(self.failure, self.total)
@property
def skipped_rate(self) -> float | None:
"""Calculate the skipped rate of the test suite."""
return self._calculate_rate(self.skipped, self.total)
@property
def fixme_rate(self) -> float | None:
"""Calculate the fixme rate of the test suite."""
return self._calculate_rate(self.fixme, self.total)
@staticmethod
def _calculate_rate(value: int, total: int) -> float | None:
"""Calculate the percentage rate of a given value over the total.
Args:
value (int): The numerator for the rate calculation.
total (int): The denominator for the rate calculation.
Returns:
float | None: The calculated rate as a percentage, or None if the total is 0.
"""
return round((value / total) * 100, 2) if total > 0 else None
def dict_with_fieldnames(self) -> dict[str, Any]:
"""Convert the test suite result to a dictionary with field names.
Returns:
dict[str, Any]: Dictionary representation of the test suite result.
"""
return {
"Repository": self.repository,
"Workflow": self.workflow,
"Test Suite": self.test_suite,
"Date": self.date,
"Timestamp": self.timestamp,
"Job Number": self.job,
"Status": self.status.value,
"Execution Time": self.execution_time,
"Run Time": self.run_time,
"Success": self.success,
"Failure": self.failure,
"Skipped": self.skipped,
"Fixme": self.fixme,
"Retry Count": self.retry,
"Total": self.total,
"Success Rate": self.success_rate,
"Failure Rate": self.failure_rate,
"Skipped Rate": self.skipped_rate,
"Fixme Rate": self.fixme_rate,
}
class SuiteMetrics(BaseModel):
"""Represents the results of a test suite."""
time: float | None = None
tests: int = 0
failure: int = 0
skipped: int = 0
fixme: int = 0
retry: int = 0
@property
def success(self) -> int:
"""Calculate the number of tests that succeeded."""
return self.tests - self.failure - self.skipped
class SuiteReporter(BaseReporter):
"""Handles the reporting of test suite results from CircleCI metadata and JUnit XML Reports."""
def __init__(
self,
repository: str,
workflow: str,
test_suite: str,
junit_artifact_list: list[JUnitXmlJobTestSuites] | None,
) -> None:
"""Initialize the reporter with the directory containing test result data.
Args:
repository (str): The repository associated to the test suite.
workflow (str): The workflow associated to the test suite.
test_suite (str): The test suite name.
junit_artifact_list (list[JUnitXmlJobTestSuites] | None): The test results from JUnit
XML artifacts.
"""
super().__init__()
self.repository = repository
self.workflow = workflow
self.test_suite = test_suite
self.results: Sequence[SuiteReporterResult] = self._parse_results(junit_artifact_list)
def update_table(self, client: Client, project_id: str, dataset_name: str) -> None:
"""Update the BigQuery table with new results.
Args:
client (Client): The BigQuery client to interact with BigQuery.
project_id (str): The BigQuery project ID.
dataset_name (str): The BigQuery dataset name.
"""
table_id = (
f"{project_id}.{dataset_name}.{self._normalize_name(self.repository)}_suite_results"
)
if not self.results:
self.logger.warning(
f"There are no results for {self.repository}/{self.workflow}/{self.test_suite} to "
f"add to {table_id}."
)
return
last_update: datetime | None = self._get_last_update(client, table_id)
# If no 'last_update' insert all results, else insert results that occur after the last
# update timestamp
new_results: Sequence[SuiteReporterResult] = (
self.results
if not last_update
else [
r
for r in self.results
if r.timestamp and datetime.strptime(r.timestamp, DATETIME_FORMAT) > last_update
]
)
if not new_results:
self.logger.warning(
f"There are no new results for {self.repository}/{self.workflow}/{self.test_suite} "
f"to add to {table_id}."
)
return
self._insert_rows(client, table_id, new_results)
def _check_rows_exist(
self, client: Client, table_id: str, results: Sequence[SuiteReporterResult]
) -> bool:
query = f"""
SELECT 1
FROM `{table_id}`
WHERE
Repository = @repository
AND Workflow = @workflow
AND `Test Suite` = @test_suite
AND `Job Number` IN UNNEST(@job_numbers)
LIMIT 1
""" # nosec
jobs: list[int] = [result.job for result in results]
query_parameters = [
ScalarQueryParameter("repository", "STRING", self.repository),
ScalarQueryParameter("workflow", "STRING", self.workflow),
ScalarQueryParameter("test_suite", "STRING", self.test_suite),
ArrayQueryParameter("job_numbers", "INT64", jobs),
]
job_config = QueryJobConfig(query_parameters=query_parameters)
try:
query_job = client.query(query, job_config=job_config)
return any(query_job.result())
except (GoogleAPIError, TypeError, ValueError) as error:
error_mapping: dict[type, str] = {
GoogleAPIError: f"Error executing query: {query}",
TypeError: f"The query, {query}, has an invalid format or type",
ValueError: f"The table name {table_id} is invalid",
}
error_msg: str = next(m for t, m in error_mapping.items() if isinstance(error, t))
self.logger.error(error_msg, exc_info=error)
raise ReporterError(error_msg) from error
def _get_last_update(self, client: Client, table_id: str) -> datetime | None:
query = f"""
SELECT FORMAT_TIMESTAMP('{DATETIME_FORMAT}', MAX(`Timestamp`)) as last_update
FROM `{table_id}`
WHERE Repository = @repository AND Workflow = @workflow AND `Test Suite` = @test_suite
""" # nosec
query_parameters = [
ScalarQueryParameter("repository", "STRING", self.repository),
ScalarQueryParameter("workflow", "STRING", self.workflow),
ScalarQueryParameter("test_suite", "STRING", self.test_suite),
]
job_config = QueryJobConfig(query_parameters=query_parameters)
try:
query_job = client.query(query, job_config=job_config)
result = query_job.result()
for row in result:
last_update: str | None = row["last_update"]
return datetime.strptime(last_update, DATETIME_FORMAT) if last_update else None
except (GoogleAPIError, TypeError, ValueError) as error:
error_mapping: dict[type, str] = {
GoogleAPIError: f"Error executing query: {query}",
TypeError: f"The query, {query}, has an invalid format or type",
ValueError: f"The table name {table_id} is invalid",
}
error_msg: str = next(m for t, m in error_mapping.items() if isinstance(error, t))
self.logger.error(error_msg, exc_info=error)
raise ReporterError(error_msg) from error
return None
def _insert_rows(
self, client: Client, table_id: str, results: Sequence[SuiteReporterResult]
) -> None:
results_exist: bool = self._check_rows_exist(client, table_id, results)
if results_exist:
self.logger.warning(
f"Detected one or more results from "
f"{self.repository}/{self.workflow}/{self.test_suite} already exist in table "
f"{table_id}. Aborting insert."
)
return
try:
json_rows: list[dict[str, Any]] = [
results.dict_with_fieldnames() for results in results
]
errors = client.insert_rows_json(table_id, json_rows)
if errors:
client_error_msg: str = (
f"Failed to insert rows from "
f"{self.repository}/{self.workflow}/{self.test_suite} into {table_id}: {errors}"
)
self.logger.error(client_error_msg)
raise ReporterError(client_error_msg)
self.logger.info(
f"Inserted {len(results)} results from "
f"{self.repository}/{self.workflow}/{self.test_suite} into {table_id}."
)
except (TypeError, ValueError) as error:
error_mapping: dict[type, str] = {
TypeError: f"data is an improper format for insertion in {table_id}",
ValueError: f"The table name {table_id} is invalid",
}
error_msg: str = next(m for t, m in error_mapping.items() if isinstance(error, t))
self.logger.error(error_msg, exc_info=error)
raise ReporterError(error_msg) from error
@staticmethod
def _extract_suite_metrics(suites) -> SuiteMetrics:
metrics = SuiteMetrics()
match suites:
case JestJUnitXmlTestSuites() | NextestJUnitXmlTestSuites():
metrics.time = suites.time
metrics.tests = suites.tests
metrics.failure = suites.failures + suites.errors
metrics.skipped = sum(suite.skipped for suite in suites.test_suites)
case MochaJUnitXmlTestSuites():
metrics.time = suites.time
# Mocha test reporting has been known to inaccurately total the number of
# tests at the top level, so we count the number of test cases
metrics.tests = (
sum(len(suite.test_cases) for suite in suites.test_suites if suite.test_cases)
if suites.test_suites
else 0
)
metrics.failure = suites.failures
metrics.skipped = suites.skipped or 0
case PlaywrightJUnitXmlTestSuites():
metrics.time = suites.time
metrics.tests = suites.tests
metrics.failure = suites.failures + suites.errors
metrics.skipped = suites.skipped
metrics.fixme = sum(
1
for suite in suites.test_suites
for case in suite.test_cases
if case.properties and any(p.name == "fixme" for p in case.properties.property)
)
# An assumption is made that the presence of a nested system-out tag in
# a test case that contains a link to a trace.zip attachment file as
# content is the result of a retry.
metrics.retry = sum(
1
for suite in suites.test_suites
for case in suite.test_cases
if case.system_out and "trace.zip" in case.system_out
)
case PytestJUnitXmlTestSuites():
metrics.time = sum(suite.time for suite in suites.test_suites)
metrics.tests = sum(suite.tests for suite in suites.test_suites)
metrics.failure = sum(
suite.failures + suite.errors for suite in suites.test_suites
)
metrics.skipped = sum(suite.skipped for suite in suites.test_suites)
case TapJUnitXmlTestSuites():
metrics.tests = sum(suite.tests for suite in suites.test_suites)
# With Tap it's possible for errors to be omitted
metrics.failure = sum(
suite.failures + (suite.errors or 0) for suite in suites.test_suites
)
return metrics
def _parse_results(
self, artifacts_list: list[JUnitXmlJobTestSuites] | None
) -> list[SuiteReporterResult]:
results: list[SuiteReporterResult] = []
if not artifacts_list:
return results
for artifact in artifacts_list:
test_suite_result = SuiteReporterResult(
repository=self.repository,
workflow=self.workflow,
test_suite=self.test_suite,
timestamp=artifact.job_timestamp,
date=self._extract_date(artifact.job_timestamp),
job=artifact.job,
)
times: list[float] = []
for suites in artifact.test_suites:
suite_metrics: SuiteMetrics = self._extract_suite_metrics(suites)
if suite_metrics.time:
times.append(suite_metrics.time)
test_suite_result.failure += suite_metrics.failure
test_suite_result.skipped += suite_metrics.skipped
test_suite_result.success += suite_metrics.success
test_suite_result.fixme += suite_metrics.fixme
test_suite_result.retry += suite_metrics.retry
# Times are not always available, for example with TAP.
# Times at the suites, suite and case level may not sum-up to the same values. This can
# be due to many factors including the use of threads.
test_suite_result.run_time = sum(times)
test_suite_result.execution_time = max(times) if times else 0
results.append(test_suite_result)
# Sort by timestamp and then by job
sorted_results = sorted(results, key=lambda result: (result.timestamp, result.job))
return sorted_results