bot/code_coverage_bot/uploader.py (83 lines of code) (raw):

# -*- coding: utf-8 -*- import itertools import os.path import requests import structlog import tenacity import zstandard as zstd from google.cloud.storage.bucket import Bucket from code_coverage_bot.secrets import secrets from code_coverage_tools.gcp import get_bucket logger = structlog.get_logger(__name__) GCP_COVDIR_PATH = "{repository}/{revision}/{platform}:{suite}.json.zstd" def gcp(repository, revision, report, platform, suite): """ Upload a grcov raw report on Google Cloud Storage * Compress with zstandard * Upload on bucket using revision in name * Trigger ingestion on channel's backend """ assert isinstance(report, bytes) assert isinstance(platform, str) assert isinstance(suite, str) bucket = get_bucket(secrets[secrets.GOOGLE_CLOUD_STORAGE]) # Compress report compressor = zstd.ZstdCompressor(threads=-1) archive = compressor.compress(report) # Upload archive path = GCP_COVDIR_PATH.format( repository=repository, revision=revision, platform=platform, suite=suite ) blob = bucket.blob(path) blob.upload_from_string(archive) # Update headers blob.content_type = "application/json" blob.content_encoding = "zstd" blob.patch() logger.info("Uploaded {} on {}".format(path, bucket)) # Trigger ingestion on backend gcp_ingest(repository, revision, platform, suite) return blob def gcp_covdir_exists( bucket: Bucket, repository: str, revision: str, platform: str, suite: str ) -> bool: """ Check if a covdir report exists on the Google Cloud Storage bucket """ path = GCP_COVDIR_PATH.format( repository=repository, revision=revision, platform=platform, suite=suite ) blob = bucket.blob(path) return blob.exists() @tenacity.retry( stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_exponential(multiplier=1, min=16, max=64), reraise=True, ) def gcp_ingest(repository, revision, platform, suite): """ The GCP report ingestion is triggered remotely on a backend by making a simple HTTP request on the /v2/path endpoint By specifying the exact new revision processed, the backend will download automatically the new report. """ params = {"repository": repository, "changeset": revision} if platform: params["platform"] = platform if suite: params["suite"] = suite backend_host = secrets[secrets.BACKEND_HOST] logger.info( "Ingesting report on backend", host=backend_host, repository=repository, revision=revision, platform=platform, suite=suite, ) resp = requests.get("{}/v2/path".format(backend_host), params=params) resp.raise_for_status() logger.info("Successfully ingested report on backend !") return resp def gcp_latest(repository): """ List the latest reports ingested on the backend """ params = {"repository": repository} backend_host = secrets[secrets.BACKEND_HOST] resp = requests.get("{}/v2/latest".format(backend_host), params=params) resp.raise_for_status() return resp.json() def covdir_paths(report): """ Load a covdir report and recursively list all the paths """ assert isinstance(report, dict) def _extract(obj, base_path=""): out = [] children = obj.get("children", {}) if children: # Recursive on folder files out += itertools.chain( *[ _extract(child, os.path.join(base_path, obj["name"])) for child in children.values() ] ) else: # Add full filename out.append(os.path.join(base_path, obj["name"])) return out return _extract(report)