tools/code_coverage_tools/gcp.py
# -*- coding: utf-8 -*-
import os
import re
from datetime import datetime
from datetime import timedelta
from typing import Iterator
from typing import Optional
from typing import Tuple
import pytz
import structlog
import zstandard
from google.cloud import storage as gcp_storage
from google.oauth2.service_account import Credentials

logger = structlog.get_logger(__name__)

DEFAULT_FILTER = "all"


def get_bucket(service_account: dict) -> gcp_storage.bucket.Bucket:
"""
Build a Google Cloud Storage client & bucket
from Taskcluster secret
"""
# Load credentials from Taskcluster secret
if "bucket" not in service_account:
raise KeyError("Missing bucket in GOOGLE_CLOUD_STORAGE")
bucket = service_account["bucket"]
# Use those credentials to create a Storage client
# The project is needed to avoid checking env variables and crashing
creds = Credentials.from_service_account_info(service_account)
client = gcp_storage.Client(project=creds.project_id, credentials=creds)
return client.get_bucket(bucket)
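

# Example secret layout for get_bucket (an illustrative sketch: the bucket
# name and values below are assumptions, not taken from this module; the dict
# must also carry the standard fields expected by
# Credentials.from_service_account_info):
#
#   service_account = {
#       "bucket": "my-coverage-bucket",
#       "type": "service_account",
#       "project_id": "my-project",
#       "private_key_id": "...",
#       "private_key": "-----BEGIN PRIVATE KEY-----\n...",
#       "client_email": "coverage@my-project.iam.gserviceaccount.com",
#       "token_uri": "https://oauth2.googleapis.com/token",
#   }
#   bucket = get_bucket(service_account)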


def get_name(repository: str, changeset: str, platform: str, suite: str) -> str:
    """
    Build the standard blob name for a coverage report.
    """
    return f"{repository}/{changeset}/{platform}:{suite}"
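

# For example (illustrative values, not taken from this module):
#
#   get_name("mozilla-central", "deadbeef1234", "linux", "all")
#   # -> "mozilla-central/deadbeef1234/linux:all"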


def download_report(
    base_dir: str, bucket: gcp_storage.bucket.Bucket, name: str
) -> bool:
    """
    Download and decompress a report archive from the bucket into base_dir.
    Returns True when the JSON report is available locally.
    """
    path = f"{name}.json"
    archive_path = f"{name}.json.zstd"
    full_archive_path = os.path.join(base_dir, archive_path)
    full_path = os.path.join(base_dir, path)
blob = bucket.blob(archive_path)
if not blob.exists():
logger.debug("No report found on GCP", path=archive_path)
return False
if os.path.exists(full_path):
logger.info("Report already available", path=full_path)
return True
os.makedirs(os.path.dirname(full_archive_path), exist_ok=True)
blob.download_to_filename(full_archive_path, raw_download=True)
logger.info("Downloaded report archive", path=full_archive_path)
    # Stream-decompress the archive so large reports are never fully in memory
    with open(full_path, "wb") as output:
        with open(full_archive_path, "rb") as archive:
            dctx = zstandard.ZstdDecompressor()
            reader = dctx.stream_reader(archive)
while True:
chunk = reader.read(16384)
if not chunk:
break
output.write(chunk)
    # Drop the archive once the JSON report has been extracted
    os.unlink(full_archive_path)
return True
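

# Example usage (a minimal sketch; the directory, repository, changeset,
# platform and suite values are assumptions for illustration):
#
#   name = get_name("mozilla-central", "deadbeef1234", "linux", "all")
#   if download_report("/tmp/reports", bucket, name):
#       report_path = os.path.join("/tmp/reports", f"{name}.json")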


def list_reports(
    bucket: gcp_storage.bucket.Bucket,
    repository: str,
    until: Optional[timedelta] = None,
) -> Iterator[Tuple[str, str, str]]:
    """
    Yield (changeset, platform, suite) for each report archive stored for
    the repository, skipping blobs older than `until` when it is set.
    """
    REGEX_BLOB = re.compile(
        rf"^{re.escape(repository)}/(\w+)/([\w\-]+):([\w\-]+)\.json\.zstd$"
    )
    now = datetime.now(pytz.UTC)
    for blob in bucket.list_blobs(prefix=repository):
        if until is not None and (now - blob.time_created) >= until:
            logger.debug("Skipping old blob", name=blob.name)
            continue
        # Extract changeset, platform and suite from the blob name
        match = REGEX_BLOB.match(blob.name)
        if match is None:
            logger.warning("Invalid blob found", name=blob.name)
            continue
        changeset, platform, suite = match.groups()

        # Yield the report coordinates for the caller to ingest
        yield changeset, platform, suite
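

if __name__ == "__main__":
    # Minimal end-to-end sketch: the GCS_SECRET variable, target directory
    # and repository name are assumptions for illustration, not part of this
    # module's API.
    import json

    service_account = json.loads(os.environ["GCS_SECRET"])
    bucket = get_bucket(service_account)
    for changeset, platform, suite in list_reports(
        bucket, "mozilla-central", until=timedelta(days=30)
    ):
        name = get_name("mozilla-central", changeset, platform, suite)
        download_report("/tmp/coverage-reports", bucket, name)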