backend/code_coverage_backend/api.py (89 lines of code) (raw):
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import structlog
from flask import abort
from code_coverage_backend.gcp import load_cache
from code_coverage_backend.report import DEFAULT_FILTER
from code_coverage_tools import COVERAGE_EXTENSIONS
DEFAULT_REPOSITORY = "mozilla-central"
logger = structlog.get_logger(__name__)
def coverage_supported_extensions():
"""
List all the file extensions we currently support
"""
return COVERAGE_EXTENSIONS
def coverage_latest(repository=DEFAULT_REPOSITORY):
"""
List the last 10 reports available on the server
"""
gcp = load_cache()
if gcp is None:
logger.error("No GCP cache available")
abort(500)
try:
return [
{"revision": report.changeset, "push": report.push_id}
for report in gcp.list_reports(repository, nb=10)
]
except Exception as e:
logger.warn("Failed to retrieve latest reports: {}".format(e))
abort(404)
def coverage_for_path(
path="",
changeset=None,
repository=DEFAULT_REPOSITORY,
platform=DEFAULT_FILTER,
suite=DEFAULT_FILTER,
):
"""
Aggregate coverage for a path, regardless of its type:
* file, gives its coverage percent
* directory, gives coverage percent for its direct sub elements
files and folders (recursive average)
"""
gcp = load_cache()
if gcp is None:
logger.error("No GCP cache available")
abort(500)
try:
if changeset:
# Find closest report matching this changeset
report = gcp.find_closest_report(repository, changeset, platform, suite)
else:
# Fallback to latest report
report = gcp.find_report(repository, platform, suite)
except Exception as e:
logger.warn("Failed to retrieve report: {}".format(e))
abort(404)
# Load tests data from GCP
try:
return gcp.get_coverage(report, path)
except Exception as e:
logger.warn(
"Failed to load coverage",
repo=repository,
changeset=changeset,
path=path,
error=str(e),
)
abort(400)
def coverage_history(
repository=DEFAULT_REPOSITORY,
path="",
start=None,
end=None,
platform=DEFAULT_FILTER,
suite=DEFAULT_FILTER,
):
"""
List overall coverage from ingested reports over a period of time
"""
gcp = load_cache()
if gcp is None:
logger.error("No GCP cache available")
abort(500)
try:
return gcp.get_history(repository, path, start, end, platform, suite)
except Exception as e:
logger.warn(
"Failed to load history",
repo=repository,
path=path,
start=start,
end=end,
error=str(e),
)
abort(400)
def coverage_filters(repository=DEFAULT_REPOSITORY):
"""
List all available filters for that repository
"""
gcp = load_cache()
if gcp is None:
logger.error("No GCP cache available")
abort(500)
try:
return {
"platforms": gcp.get_platforms(repository),
"suites": gcp.get_suites(repository),
}
except Exception as e:
logger.warn("Failed to load filters", repo=repository, error=str(e))
abort(400)