in bot/code_coverage_bot/commit_coverage.py [0:0]
def generate(server_address: str, repo_dir: str, out_dir: str = ".") -> None:
    start_time = time.monotonic()

    commit_coverage_path = os.path.join(out_dir, "commit_coverage.json.zst")

    assert (
        secrets[secrets.GOOGLE_CLOUD_STORAGE] is not None
    ), "Missing GOOGLE_CLOUD_STORAGE secret"
    bucket = get_bucket(secrets[secrets.GOOGLE_CLOUD_STORAGE])
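
    # Load the previously generated commit coverage, if any, so each run only
    # analyzes changesets that have not been processed yet.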
    blob = bucket.blob("commit_coverage.json.zst")
    if blob.exists():
        dctx = zstandard.ZstdDecompressor()
        commit_coverage = json.loads(
            dctx.decompress(blob.download_as_bytes(raw_download=True))
        )
    else:
        commit_coverage = {}
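
    # A single multithreaded zstd compressor, reused for the GCS uploads and for
    # the local file written at the end.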
    cctx = zstandard.ZstdCompressor(threads=-1)
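
    # Upload the current (possibly partial) mapping to GCS as zstd-compressed JSON.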
    def _upload():
        blob = bucket.blob("commit_coverage.json.zst")
        blob.upload_from_string(
            cctx.compress(json.dumps(commit_coverage).encode("ascii"))
        )
        blob.content_type = "application/json"
        blob.content_encoding = "zstd"
        blob.patch()

    # We are only interested in "overall" coverage, not platform or suite specific.
    changesets_to_analyze = [
        changeset
        for changeset, platform, suite in list_reports(bucket, "mozilla-central")
        if platform == DEFAULT_FILTER and suite == DEFAULT_FILTER
    ]

    # Skip already analyzed changesets.
    changesets_to_analyze = [
        changeset
        for changeset in changesets_to_analyze
        if changeset not in commit_coverage
    ]

    # Use the local server to generate the coverage mapping, as it is faster and
    # correct.
    def analyze_changeset(changeset_to_analyze: str) -> None:
        report_name = get_name(
            "mozilla-central", changeset_to_analyze, DEFAULT_FILTER, DEFAULT_FILTER
        )
        assert download_report(
            os.path.join(out_dir, "ccov-reports"), bucket, report_name
        )
        with open(
            os.path.join(out_dir, "ccov-reports", f"{report_name}.json"), "r"
        ) as f:
            report = json.load(f)
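
        # The uploader maps the coverage report onto the lines touched by each
        # changeset in the push.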
        phabricatorUploader = PhabricatorUploader(
            repo_dir, changeset_to_analyze, warnings_enabled=False
        )

        # Use the hg.mozilla.org server to get the automation relevant changesets, since
        # this information is broken in our local repo (which is mozilla-unified).
        with hgmo.HGMO(server_address=server_address) as hgmo_remote_server:
            changesets = hgmo_remote_server.get_automation_relevance_changesets(
                changeset_to_analyze
            )

        results = phabricatorUploader.generate(thread_local.hg, report, changesets)
        for changeset in changesets:
            # Look up this changeset's coverage computed by the Phabricator uploader.
            coverage = results.get(changeset["node"])
            if coverage is None:
                logger.info("No coverage found", changeset=changeset)
                commit_coverage[changeset["node"]] = None
                continue

            commit_coverage[changeset["node"]] = {
                "added": sum(c["lines_added"] for c in coverage["paths"].values()),
                "covered": sum(
                    c["lines_covered"] for c in coverage["paths"].values()
                ),
                "unknown": sum(
                    c["lines_unknown"] for c in coverage["paths"].values()
                ),
            }
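
    # min(32, cpu_count + 4) is ThreadPoolExecutor's default worker count on
    # Python 3.8+; it is computed here only for the log message (max_workers is
    # not passed to ThreadPoolExecutorResult below).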
    max_workers = min(32, (os.cpu_count() or 1) + 4)
    logger.info(
        f"Analyzing {len(changesets_to_analyze)} changesets with {max_workers} workers"
    )
    with ThreadPoolExecutorResult(
        initializer=_init_thread, initargs=(repo_dir,)
    ) as executor:
        # Map each future back to its changeset, so failures are attributed to the
        # right changeset even though futures complete out of submission order.
        futures = {
            executor.submit(analyze_changeset, changeset): changeset
            for changeset in changesets_to_analyze
        }
        for future in tqdm(
            concurrent.futures.as_completed(futures), total=len(futures)
        ):
            exc = future.exception()
            if exc is not None:
                logger.error(f"Exception {exc} while analyzing {futures[future]}")

            # Periodically (every ten minutes) upload partial results, so progress
            # is not lost if the task is interrupted.
            if time.monotonic() - start_time >= 600:
                _upload()
                start_time = time.monotonic()
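
    # Shut down the hg server handles that the worker threads registered in hg_servers.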
    while len(hg_servers) > 0:
        hg_server = hg_servers.pop()
        hg_server.close()

    _upload()
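
    # Also write the compressed mapping to a local file (commit_coverage.json.zst
    # in out_dir).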
    with open(commit_coverage_path, "wb") as zf:
        with cctx.stream_writer(zf) as compressor:
            with io.TextIOWrapper(compressor, encoding="ascii") as f:
                json.dump(commit_coverage, f)